Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/build/lib/airflow/configuration.py: 49%

777 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:35 +0000

1# Licensed to the Apache Software Foundation (ASF) under one 

2# or more contributor license agreements. See the NOTICE file 

3# distributed with this work for additional information 

4# regarding copyright ownership. The ASF licenses this file 

5# to you under the Apache License, Version 2.0 (the 

6# "License"); you may not use this file except in compliance 

7# with the License. You may obtain a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, 

12# software distributed under the License is distributed on an 

13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

14# KIND, either express or implied. See the License for the 

15# specific language governing permissions and limitations 

16# under the License. 

17from __future__ import annotations 

18 

19import datetime 

20import functools 

21import json 

22import logging 

23import multiprocessing 

24import os 

25import pathlib 

26import re 

27import shlex 

28import stat 

29import subprocess 

30import sys 

31import warnings 

32from base64 import b64encode 

33from collections import OrderedDict 

34 

35# Ignored Mypy on configparser because it thinks the configparser module has no _UNSET attribute 

36from configparser import _UNSET, ConfigParser, NoOptionError, NoSectionError # type: ignore 

37from contextlib import contextmanager, suppress 

38from json.decoder import JSONDecodeError 

39from re import Pattern 

40from typing import IO, Any, Dict, Iterable, Tuple, Union 

41from urllib.parse import urlsplit 

42 

43from typing_extensions import overload 

44 

45from airflow.exceptions import AirflowConfigException 

46from airflow.secrets import DEFAULT_SECRETS_SEARCH_PATH, BaseSecretsBackend 

47from airflow.utils import yaml 

48from airflow.utils.module_loading import import_string 

49from airflow.utils.weight_rule import WeightRule 

50 

# Module-level logger named after this module.
51log = logging.getLogger(__name__) 

52 

53# show Airflow's deprecation warnings 

# The two filters below are only installed when sys.warnoptions is empty.
54if not sys.warnoptions: 

55 warnings.filterwarnings(action="default", category=DeprecationWarning, module="airflow") 

56 warnings.filterwarnings(action="default", category=PendingDeprecationWarning, module="airflow") 

57 

# Captures the leading dotted run of digits of a version string in the named group "version".
58_SQLITE3_VERSION_PATTERN = re.compile(r"(?P<version>^\d+(?:\.\d+)*)\D?.*$") 

59 

# Type aliases used in annotations for configuration values and for the sources they come from.
60ConfigType = Union[str, int, float, bool] 

61ConfigOptionsDictType = Dict[str, ConfigType] 

62ConfigSectionSourcesType = Dict[str, Union[str, Tuple[str, str]]] 

63ConfigSourcesType = Dict[str, ConfigSectionSourcesType] 

64 

# Prefix of the environment-variable name built for a configuration option.
65ENV_VAR_PREFIX = "AIRFLOW__" 

66 

67 

def _parse_sqlite_version(s: str) -> tuple[int, ...]:
    """Convert the leading dotted-number part of a version string into a tuple of ints.

    :param s: version string to parse
    :return: the numeric components, or an empty tuple when ``s`` does not
        match ``_SQLITE3_VERSION_PATTERN``
    """
    found = _SQLITE3_VERSION_PATTERN.match(s)
    if found is not None:
        return tuple(map(int, found.group("version").split(".")))
    return ()

73 

74 

@overload
def expand_env_var(env_var: None) -> None:
    ...


@overload
def expand_env_var(env_var: str) -> str:
    ...


def expand_env_var(env_var: str | None) -> str | None:
    """
    Expand environment variables and ``~`` in a value, including nested references.

    ``expandvars`` followed by ``expanduser`` is applied over and over until a
    pass leaves the text unchanged.

    :param env_var: the text to expand; falsy values are returned untouched
    :return: the fully expanded text
    """
    if not env_var:
        return env_var
    previous = env_var
    expanded = os.path.expanduser(os.path.expandvars(str(previous)))
    # Keep expanding while the last pass still changed something.
    while expanded != previous:
        previous = expanded
        expanded = os.path.expanduser(os.path.expandvars(str(previous)))
    return expanded

100 

101 

def run_command(command: str) -> str:
    """Run ``command`` (split with ``shlex``) and return its decoded stdout.

    :param command: command line to execute
    :return: stdout decoded with the default encoding, undecodable bytes dropped
    :raises AirflowConfigException: when the process exits with a non-zero code
    """
    argv = shlex.split(command)
    process = subprocess.Popen(argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True)
    raw_out, raw_err = process.communicate()
    output = raw_out.decode(sys.getdefaultencoding(), "ignore")
    stderr = raw_err.decode(sys.getdefaultencoding(), "ignore")

    if process.returncode == 0:
        return output

    raise AirflowConfigException(
        f"Cannot execute {command}. Error code is: {process.returncode}. "
        f"Output: {output}, Stderr: {stderr}"
    )

116 

117 

def _get_config_value_from_secret_backend(config_key: str) -> str | None:
    """Get Config option values from Secret Backend.

    :param config_key: key passed to the backend's ``get_config``
    :return: the value returned by the backend, or ``None`` when
        ``get_custom_secret_backend()`` yields no backend
    :raises AirflowConfigException: when obtaining the backend or reading the
        value raises; the original exception is attached as ``__cause__``
    """
    try:
        secrets_client = get_custom_secret_backend()
        if not secrets_client:
            return None
        return secrets_client.get_config(config_key)
    except Exception as e:
        # Chain explicitly so the traceback shows the backend failure as the direct cause.
        raise AirflowConfigException(
            "Cannot retrieve config from alternative secrets backend. "
            "Make sure it is configured properly and that the Backend "
            "is accessible.\n"
            f"{e}"
        ) from e

132 

133 

134def _default_config_file_path(file_name: str) -> str: 

135 templates_dir = os.path.join(os.path.dirname(__file__), "config_templates") 

136 return os.path.join(templates_dir, file_name) 

137 

138 

def default_config_yaml() -> dict[str, Any]:
    """
    Load the ``config.yml`` template shipped in ``config_templates``.

    :return: whatever ``yaml.safe_load`` produces for that file
    """
    config_path = _default_config_file_path("config.yml")
    with open(config_path) as config_file:
        parsed = yaml.safe_load(config_file)
    return parsed

147 

148 

# (section, option) pairs for which the parser also accepts the ``_cmd`` and
# ``_secret`` variants of the option. Listed alphabetically by section.
SENSITIVE_CONFIG_VALUES = {
    ("atlas", "password"),
    ("celery", "broker_url"),
    ("celery", "flower_basic_auth"),
    ("celery", "result_backend"),
    ("core", "fernet_key"),
    # The following options are deprecated
    ("core", "sql_alchemy_conn"),
    ("database", "sql_alchemy_conn"),
    ("smtp", "smtp_password"),
    ("webserver", "secret_key"),
}

161 

162 

163class AirflowConfigParser(ConfigParser): 

164 """Custom Airflow Configparser supporting defaults and deprecated options.""" 

165 

166 # These configuration elements can be fetched as the stdout of commands 

167 # following the "{section}__{name}_cmd" pattern; the idea behind this 

168 # is to not store passwords on boxes in text files. 

169 # These configs can also be fetched from the Secrets backend 

170 # following the "{section}__{name}_secret" pattern (single underscore before "secret"). 

171 

172 sensitive_config_values: set[tuple[str, str]] = SENSITIVE_CONFIG_VALUES 

173 

174 # A mapping of (new section, new option) -> (old section, old option, since_version). 

175 # When reading new option, the old option will be checked to see if it exists. If it does a 

176 # DeprecationWarning will be issued and the old option will be used instead 

# get() also consults the inverse of this table (inversed_deprecated_options), so a read
# that uses an old (section, option) name is redirected to the new name.
177 deprecated_options: dict[tuple[str, str], tuple[str, str, str]] = { 

178 ("celery", "worker_precheck"): ("core", "worker_precheck", "2.0.0"), 

179 ("logging", "interleave_timestamp_parser"): ("core", "interleave_timestamp_parser", "2.6.1"), 

180 ("logging", "base_log_folder"): ("core", "base_log_folder", "2.0.0"), 

181 ("logging", "remote_logging"): ("core", "remote_logging", "2.0.0"), 

182 ("logging", "remote_log_conn_id"): ("core", "remote_log_conn_id", "2.0.0"), 

183 ("logging", "remote_base_log_folder"): ("core", "remote_base_log_folder", "2.0.0"), 

184 ("logging", "encrypt_s3_logs"): ("core", "encrypt_s3_logs", "2.0.0"), 

185 ("logging", "logging_level"): ("core", "logging_level", "2.0.0"), 

186 ("logging", "fab_logging_level"): ("core", "fab_logging_level", "2.0.0"), 

187 ("logging", "logging_config_class"): ("core", "logging_config_class", "2.0.0"), 

188 ("logging", "colored_console_log"): ("core", "colored_console_log", "2.0.0"), 

189 ("logging", "colored_log_format"): ("core", "colored_log_format", "2.0.0"), 

190 ("logging", "colored_formatter_class"): ("core", "colored_formatter_class", "2.0.0"), 

191 ("logging", "log_format"): ("core", "log_format", "2.0.0"), 

192 ("logging", "simple_log_format"): ("core", "simple_log_format", "2.0.0"), 

193 ("logging", "task_log_prefix_template"): ("core", "task_log_prefix_template", "2.0.0"), 

194 ("logging", "log_filename_template"): ("core", "log_filename_template", "2.0.0"), 

195 ("logging", "log_processor_filename_template"): ("core", "log_processor_filename_template", "2.0.0"), 

196 ("logging", "dag_processor_manager_log_location"): ( 

197 "core", 

198 "dag_processor_manager_log_location", 

199 "2.0.0", 

200 ), 

201 ("logging", "task_log_reader"): ("core", "task_log_reader", "2.0.0"), 

202 ("metrics", "metrics_allow_list"): ("metrics", "statsd_allow_list", "2.6.0"), 

203 ("metrics", "metrics_block_list"): ("metrics", "statsd_block_list", "2.6.0"), 

204 ("metrics", "statsd_on"): ("scheduler", "statsd_on", "2.0.0"), 

205 ("metrics", "statsd_host"): ("scheduler", "statsd_host", "2.0.0"), 

206 ("metrics", "statsd_port"): ("scheduler", "statsd_port", "2.0.0"), 

207 ("metrics", "statsd_prefix"): ("scheduler", "statsd_prefix", "2.0.0"), 

208 ("metrics", "statsd_allow_list"): ("scheduler", "statsd_allow_list", "2.0.0"), 

209 ("metrics", "stat_name_handler"): ("scheduler", "stat_name_handler", "2.0.0"), 

210 ("metrics", "statsd_datadog_enabled"): ("scheduler", "statsd_datadog_enabled", "2.0.0"), 

211 ("metrics", "statsd_datadog_tags"): ("scheduler", "statsd_datadog_tags", "2.0.0"), 

212 ("metrics", "statsd_datadog_metrics_tags"): ("scheduler", "statsd_datadog_metrics_tags", "2.6.0"), 

213 ("metrics", "statsd_custom_client_path"): ("scheduler", "statsd_custom_client_path", "2.0.0"), 

214 ("scheduler", "parsing_processes"): ("scheduler", "max_threads", "1.10.14"), 

215 ("scheduler", "scheduler_idle_sleep_time"): ("scheduler", "processor_poll_interval", "2.2.0"), 

216 ("operators", "default_queue"): ("celery", "default_queue", "2.1.0"), 

217 ("core", "hide_sensitive_var_conn_fields"): ("admin", "hide_sensitive_variable_fields", "2.1.0"), 

218 ("core", "sensitive_var_conn_names"): ("admin", "sensitive_variable_fields", "2.1.0"), 

219 ("core", "default_pool_task_slot_count"): ("core", "non_pooled_task_slot_count", "1.10.4"), 

220 ("core", "max_active_tasks_per_dag"): ("core", "dag_concurrency", "2.2.0"), 

221 ("logging", "worker_log_server_port"): ("celery", "worker_log_server_port", "2.2.0"), 

222 ("api", "access_control_allow_origins"): ("api", "access_control_allow_origin", "2.2.0"), 

223 ("api", "auth_backends"): ("api", "auth_backend", "2.3.0"), 

224 ("database", "sql_alchemy_conn"): ("core", "sql_alchemy_conn", "2.3.0"), 

225 ("database", "sql_engine_encoding"): ("core", "sql_engine_encoding", "2.3.0"), 

226 ("database", "sql_engine_collation_for_ids"): ("core", "sql_engine_collation_for_ids", "2.3.0"), 

227 ("database", "sql_alchemy_pool_enabled"): ("core", "sql_alchemy_pool_enabled", "2.3.0"), 

228 ("database", "sql_alchemy_pool_size"): ("core", "sql_alchemy_pool_size", "2.3.0"), 

229 ("database", "sql_alchemy_max_overflow"): ("core", "sql_alchemy_max_overflow", "2.3.0"), 

230 ("database", "sql_alchemy_pool_recycle"): ("core", "sql_alchemy_pool_recycle", "2.3.0"), 

231 ("database", "sql_alchemy_pool_pre_ping"): ("core", "sql_alchemy_pool_pre_ping", "2.3.0"), 

232 ("database", "sql_alchemy_schema"): ("core", "sql_alchemy_schema", "2.3.0"), 

233 ("database", "sql_alchemy_connect_args"): ("core", "sql_alchemy_connect_args", "2.3.0"), 

234 ("database", "load_default_connections"): ("core", "load_default_connections", "2.3.0"), 

235 ("database", "max_db_retries"): ("core", "max_db_retries", "2.3.0"), 

236 ("scheduler", "parsing_cleanup_interval"): ("scheduler", "deactivate_stale_dags_interval", "2.5.0"), 

237 ("scheduler", "task_queued_timeout_check_interval"): ( 

238 "kubernetes_executor", 

239 "worker_pods_pending_timeout_check_interval", 

240 "2.6.0", 

241 ), 

242 } 

243 

244 # A mapping of new configurations to a list of old configurations for when one configuration 

245 # deprecates more than one other configuration. The deprecation logic for these configurations 

246 # is defined in SchedulerJobRunner. 

# Each list entry is (old section, old option, since_version).
247 many_to_one_deprecated_options: dict[tuple[str, str], list[tuple[str, str, str]]] = { 

248 ("scheduler", "task_queued_timeout"): [ 

249 ("celery", "stalled_task_timeout", "2.6.0"), 

250 ("celery", "task_adoption_timeout", "2.6.0"), 

251 ("kubernetes_executor", "worker_pods_pending_timeout", "2.6.0"), 

252 ] 

253 } 

254 

255 # A mapping of new section -> (old section, since_version). 

# get() uses this in both directions: reads of the old section name are redirected to the
# new one, and reads of the new section name also look under the old section name.
256 deprecated_sections: dict[str, tuple[str, str]] = {"kubernetes_executor": ("kubernetes", "2.5.0")} 

257 

258 # Now build the inverse so we can go from old_section/old_key to new_section/new_key 

259 # if someone tries to retrieve it based on old_section/old_key 

@functools.cached_property
def inversed_deprecated_options(self):
    """Map each (old section, old option) pair back to its (new section, new option) key."""
    inverse = {}
    for new_name, (old_section, old_option, _since) in self.deprecated_options.items():
        inverse[(old_section, old_option)] = new_name
    return inverse

263 

@functools.cached_property
def inversed_deprecated_sections(self):
    """Map each old section name back to the new section name that replaced it."""
    inverse = {}
    for new_section, (old_section, _since) in self.deprecated_sections.items():
        inverse[old_section] = new_section
    return inverse

269 

270 # A mapping of old default values that we want to change and warn the user 

271 # about. Mapping of section -> setting -> { old, replace, by_version } 

# validate() walks this table: when the "old" pattern is found in the current value, the value is
# rewritten with pattern.sub(replace, value), exported as an environment variable, and a
# FutureWarning naming by_version is emitted.
# The "XX-set-after-default-config-loaded-XX" placeholder below is handled in __init__, which
# swaps in the loaded default for log_filename_template or removes that entry.
272 deprecated_values: dict[str, dict[str, tuple[Pattern, str, str]]] = { 

273 "core": { 

274 "hostname_callable": (re.compile(r":"), r".", "2.1"), 

275 }, 

276 "webserver": { 

277 "navbar_color": (re.compile(r"\A#007A87\Z", re.IGNORECASE), "#fff", "2.1"), 

278 "dag_default_view": (re.compile(r"^tree$"), "grid", "3.0"), 

279 }, 

280 "email": { 

281 "email_backend": ( 

282 re.compile(r"^airflow\.contrib\.utils\.sendgrid\.send_email$"), 

283 r"airflow.providers.sendgrid.utils.emailer.send_email", 

284 "2.1", 

285 ), 

286 }, 

287 "logging": { 

288 "log_filename_template": ( 

289 re.compile(re.escape("{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log")), 

290 "XX-set-after-default-config-loaded-XX", 

291 "3.0", 

292 ), 

293 }, 

294 "api": { 

295 "auth_backends": ( 

296 re.compile(r"^airflow\.api\.auth\.backend\.deny_all$|^$"), 

297 "airflow.api.auth.backend.session", 

298 "3.0", 

299 ), 

300 }, 

301 "elasticsearch": { 

302 "log_id_template": ( 

303 re.compile("^" + re.escape("{dag_id}-{task_id}-{execution_date}-{try_number}") + "$"), 

304 "{dag_id}-{task_id}-{run_id}-{map_index}-{try_number}", 

305 "3.0", 

306 ) 

307 }, 

308 } 

309 

# Level names accepted for the logging-level options listed in enums_options below.
310 _available_logging_levels = ["CRITICAL", "FATAL", "ERROR", "WARN", "WARNING", "INFO", "DEBUG"] 

# (section, option) -> accepted values. _validate_enums() raises AirflowConfigException when an
# option that is present has a value outside its list.
311 enums_options = { 

312 ("core", "default_task_weight_rule"): sorted(WeightRule.all_weight_rules()), 

313 ("core", "dag_ignore_file_syntax"): ["regexp", "glob"], 

314 ("core", "mp_start_method"): multiprocessing.get_all_start_methods(), 

315 ("scheduler", "file_parsing_sort_mode"): ["modified_time", "random_seeded_by_host", "alphabetical"], 

316 ("logging", "logging_level"): _available_logging_levels, 

317 ("logging", "fab_logging_level"): _available_logging_levels, 

318 # celery_logging_level can be empty, which uses logging_level as fallback 

319 ("logging", "celery_logging_level"): _available_logging_levels + [""], 

320 ("webserver", "analytical_tool"): ["google_analytics", "metarouter", "segment", ""], 

321 } 

322 

# Declared here without a value; __init__ assigns an empty dict per instance and the
# validate/upgrade steps record the pre-upgrade value of each option they rewrite.
323 upgraded_values: dict[tuple[str, str], str] 

324 """Mapping of (section,option) to the old value that was upgraded""" 

325 

326 # This method transforms option names on every read, get, or set operation. 

327 # This changes from the default behaviour of ConfigParser from lower-casing 

328 # to instead be case-preserving 

def optionxform(self, optionstr: str) -> str:
    """Return ``optionstr`` unchanged, so option names keep the case they were written in."""
    return optionstr

331 

def __init__(self, default_config: str | None = None, *args, **kwargs):
    """Create the parser plus a second ``ConfigParser`` that holds Airflow's defaults.

    :param default_config: optional configuration text loaded into ``airflow_defaults``
    :param args: positional arguments forwarded to ``ConfigParser`` for both parsers
    :param kwargs: keyword arguments forwarded to ``ConfigParser`` for both parsers
    """
    super().__init__(*args, **kwargs)
    self.upgraded_values = {}

    # ``deprecated_values`` is declared on the class, so editing it in place would leak into
    # every other parser instance (and a parser created without defaults would delete the
    # ``log_filename_template`` entry that a later parser with defaults needs). Work on a
    # per-instance copy of both dict levels instead; the tuples themselves are immutable.
    self.deprecated_values = {
        section: dict(options) for section, options in self.deprecated_values.items()
    }

    self.airflow_defaults = ConfigParser(*args, **kwargs)
    default = None
    if default_config is not None:
        self.airflow_defaults.read_string(default_config)
        # Set the upgrade value based on the current loaded default
        default = self.airflow_defaults.get("logging", "log_filename_template", fallback=None)

    logging_values = self.deprecated_values.get("logging", {})
    if default and "log_filename_template" in logging_values:
        # Tuples are immutable: rebuild the entry with the loaded default as the replacement.
        pattern, _placeholder, version = logging_values["log_filename_template"]
        logging_values["log_filename_template"] = (pattern, default, version)
    else:
        # No default available (in case of tests it might not exist): drop the placeholder entry.
        logging_values.pop("log_filename_template", None)

    self.is_validated = False
    self._suppress_future_warnings = False

358 

def validate(self):
    """Run the validation and upgrade steps, then mark the parser as validated.

    For every ``deprecated_values`` entry whose old pattern is found in the
    current value, the old value is remembered in ``upgraded_values``, the
    rewritten value is exported as an environment variable and a future
    warning is issued.
    """
    self._validate_sqlite3_version()
    self._validate_enums()

    for section, options in self.deprecated_values.items():
        for name, (old, new, version) in options.items():
            current_value = self.get(section, name, fallback="")
            if not self._using_old_value(old, current_value):
                continue
            self.upgraded_values[(section, name)] = current_value
            new_value = old.sub(new, current_value)
            self._update_env_var(section=section, name=name, new_value=new_value)
            self._create_future_warning(
                name=name,
                section=section,
                current_value=current_value,
                new_value=new_value,
                version=version,
            )

    self._upgrade_auth_backends()
    self._upgrade_postgres_metastore_conn()
    self.is_validated = True

382 

383 def _upgrade_auth_backends(self): 

384 """ 

385 Ensure a custom auth_backends setting contains session. 

386 

387 This is required by the UI for ajax queries. 

388 """ 

389 old_value = self.get("api", "auth_backends", fallback="") 

390 if old_value in ("airflow.api.auth.backend.default", ""): 

391 # handled by deprecated_values 

392 pass 

393 elif old_value.find("airflow.api.auth.backend.session") == -1: 

394 new_value = old_value + ",airflow.api.auth.backend.session" 

395 self._update_env_var(section="api", name="auth_backends", new_value=new_value) 

396 self.upgraded_values[("api", "auth_backends")] = old_value 

397 

398 # if the old value is set via env var, we need to wipe it 

399 # otherwise, it'll "win" over our adjusted value 

400 old_env_var = self._env_var_name("api", "auth_backend") 

401 os.environ.pop(old_env_var, None) 

402 

403 warnings.warn( 

404 "The auth_backends setting in [api] has had airflow.api.auth.backend.session added " 

405 "in the running config, which is needed by the UI. Please update your config before " 

406 "Apache Airflow 3.0.", 

407 FutureWarning, 

408 ) 

409 

410 def _upgrade_postgres_metastore_conn(self): 

411 """ 

412 Upgrade SQL schemas. 

413 

414 As of SQLAlchemy 1.4, schemes `postgres+psycopg2` and `postgres` 

415 must be replaced with `postgresql`. 

416 """ 

417 section, key = "database", "sql_alchemy_conn" 

418 old_value = self.get(section, key, _extra_stacklevel=1) 

419 bad_schemes = ["postgres+psycopg2", "postgres"] 

420 good_scheme = "postgresql" 

421 parsed = urlsplit(old_value) 

422 if parsed.scheme in bad_schemes: 

423 warnings.warn( 

424 f"Bad scheme in Airflow configuration core > sql_alchemy_conn: `{parsed.scheme}`. " 

425 "As of SQLAlchemy 1.4 (adopted in Airflow 2.3) this is no longer supported. You must " 

426 f"change to `{good_scheme}` before the next Airflow release.", 

427 FutureWarning, 

428 ) 

429 self.upgraded_values[(section, key)] = old_value 

430 new_value = re.sub("^" + re.escape(f"{parsed.scheme}://"), f"{good_scheme}://", old_value) 

431 self._update_env_var(section=section, name=key, new_value=new_value) 

432 

433 # if the old value is set via env var, we need to wipe it 

434 # otherwise, it'll "win" over our adjusted value 

435 old_env_var = self._env_var_name("core", key) 

436 os.environ.pop(old_env_var, None) 

437 

438 def _validate_enums(self): 

439 """Validate that enum type config has an accepted value.""" 

440 for (section_key, option_key), enum_options in self.enums_options.items(): 

441 if self.has_option(section_key, option_key): 

442 value = self.get(section_key, option_key) 

443 if value not in enum_options: 

444 raise AirflowConfigException( 

445 f"`[{section_key}] {option_key}` should not be " 

446 f"{value!r}. Possible values: {', '.join(enum_options)}." 

447 ) 

448 

449 def _validate_sqlite3_version(self): 

450 """Validate SQLite version. 

451 

452 Some features in storing rendered fields require SQLite >= 3.15.0. 

453 """ 

454 if "sqlite" not in self.get("database", "sql_alchemy_conn"): 

455 return 

456 

457 import sqlite3 

458 

459 min_sqlite_version = (3, 15, 0) 

460 if _parse_sqlite_version(sqlite3.sqlite_version) >= min_sqlite_version: 

461 return 

462 

463 from airflow.utils.docs import get_docs_url 

464 

465 min_sqlite_version_str = ".".join(str(s) for s in min_sqlite_version) 

466 raise AirflowConfigException( 

467 f"error: SQLite C library too old (< {min_sqlite_version_str}). " 

468 f"See {get_docs_url('howto/set-up-database.html#setting-up-a-sqlite-database')}" 

469 ) 

470 

471 def _using_old_value(self, old: Pattern, current_value: str) -> bool: 

472 return old.search(current_value) is not None 

473 

474 def _update_env_var(self, section: str, name: str, new_value: str): 

475 env_var = self._env_var_name(section, name) 

476 # Set it as an env var so that any subprocesses keep the same override! 

477 os.environ[env_var] = new_value 

478 

479 @staticmethod 

480 def _create_future_warning(name: str, section: str, current_value: Any, new_value: Any, version: str): 

481 warnings.warn( 

482 f"The {name!r} setting in [{section}] has the old default value of {current_value!r}. " 

483 f"This value has been changed to {new_value!r} in the running config, but " 

484 f"please update your config before Apache Airflow {version}.", 

485 FutureWarning, 

486 ) 

487 

def _env_var_name(self, section: str, key: str) -> str:
    """Build ``ENV_VAR_PREFIX`` + upper-cased section + ``__`` + upper-cased key."""
    return ENV_VAR_PREFIX + "__".join((section.upper(), key.upper()))

490 

491 def _get_env_var_option(self, section: str, key: str): 

492 # must have format AIRFLOW__{SECTION}__{KEY} (note double underscore) 

493 env_var = self._env_var_name(section, key) 

494 if env_var in os.environ: 

495 return expand_env_var(os.environ[env_var]) 

496 # alternatively AIRFLOW__{SECTION}__{KEY}_CMD (for a command) 

497 env_var_cmd = env_var + "_CMD" 

498 if env_var_cmd in os.environ: 

499 # if this is a valid command key... 

500 if (section, key) in self.sensitive_config_values: 

501 return run_command(os.environ[env_var_cmd]) 

502 # alternatively AIRFLOW__{SECTION}__{KEY}_SECRET (to get from Secrets Backend) 

503 env_var_secret_path = env_var + "_SECRET" 

504 if env_var_secret_path in os.environ: 

505 # if this is a valid secret path... 

506 if (section, key) in self.sensitive_config_values: 

507 return _get_config_value_from_secret_backend(os.environ[env_var_secret_path]) 

508 return None 

509 

510 def _get_cmd_option(self, section: str, key: str): 

511 fallback_key = key + "_cmd" 

512 if (section, key) in self.sensitive_config_values: 

513 if super().has_option(section, fallback_key): 

514 command = super().get(section, fallback_key) 

515 return run_command(command) 

516 return None 

517 

518 def _get_cmd_option_from_config_sources( 

519 self, config_sources: ConfigSourcesType, section: str, key: str 

520 ) -> str | None: 

521 fallback_key = key + "_cmd" 

522 if (section, key) in self.sensitive_config_values: 

523 section_dict = config_sources.get(section) 

524 if section_dict is not None: 

525 command_value = section_dict.get(fallback_key) 

526 if command_value is not None: 

527 if isinstance(command_value, str): 

528 command = command_value 

529 else: 

530 command = command_value[0] 

531 return run_command(command) 

532 return None 

533 

534 def _get_secret_option(self, section: str, key: str) -> str | None: 

535 """Get Config option values from Secret Backend.""" 

536 fallback_key = key + "_secret" 

537 if (section, key) in self.sensitive_config_values: 

538 if super().has_option(section, fallback_key): 

539 secrets_path = super().get(section, fallback_key) 

540 return _get_config_value_from_secret_backend(secrets_path) 

541 return None 

542 

543 def _get_secret_option_from_config_sources( 

544 self, config_sources: ConfigSourcesType, section: str, key: str 

545 ) -> str | None: 

546 fallback_key = key + "_secret" 

547 if (section, key) in self.sensitive_config_values: 

548 section_dict = config_sources.get(section) 

549 if section_dict is not None: 

550 secrets_path_value = section_dict.get(fallback_key) 

551 if secrets_path_value is not None: 

552 if isinstance(secrets_path_value, str): 

553 secrets_path = secrets_path_value 

554 else: 

555 secrets_path = secrets_path_value[0] 

556 return _get_config_value_from_secret_backend(secrets_path) 

557 return None 

558 

def get_mandatory_value(self, section: str, key: str, **kwargs) -> str:
    """Return the option's value, refusing ``None``.

    :raises ValueError: when ``get`` returns ``None`` for the option
    """
    value = self.get(section, key, _extra_stacklevel=1, **kwargs)
    if value is not None:
        return value
    raise ValueError(f"The value {section}/{key} should be set!")

564 

@overload  # type: ignore[override]
def get(self, section: str, key: str, fallback: str = ..., **kwargs) -> str:  # type: ignore[override]
    ...


@overload  # type: ignore[override]
def get(self, section: str, key: str, **kwargs) -> str | None:  # type: ignore[override]
    ...


def get(  # type: ignore[override, misc]
    self,
    section: str,
    key: str,
    _extra_stacklevel: int = 0,
    **kwargs,
) -> str | None:
    """Look up an option, honouring renamed sections and options.

    ``section`` and ``key`` are lower-cased first. Sources are tried in this
    order and the first non-``None`` result wins: environment variables, the
    config file, commands, secret backends, and finally ``airflow_defaults``
    (also used whenever ``fallback`` is passed in ``kwargs``).

    :param section: section name (old or new)
    :param key: option name (old or new)
    :param _extra_stacklevel: added to the stack level of emitted warnings
    :param kwargs: forwarded to the underlying ``ConfigParser.get`` calls
    :raises AirflowConfigException: when no source provides the option
    """
    section = str(section).lower()
    key = str(key).lower()
    warning_emitted = False
    deprecated_section: str | None
    deprecated_key: str | None

    # For when we rename whole sections
    if section in self.inversed_deprecated_sections:
        deprecated_section, deprecated_key = (section, key)
        section = self.inversed_deprecated_sections[section]
        if not self._suppress_future_warnings:
            warnings.warn(
                f"The config section [{deprecated_section}] has been renamed to "
                f"[{section}]. Please update your `conf.get*` call to use the new name",
                FutureWarning,
                stacklevel=2 + _extra_stacklevel,
            )
        # Don't warn about individual rename if the whole section is renamed
        warning_emitted = True
    elif (section, key) in self.inversed_deprecated_options:
        # Handle using deprecated section/key instead of the new section/key
        new_section, new_key = self.inversed_deprecated_options[(section, key)]
        # No warning can have been emitted yet in this branch, so only the suppression flag matters.
        if not self._suppress_future_warnings:
            warnings.warn(
                f"section/key [{section}/{key}] has been deprecated, you should use "
                f"[{new_section}/{new_key}] instead. Please update your `conf.get*` call to use the "
                "new name",
                FutureWarning,
                stacklevel=2 + _extra_stacklevel,
            )
            warning_emitted = True
        deprecated_section, deprecated_key = section, key
        section, key = (new_section, new_key)
    elif section in self.deprecated_sections:
        # When accessing the new section name, make sure we check under the old config name
        deprecated_key = key
        deprecated_section = self.deprecated_sections[section][0]
    else:
        deprecated_section, deprecated_key, _ = self.deprecated_options.get(
            (section, key), (None, None, None)
        )

    # first check environment variables
    option = self._get_environment_variables(
        deprecated_key,
        deprecated_section,
        key,
        section,
        issue_warning=not warning_emitted,
        extra_stacklevel=_extra_stacklevel,
    )
    if option is not None:
        return option

    # ...then the config file
    option = self._get_option_from_config_file(
        deprecated_key,
        deprecated_section,
        key,
        kwargs,
        section,
        issue_warning=not warning_emitted,
        extra_stacklevel=_extra_stacklevel,
    )
    if option is not None:
        return option

    # ...then commands
    option = self._get_option_from_commands(
        deprecated_key,
        deprecated_section,
        key,
        section,
        issue_warning=not warning_emitted,
        extra_stacklevel=_extra_stacklevel,
    )
    if option is not None:
        return option

    # ...then from secret backends
    option = self._get_option_from_secrets(
        deprecated_key,
        deprecated_section,
        key,
        section,
        issue_warning=not warning_emitted,
        extra_stacklevel=_extra_stacklevel,
    )
    if option is not None:
        return option

    # ...then the default config
    if self.airflow_defaults.has_option(section, key) or "fallback" in kwargs:
        return expand_env_var(self.airflow_defaults.get(section, key, **kwargs))

    log.warning("section/key [%s/%s] not found in config", section, key)

    raise AirflowConfigException(f"section/key [{section}/{key}] not found in config")

678 

679 def _get_option_from_secrets( 

680 self, 

681 deprecated_key: str | None, 

682 deprecated_section: str | None, 

683 key: str, 

684 section: str, 

685 issue_warning: bool = True, 

686 extra_stacklevel: int = 0, 

687 ) -> str | None: 

688 option = self._get_secret_option(section, key) 

689 if option: 

690 return option 

691 if deprecated_section and deprecated_key: 

692 with self.suppress_future_warnings(): 

693 option = self._get_secret_option(deprecated_section, deprecated_key) 

694 if option: 

695 if issue_warning: 

696 self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel) 

697 return option 

698 return None 

699 

700 def _get_option_from_commands( 

701 self, 

702 deprecated_key: str | None, 

703 deprecated_section: str | None, 

704 key: str, 

705 section: str, 

706 issue_warning: bool = True, 

707 extra_stacklevel: int = 0, 

708 ) -> str | None: 

709 option = self._get_cmd_option(section, key) 

710 if option: 

711 return option 

712 if deprecated_section and deprecated_key: 

713 with self.suppress_future_warnings(): 

714 option = self._get_cmd_option(deprecated_section, deprecated_key) 

715 if option: 

716 if issue_warning: 

717 self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel) 

718 return option 

719 return None 

720 

def _get_option_from_config_file(
    self,
    deprecated_key: str | None,
    deprecated_section: str | None,
    key: str,
    kwargs: dict[str, Any],
    section: str,
    issue_warning: bool = True,
    extra_stacklevel: int = 0,
) -> str | None:
    """Read an option from the parsed config, trying the current name, then the deprecated one.

    :param kwargs: extra keyword arguments forwarded to the parent ``get``
    :return: the value with environment variables expanded, or ``None``
        when neither name is set
    """
    if super().has_option(section, key):
        # Go through the parent class so that only the actual config is
        # consulted, not the default config.
        return expand_env_var(super().get(section, key, **kwargs))

    has_deprecated_name = deprecated_section and deprecated_key
    if has_deprecated_name and super().has_option(deprecated_section, deprecated_key):
        if issue_warning:
            self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel)
        with self.suppress_future_warnings():
            return expand_env_var(super().get(deprecated_section, deprecated_key, **kwargs))

    return None

742 

def _get_environment_variables(
    self,
    deprecated_key: str | None,
    deprecated_section: str | None,
    key: str,
    section: str,
    issue_warning: bool = True,
    extra_stacklevel: int = 0,
) -> str | None:
    """
    Look an option up through ``_get_env_var_option``.

    The current ``section``/``key`` is tried first, then the deprecated
    location when one is given.

    :param deprecated_key: old option name, or None when there is none
    :param deprecated_section: old section name, or None when there is none
    :param key: current option name
    :param section: current section name
    :param issue_warning: whether to emit a deprecation warning when the
        value comes from the deprecated location
    :param extra_stacklevel: forwarded to ``_warn_deprecate``
    :return: the value found, or None when neither location yields one
    """
    value = self._get_env_var_option(section, key)
    if value is not None:
        return value

    if not (deprecated_section and deprecated_key):
        return None

    with self.suppress_future_warnings():
        value = self._get_env_var_option(deprecated_section, deprecated_key)
    if value is None:
        return None

    if issue_warning:
        self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel)
    return value

763 

def getboolean(self, section: str, key: str, **kwargs) -> bool:  # type: ignore[override]
    """
    Return the option as a bool.

    The value is stringified, lower-cased and stripped; anything after a
    ``#`` is discarded. ``t``/``true``/``1`` map to True and
    ``f``/``false``/``0`` map to False.

    :raises AirflowConfigException: if the value is none of the above
    """
    raw_value = self.get(section, key, _extra_stacklevel=1, **kwargs)
    val = str(raw_value).lower().strip()
    # Keep only the part before a trailing inline comment, if there is one.
    val = val.partition("#")[0].strip()

    truthy = ("t", "true", "1")
    falsy = ("f", "false", "0")
    if val in truthy:
        return True
    if val in falsy:
        return False
    raise AirflowConfigException(
        f'Failed to convert value to bool. Please check "{key}" key in "{section}" section. '
        f'Current value: "{val}".'
    )

777 

def getint(self, section: str, key: str, **kwargs) -> int:  # type: ignore[override]
    """
    Return the option converted with ``int()``.

    :raises AirflowConfigException: if the value is None or ``int()``
        rejects it with a ValueError
    """
    raw_value = self.get(section, key, _extra_stacklevel=1, **kwargs)
    if raw_value is None:
        raise AirflowConfigException(
            f"Failed to convert value None to int. "
            f'Please check "{key}" key in "{section}" section is set.'
        )
    try:
        converted = int(raw_value)
    except ValueError:
        raise AirflowConfigException(
            f'Failed to convert value to int. Please check "{key}" key in "{section}" section. '
            f'Current value: "{raw_value}".'
        )
    return converted

792 

def getfloat(self, section: str, key: str, **kwargs) -> float:  # type: ignore[override]
    """
    Return the option converted with ``float()``.

    :raises AirflowConfigException: if the value is None or ``float()``
        rejects it with a ValueError
    """
    raw_value = self.get(section, key, _extra_stacklevel=1, **kwargs)
    if raw_value is None:
        raise AirflowConfigException(
            f"Failed to convert value None to float. "
            f'Please check "{key}" key in "{section}" section is set.'
        )
    try:
        converted = float(raw_value)
    except ValueError:
        raise AirflowConfigException(
            f'Failed to convert value to float. Please check "{key}" key in "{section}" section. '
            f'Current value: "{raw_value}".'
        )
    return converted

807 

def getimport(self, section: str, key: str, **kwargs) -> Any:
    """
    Reads options, imports the full qualified name, and returns the object.

    In case of failure, it throws an exception with the key and section names

    :param section: section from the config
    :param key: the key defined in the given section
    :param kwargs: extra keyword arguments forwarded to ``get``
    :raises AirflowConfigException: if importing the configured name raises
        ImportError
    :return: The object or None, if the option is empty
    """
    # Read through this parser instance rather than the module-level ``conf``
    # so the method also works on other instances and before ``conf`` exists.
    full_qualified_path = self.get(section=section, key=key, **kwargs)
    if not full_qualified_path:
        return None

    try:
        return import_string(full_qualified_path)
    except ImportError as e:
        log.error(e)
        raise AirflowConfigException(
            f'The object could not be loaded. Please check "{key}" key in "{section}" section. '
            f'Current value: "{full_qualified_path}".'
        )

828 

def getjson(
    self, section: str, key: str, fallback=_UNSET, **kwargs
) -> dict | list | str | int | float | None:
    """
    Return a config value parsed from a JSON string.

    ``fallback`` is *not* JSON parsed but used verbatim when no config value is given.

    :raises AirflowConfigException: if the stored value is not valid JSON
    """
    # ``get`` would hand the fallback back as a string, so keep whatever
    # object the caller supplied on the side and never pass it down.
    default = fallback

    try:
        data = self.get(section=section, key=key, fallback=_UNSET, _extra_stacklevel=1, **kwargs)
    except (NoSectionError, NoOptionError):
        return default

    if data:
        try:
            return json.loads(data)
        except JSONDecodeError as e:
            raise AirflowConfigException(f"Unable to parse [{section}] {key!r} as valid json") from e

    # Empty value: use the caller's fallback if one was given, else None.
    return None if default is _UNSET else default

856 

def gettimedelta(
    self, section: str, key: str, fallback: Any = None, **kwargs
) -> datetime.timedelta | None:
    """
    Read an option and turn it into a ``datetime.timedelta`` measured in seconds.

    A missing or empty value yields ``fallback`` unchanged.

    :param section: the section from the config
    :param key: the key defined in the given section
    :param fallback: returned as-is when no config value is given, defaults to None
    :raises AirflowConfigException: if the value is not an integer, or is too
        large for ``datetime.timedelta``
    :return: datetime.timedelta(seconds=<config_value>) or ``fallback``
    """
    val = self.get(section, key, fallback=fallback, _extra_stacklevel=1, **kwargs)
    if not val:
        return fallback

    # The configured value must be convertible to an integer.
    try:
        seconds = int(val)
    except ValueError:
        raise AirflowConfigException(
            f'Failed to convert value to int. Please check "{key}" key in "{section}" section. '
            f'Current value: "{val}".'
        )

    try:
        return datetime.timedelta(seconds=seconds)
    except OverflowError as err:
        raise AirflowConfigException(
            f"Failed to convert value to timedelta in `seconds`. "
            f"{err}. "
            f'Please check "{key}" key in "{section}" section. Current value: "{val}".'
        )

893 

def read(
    self,
    filenames: (str | bytes | os.PathLike | Iterable[str | bytes | os.PathLike]),
    encoding=None,
):
    """
    Read and parse a filename or an iterable of filenames.

    Delegates to the parent ``read`` and passes its result back, so callers
    can see which files were parsed successfully.

    :param filenames: a single path or an iterable of paths
    :param encoding: encoding forwarded to the parent ``read``
    :return: whatever the parent ``read`` returns (for ``ConfigParser`` the
        list of successfully parsed filenames)
    """
    # The override used to drop this value; return it to keep the parent's contract.
    return super().read(filenames=filenames, encoding=encoding)

900 

901 # The RawConfigParser defines "Mapping" from abc.collections is not subscriptable - so we have 

902 # to use Dict here. 

def read_dict(  # type: ignore[override]
    self, dictionary: dict[str, dict[str, Any]], source: str = "<dict>"
):
    """
    Load configuration from a dictionary by delegating to the parent class.

    :param dictionary: mapping of section name to a mapping of option values
    :param source: label for the origin of the data, forwarded to the parent
    """
    super().read_dict(dictionary, source)

907 

def has_option(self, section: str, option: str) -> bool:
    """
    Tell whether ``self.get`` can resolve the given option.

    ``self.get`` is used so the lookup priority order (env, config, cmd,
    defaults) is not reimplemented here.

    :return: True if the lookup succeeds, False if it raises NoOptionError
        or NoSectionError
    """
    try:
        # fallback=_UNSET avoids logging a warning about missing values.
        self.get(section, option, fallback=_UNSET, _extra_stacklevel=1)
    except (NoOptionError, NoSectionError):
        return False
    else:
        return True

917 

def remove_option(self, section: str, option: str, remove_default: bool = True):
    """
    Drop an option from the file-backed config and, optionally, the defaults.

    The option is removed from the parsed config if present there. It is
    also removed from ``airflow_defaults`` if present there, unless
    ``remove_default`` is False.

    :param section: section holding the option
    :param option: option name to remove
    :param remove_default: when False, leave ``airflow_defaults`` untouched
    """
    parent = super()
    if parent.has_option(section, option):
        parent.remove_option(section, option)

    defaults = self.airflow_defaults
    if defaults.has_option(section, option):
        if remove_default:
            defaults.remove_option(section, option)

930 

def getsection(self, section: str) -> ConfigOptionsDictType | None:
    """
    Returns the section as a dict.

    Values from ``airflow_defaults`` are overlaid with values from this
    parser, then with values from environment variables whose name starts
    with the section's env-var prefix.

    Values are converted to int, float, bool as required.

    :param section: section from the config
    :raises AirflowConfigException: if any collected value is None
    :return: the merged section, or None when neither this parser nor the
        defaults know the section
    """
    if not self.has_section(section) and not self.airflow_defaults.has_section(section):
        return None
    if self.airflow_defaults.has_section(section):
        _section: ConfigOptionsDictType = OrderedDict(self.airflow_defaults.items(section))
    else:
        _section = OrderedDict()

    if self.has_section(section):
        _section.update(OrderedDict(self.items(section)))

    section_prefix = self._env_var_name(section, "")
    prefix_length = len(section_prefix)
    for env_var in sorted(os.environ.keys()):
        if env_var.startswith(section_prefix):
            # Strip only the leading prefix. ``str.replace`` would also remove
            # later occurrences of the prefix inside the option name and
            # produce a key that does not match the variable.
            key = env_var[prefix_length:]
            if key.endswith("_CMD"):
                key = key[:-4]
            key = key.lower()
            _section[key] = self._get_env_var_option(section, key)

    # Only existing keys are reassigned below, so mutating during iteration is safe.
    for key, val in _section.items():
        if val is None:
            raise AirflowConfigException(
                f"Failed to convert value automatically. "
                f'Please check "{key}" key in "{section}" section is set.'
            )
        try:
            _section[key] = int(val)
        except ValueError:
            try:
                _section[key] = float(val)
            except ValueError:
                if isinstance(val, str) and val.lower() in ("t", "true"):
                    _section[key] = True
                elif isinstance(val, str) and val.lower() in ("f", "false"):
                    _section[key] = False
    return _section

975 

def write(  # type: ignore[override]
    self, fp: IO, space_around_delimiters: bool = True, section: str | None = None
) -> None:
    """
    Write the configuration to ``fp``.

    Based on ``configparser.RawConfigParser.write``, but each section's
    content is taken from ``getsection`` so that options coming from
    environment variables are included.

    :param fp: file-like object handed to ``_write_section``
    :param space_around_delimiters: surround the delimiter with spaces
    :param section: when given, write only this section; otherwise write
        every section of the parser
    """
    # Various type ignores below deal with less-than-perfect RawConfigParser superclass typing
    if space_around_delimiters:
        delimiter = " {} ".format(self._delimiters[0])  # type: ignore[attr-defined]
    else:
        delimiter = self._delimiters[0]  # type: ignore[attr-defined]

    if self._defaults:  # type: ignore
        default_items = self._defaults.items()  # type: ignore[attr-defined]
        self._write_section(fp, self.default_section, default_items, delimiter)  # type: ignore[attr-defined]

    if section:
        sections = {section: dict(self.getsection(section))}  # type: ignore[arg-type]
    else:
        sections = self._sections  # type: ignore[attr-defined]

    for name in sections:
        options: ConfigOptionsDictType = self.getsection(name)  # type: ignore[assignment]
        self._write_section(fp, name, options.items(), delimiter)  # type: ignore[attr-defined]

998 

def as_dict(
    self,
    display_source: bool = False,
    display_sensitive: bool = False,
    raw: bool = False,
    include_env: bool = True,
    include_cmds: bool = True,
    include_secret: bool = True,
) -> ConfigSourcesType:
    """
    Materialize the current configuration as a dict of per-section dicts.

    Airflow defaults are materialized together with user-set configs. For
    each of the ``include_*`` flags that is False, the matching step is
    replaced by a ``_filter_by_source`` pass, which drops bare sensitive
    options still at their default value so that they do not shadow a
    user-set ``_cmd`` / ``_secret`` counterpart.

    :param display_source: If False, the option value is returned. If True,
        a tuple of (option_value, source) is returned.
    :param display_sensitive: If True, the values of sensitive options are
        shown. If False, they are replaced by '< hidden >'.
    :param raw: Should the values be output as interpolated values, or the
        "raw" form that can be fed back in to ConfigParser
    :param include_env: Should the value of configuration from AIRFLOW__
        environment variables be included or not
    :param include_cmds: Should the result of calling any *_cmd config be
        set (True, default), or should the _cmd options be left as the
        command to run (False)
    :param include_secret: Should the result of calling any *_secret config be
        set (True, default), or should the _secret options be left as the
        path to get the secret from (False)
    :raises ValueError: if ``display_sensitive`` is False while any of the
        ``include_*`` flags is False
    :return: Dictionary, where the key is the name of the section and the content is
        the dictionary with the name of the parameter and its value.
    """
    # Sensitive values are hidden inside the include_* steps (values coming
    # from cmds/secrets can be read in _include_envs), so hiding only works
    # when all of those steps run.
    everything_included = include_env and include_cmds and include_secret
    if not display_sensitive and not everything_included:
        raise ValueError(
            "If display_sensitive is false, then include_env, "
            "include_cmds, include_secret must all be set as True"
        )

    config_sources: ConfigSourcesType = {}
    configs = [
        ("default", self.airflow_defaults),
        ("airflow.cfg", self),
    ]

    self._replace_config_with_display_sources(
        config_sources,
        configs,
        display_source,
        raw,
        self.deprecated_options,
        include_cmds=include_cmds,
        include_env=include_env,
        include_secret=include_secret,
    )

    # Env vars are added next and overwrite because they have priority.
    if include_env:
        self._include_envs(config_sources, display_sensitive, display_source, raw)
    else:
        self._filter_by_source(config_sources, display_source, self._get_env_var_option)

    # Then bash commands.
    if include_cmds:
        self._include_commands(config_sources, display_sensitive, display_source, raw)
    else:
        self._filter_by_source(config_sources, display_source, self._get_cmd_option)

    # Then config from secret backends.
    if include_secret:
        self._include_secrets(config_sources, display_sensitive, display_source, raw)
    else:
        self._filter_by_source(config_sources, display_source, self._get_secret_option)

    if display_sensitive:
        return config_sources

    # Also hide sensitive values that came from the config file rather
    # than from env, cmd or secret.
    hidden = "< hidden >"
    for section, key in self.sensitive_config_values:
        section_values = config_sources.get(section)
        if not section_values or not section_values.get(key, None):
            continue
        if display_source:
            section_values[key] = (hidden, section_values[key][1])
        else:
            section_values[key] = hidden

    return config_sources

1098 

def _include_secrets(
    self,
    config_sources: ConfigSourcesType,
    display_sensitive: bool,
    display_source: bool,
    raw: bool,
):
    """
    Replace ``<key>_secret`` entries of sensitive options with the resolved value.

    For every sensitive option for which
    ``_get_secret_option_from_config_sources`` yields a truthy value, the
    value is stored under the bare key and the ``<key>_secret`` entry is
    deleted from ``config_sources``.

    :param config_sources: the configuration being built; modified in place
    :param display_sensitive: when False, store '< hidden >' instead of the value
    :param display_source: when True, store a ``(value, "secret")`` tuple
    :param raw: when True (and ``display_source`` is False), double every ``%``
    """
    for section, key in self.sensitive_config_values:
        value: str | None = self._get_secret_option_from_config_sources(config_sources, section, key)
        if not value:
            continue

        shown = value if display_sensitive else "< hidden >"
        opt: str | tuple[str, str]
        if display_source:
            opt = (shown, "secret")
        elif raw:
            opt = shown.replace("%", "%%")
        else:
            opt = shown

        section_values = config_sources.setdefault(section, OrderedDict())
        section_values.update({key: opt})
        del config_sources[section][key + "_secret"]

1119 

def _include_commands(
    self,
    config_sources: ConfigSourcesType,
    display_sensitive: bool,
    display_source: bool,
    raw: bool,
):
    """
    Replace ``<key>_cmd`` entries of sensitive options with the resolved value.

    For every sensitive option for which
    ``_get_cmd_option_from_config_sources`` yields a truthy value, the value
    is stored under the bare key and the ``<key>_cmd`` entry is deleted
    from ``config_sources``.

    :param config_sources: the configuration being built; modified in place
    :param display_sensitive: when False, store '< hidden >' instead of the value
    :param display_source: when True, store a ``(value, "cmd")`` tuple
    :param raw: when True (and ``display_source`` is False), double every ``%``
    """
    for section, key in self.sensitive_config_values:
        opt = self._get_cmd_option_from_config_sources(config_sources, section, key)
        if not opt:
            continue

        # ``opt`` is truthy here, so the stored value can never be None.
        opt_to_set: str | tuple[str, str] = opt if display_sensitive else "< hidden >"
        if display_source:
            opt_to_set = (str(opt_to_set), "cmd")
        elif raw:
            opt_to_set = str(opt_to_set).replace("%", "%%")

        section_values = config_sources.setdefault(section, OrderedDict())
        section_values.update({key: opt_to_set})
        del config_sources[section][key + "_cmd"]

1142 

def _include_envs(
    self,
    config_sources: ConfigSourcesType,
    display_sensitive: bool,
    display_source: bool,
    raw: bool,
):
    """
    Overlay options set through ``ENV_VAR_PREFIX`` environment variables.

    Each matching variable name is split on ``__`` into section and key and
    resolved with ``_get_env_var_option``. Names that do not split into
    three parts are skipped silently; names resolving to None are skipped
    with a warning.

    :param config_sources: the configuration being built; modified in place
    :param display_sensitive: when False, sensitive values are replaced by
        '< hidden >' (except for the unit_test_mode variable)
    :param display_source: when True, store a ``(value, "env var")`` tuple
    :param raw: when True, double every ``%``; only applied in the branch
        where hiding is not attempted
    """
    # Snapshot the matching names before processing them.
    prefixed_names = [name for name in os.environ if name.startswith(ENV_VAR_PREFIX)]
    for env_var in prefixed_names:
        try:
            _, section, key = env_var.split("__", 2)
            opt = self._get_env_var_option(section, key)
        except ValueError:
            continue
        if opt is None:
            log.warning("Ignoring unknown env var '%s'", env_var)
            continue

        may_hide = not display_sensitive and env_var != self._env_var_name("core", "unit_test_mode")
        if may_hide:
            # Don't hide cmd/secret values here
            is_cmd_or_secret = env_var.lower().endswith("cmd") or env_var.lower().endswith("secret")
            # NOTE(review): section/key still have the env var's casing at
            # this point - confirm this matches sensitive_config_values.
            if not is_cmd_or_secret and (section, key) in self.sensitive_config_values:
                opt = "< hidden >"
        elif raw:
            opt = opt.replace("%", "%%")

        if display_source:
            opt = (opt, "env var")

        section = section.lower()
        # Keys of the kubernetes_environment_variables section keep their
        # case: lowering them would make it impossible to set Airflow
        # environment variables there, since Airflow only parses
        # environment variables starting with AIRFLOW_.
        if section != "kubernetes_environment_variables":
            key = key.lower()
        config_sources.setdefault(section, OrderedDict()).update({key: opt})

1179 

def _filter_by_source(
    self,
    config_sources: ConfigSourcesType,
    display_source: bool,
    getter_func,
):
    """
    Drop bare sensitive options that still equal their Airflow default.

    Bare configs take precedence over their command or secret key
    equivalents. If the materialized configuration carried the Airflow
    default for a bare option, it would override a user-set command or
    secret key config, so such defaults are removed here.

    :param config_sources: The current configuration to operate on
    :param display_source: If False, configuration options contain raw
        values. If True, options are a tuple of (option_value, source).
    :param getter_func: A callback function that gets the user configured
        override value for a particular sensitive_config_values config.
    :return: None, the given config_sources is filtered if necessary,
        otherwise untouched.
    """
    defaults = self.airflow_defaults
    for section, key in self.sensitive_config_values:
        # Nothing to do without the section / key.
        if section not in config_sources:
            continue
        section_values = config_sources[section]
        if key not in section_values:
            continue

        # Is there something that overrides the default?
        try:
            override = getter_func(section, key)
        except ValueError:
            continue
        if not override:
            continue

        # Is there a default value at all?
        if not defaults.has_option(section, key):
            continue

        # Is the bare setting the same as the default?
        current = section_values[key]
        if display_source:
            # With display_source the entry is an (option_value, source) tuple.
            current, _source = current  # type: ignore
        if current == defaults.get(section, key):
            del section_values[key]

1227 

@staticmethod
def _replace_config_with_display_sources(
    config_sources: ConfigSourcesType,
    configs: Iterable[tuple[str, ConfigParser]],
    display_source: bool,
    raw: bool,
    deprecated_options: dict[tuple[str, str], tuple[str, str, str]],
    include_env: bool,
    include_cmds: bool,
    include_secret: bool,
):
    """
    Fill ``config_sources`` from every section of every given parser.

    Walks ``configs`` in order and delegates each section to
    ``_replace_section_config_with_display_sources``.

    :param config_sources: the configuration being built; modified in place
    :param configs: ``(source_name, parser)`` pairs, processed in order
    :param display_source: forwarded to the per-section helper
    :param raw: forwarded to the per-section helper
    :param deprecated_options: forwarded to the per-section helper
    :param include_env: forwarded to the per-section helper
    :param include_cmds: forwarded to the per-section helper
    :param include_secret: forwarded to the per-section helper
    """
    replace_section = AirflowConfigParser._replace_section_config_with_display_sources
    for source_name, config in configs:
        for section in config.sections():
            replace_section(
                config,
                config_sources,
                display_source,
                raw,
                section,
                source_name,
                deprecated_options,
                configs,
                include_env=include_env,
                include_cmds=include_cmds,
                include_secret=include_secret,
            )

1254 

@staticmethod
def _deprecated_value_is_set_in_config(
    deprecated_section: str,
    deprecated_key: str,
    configs: Iterable[tuple[str, ConfigParser]],
) -> bool:
    """
    Tell whether a deprecated option is present in any non-default config.

    :param deprecated_section: section of the deprecated option
    :param deprecated_key: name of the deprecated option
    :param configs: ``(source_name, parser)`` pairs; the pair named
        "default" is ignored
    :return: True if any remaining parser lists the key in that section
    """
    for config_type, config in configs:
        if config_type == "default":
            continue
        try:
            section_items = config.items(section=deprecated_section, raw=True)
        except NoSectionError:
            # This parser does not have the section at all; try the next one.
            continue
        if any(candidate == deprecated_key for candidate, _ in section_items):
            return True
    return False

1272 

@staticmethod
def _deprecated_variable_is_set(deprecated_section: str, deprecated_key: str) -> bool:
    """Tell whether the environment variable for a deprecated option is set."""
    env_name = f"{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}"
    return os.environ.get(env_name) is not None

1279 

@staticmethod
def _deprecated_command_is_set_in_config(
    deprecated_section: str, deprecated_key: str, configs: Iterable[tuple[str, ConfigParser]]
) -> bool:
    """Tell whether the ``_cmd`` variant of a deprecated option is set in a non-default config."""
    command_key = deprecated_key + "_cmd"
    return AirflowConfigParser._deprecated_value_is_set_in_config(
        deprecated_section=deprecated_section, deprecated_key=command_key, configs=configs
    )

1287 

@staticmethod
def _deprecated_variable_command_is_set(deprecated_section: str, deprecated_key: str) -> bool:
    """Tell whether the ``_CMD`` environment variable for a deprecated option is set."""
    env_name = f"{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}_CMD"
    return os.environ.get(env_name) is not None

1294 

@staticmethod
def _deprecated_secret_is_set_in_config(
    deprecated_section: str, deprecated_key: str, configs: Iterable[tuple[str, ConfigParser]]
) -> bool:
    """Tell whether the ``_secret`` variant of a deprecated option is set in a non-default config."""
    secret_key = deprecated_key + "_secret"
    return AirflowConfigParser._deprecated_value_is_set_in_config(
        deprecated_section=deprecated_section, deprecated_key=secret_key, configs=configs
    )

1302 

@staticmethod
def _deprecated_variable_secret_is_set(deprecated_section: str, deprecated_key: str) -> bool:
    """Tell whether the ``_SECRET`` environment variable for a deprecated option is set."""
    env_name = f"{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}_SECRET"
    return os.environ.get(env_name) is not None

1309 

@contextmanager
def suppress_future_warnings(self):
    """
    Context manager that sets ``_suppress_future_warnings`` to True for its body.

    The previous value of the flag is restored on exit, including when the
    body raises, so nested uses and failures leave the flag as they found it.

    :return: yields this object
    """
    previous = self._suppress_future_warnings
    self._suppress_future_warnings = True
    try:
        yield self
    finally:
        # Without ``finally`` an exception in the with-body would leave the
        # flag stuck at True.
        self._suppress_future_warnings = previous

1316 

@staticmethod
def _replace_section_config_with_display_sources(
    config: ConfigParser,
    config_sources: ConfigSourcesType,
    display_source: bool,
    raw: bool,
    section: str,
    source_name: str,
    deprecated_options: dict[tuple[str, str], tuple[str, str, str]],
    configs: Iterable[tuple[str, ConfigParser]],
    include_env: bool,
    include_cmds: bool,
    include_secret: bool,
):
    """
    Copy one section of ``config`` into ``config_sources``.

    When the values come from the "default" source, an option is skipped
    if its deprecated counterpart has been set through any requested
    source. Writing the default for the new name would otherwise override
    the value coming from the deprecated one.

    :param config: parser to read the section from
    :param config_sources: the configuration being built; modified in place
    :param display_source: when True, store ``(value, source_name)`` tuples
    :param raw: forwarded to ``config.items``
    :param section: section to copy
    :param source_name: label of ``config``
    :param deprecated_options: maps ``(section, key)`` to a 3-tuple whose
        first two items are the deprecated section and key
    :param configs: all ``(source_name, parser)`` pairs, used to look for
        deprecated values
    :param include_env: consider deprecated environment variables
    :param include_cmds: consider deprecated ``_cmd`` settings
    :param include_secret: consider deprecated ``_secret`` settings
    """
    parser_cls = AirflowConfigParser
    sect = config_sources.setdefault(section, OrderedDict())

    if isinstance(config, parser_cls):
        with config.suppress_future_warnings():
            items = config.items(section=section, raw=raw)
    else:
        items = config.items(section=section, raw=raw)

    for option, value in items:
        old_section, old_key, _ = deprecated_options.get((section, option), (None, None, None))
        if old_section and old_key and source_name == "default":
            # Checks are evaluated lazily, in the same order as before.
            deprecated_in_use = (
                parser_cls._deprecated_value_is_set_in_config(old_section, old_key, configs)
                or (include_env and parser_cls._deprecated_variable_is_set(old_section, old_key))
                or (
                    include_cmds
                    and (
                        parser_cls._deprecated_variable_command_is_set(old_section, old_key)
                        or parser_cls._deprecated_command_is_set_in_config(old_section, old_key, configs)
                    )
                )
                or (
                    include_secret
                    and (
                        parser_cls._deprecated_variable_secret_is_set(old_section, old_key)
                        or parser_cls._deprecated_secret_is_set_in_config(old_section, old_key, configs)
                    )
                )
            )
            if deprecated_in_use:
                continue
        sect[option] = (value, source_name) if display_source else value

1374 

def load_test_config(self):
    """
    Replace the loaded configuration with the unit test configuration.

    Every section is removed first, then the ``default_test.cfg`` template
    is read, followed by ``TEST_CONFIG_FILE``.

    Note: this is not reversible.
    """
    # Drop all sections so that lookups fall back to the defaults.
    for name in self.sections():
        self.remove_section(name)

    # Then read the test config ...
    template_path = _default_config_file_path("default_test.cfg")
    log.info("Reading default test configuration from %s", template_path)
    test_defaults = _parameterized_config_from_template("default_test.cfg")
    self.read_string(test_defaults)

    # ... and any "custom" test settings on top of it.
    log.info("Reading test configuration from %s", TEST_CONFIG_FILE)
    self.read(TEST_CONFIG_FILE)

1393 

@staticmethod
def _warn_deprecate(
    section: str, key: str, deprecated_section: str, deprecated_name: str, extra_stacklevel: int
):
    """
    Emit a DeprecationWarning for an option read from its old location.

    The wording depends on whether the option was only renamed within
    its section or moved to another section.

    :param section: current section of the option
    :param key: current name of the option
    :param deprecated_section: section the value was actually read from
    :param deprecated_name: option name the value was actually read from
    :param extra_stacklevel: added to the base stack level of 4
    """
    if section == deprecated_section:
        message = (
            f"The {deprecated_name} option in [{section}] has been renamed to {key} - "
            f"the old setting has been used, but please update your config."
        )
    else:
        message = (
            f"The {deprecated_name} option in [{deprecated_section}] has been moved to the {key} option "
            f"in [{section}] - the old setting has been used, but please update your config."
        )
    warnings.warn(message, DeprecationWarning, stacklevel=4 + extra_stacklevel)

1412 

def __getstate__(self):
    """Return the pickled state: ``_sections``, ``is_validated`` and ``airflow_defaults``."""
    return {
        "_sections": self._sections,
        "is_validated": self.is_validated,
        "airflow_defaults": self.airflow_defaults,
    }

1422 

def __setstate__(self, state):
    """
    Restore the object from pickled state.

    The object is re-initialised with no arguments, the ``_sections`` entry
    is removed from ``state`` and loaded through ``read_dict``, and the
    remaining entries are copied straight into the instance ``__dict__``.
    """
    self.__init__()
    sections = state.pop("_sections")
    self.read_dict(sections)
    vars(self).update(state)

1428 

1429 

def get_airflow_home() -> str:
    """Return the ``AIRFLOW_HOME`` env value (default ``~/airflow``) passed through ``expand_env_var``."""
    configured_home = os.environ.get("AIRFLOW_HOME", "~/airflow")
    return expand_env_var(configured_home)

1433 

1434 

def get_airflow_config(airflow_home) -> str:
    """
    Return the path to airflow.cfg.

    The ``AIRFLOW_CONFIG`` environment variable wins when set (it is passed
    through ``expand_env_var``); otherwise the file is ``airflow.cfg``
    inside ``airflow_home``.
    """
    airflow_config_var = os.environ.get("AIRFLOW_CONFIG")
    if airflow_config_var is not None:
        return expand_env_var(airflow_config_var)
    return os.path.join(airflow_home, "airflow.cfg")

1441 

1442 

def _parameterized_config_from_template(filename) -> str:
    """
    Render the part of a default config file that follows the template marker.

    The file is located with ``_default_config_file_path``. Everything
    after the marker line is stripped and passed to ``parameterized_config``.

    :param filename: name of the default config file
    :raises RuntimeError: if the file contains no marker line
    """
    TEMPLATE_START = "# ----------------------- TEMPLATE BEGINS HERE -----------------------\n"

    path = _default_config_file_path(filename)
    with open(path) as fh:
        for line in fh:
            if line == TEMPLATE_START:
                # The rest of the file is the template itself.
                remainder = fh.read()
                return parameterized_config(remainder.strip())
    raise RuntimeError(f"Template marker not found in {path!r}")

1453 

1454 

def parameterized_config(template) -> str:
    """
    Render ``template`` with ``str.format``, using names from the current scope.

    The substitution values are this module's globals, overlaid with this
    function's locals (i.e. ``template`` itself).

    :param template: a config content templated with {{variables}}
    """
    # Locals are merged last so that they win over globals of the same name.
    all_vars = {**globals(), **locals()}
    return template.format(**all_vars)

1463 

1464 

def get_airflow_test_config(airflow_home) -> str:
    """
    Return the path to unittests.cfg.

    The ``AIRFLOW_TEST_CONFIG`` environment variable wins when present (it
    is passed through ``expand_env_var``); otherwise the file is
    ``unittests.cfg`` inside ``airflow_home``.
    """
    try:
        configured_path = os.environ["AIRFLOW_TEST_CONFIG"]
    except KeyError:
        return os.path.join(airflow_home, "unittests.cfg")
    # It will never return None
    return expand_env_var(configured_path)  # type: ignore[return-value]

1471 

1472 

def _generate_fernet_key() -> str:
    """Generate a new Fernet key and return it decoded to ``str``."""
    # Imported here rather than at module level, as in the original code.
    from cryptography.fernet import Fernet

    key_bytes = Fernet.generate_key()
    return key_bytes.decode()

1477 

1478 

def initialize_config() -> AirflowConfigParser:
    """
    Load the Airflow config files.

    Called for you automatically as part of the Airflow boot process.

    Builds a parser from the ``default_airflow.cfg`` template, then loads
    either the unit test configuration or the regular ``AIRFLOW_CONFIG``
    file, creating the file first (with a freshly generated Fernet key in
    ``FERNET_KEY``) when it does not exist. Also creates the webserver config
    file from ``default_webserver_config.py`` if it is missing.

    Rebinds the module globals ``FERNET_KEY``, ``AIRFLOW_HOME`` and
    ``WEBSERVER_CONFIG`` as a side effect.

    :return: the populated parser
    """
    global FERNET_KEY, AIRFLOW_HOME, WEBSERVER_CONFIG

    default_config = _parameterized_config_from_template("default_airflow.cfg")

    local_conf = AirflowConfigParser(default_config=default_config)

    if local_conf.getboolean("core", "unit_test_mode"):
        # Load test config only
        if not os.path.isfile(TEST_CONFIG_FILE):
            from cryptography.fernet import Fernet

            log.info("Creating new Airflow config file for unit tests in: %s", TEST_CONFIG_FILE)
            pathlib.Path(AIRFLOW_HOME).mkdir(parents=True, exist_ok=True)

            # Assigned before the template is rendered below.
            # NOTE(review): presumably the template refers to FERNET_KEY - confirm.
            FERNET_KEY = Fernet.generate_key().decode()

            with open(TEST_CONFIG_FILE, "w") as file:
                cfg = _parameterized_config_from_template("default_test.cfg")
                file.write(cfg)
            make_group_other_inaccessible(TEST_CONFIG_FILE)

        local_conf.load_test_config()
    else:
        # Load normal config
        if not os.path.isfile(AIRFLOW_CONFIG):
            from cryptography.fernet import Fernet

            log.info("Creating new Airflow config file in: %s", AIRFLOW_CONFIG)
            pathlib.Path(AIRFLOW_HOME).mkdir(parents=True, exist_ok=True)

            # NOTE(review): default_config was rendered before this assignment,
            # so the new key cannot be part of what is written below - confirm
            # this is intended.
            FERNET_KEY = Fernet.generate_key().decode()

            with open(AIRFLOW_CONFIG, "w") as file:
                file.write(default_config)
            make_group_other_inaccessible(AIRFLOW_CONFIG)

        log.info("Reading the config from %s", AIRFLOW_CONFIG)

        local_conf.read(AIRFLOW_CONFIG)

        if local_conf.has_option("core", "AIRFLOW_HOME"):
            msg = (
                "Specifying both AIRFLOW_HOME environment variable and airflow_home "
                "in the config file is deprecated. Please use only the AIRFLOW_HOME "
                "environment variable and remove the config file entry."
            )
            if "AIRFLOW_HOME" in os.environ:
                warnings.warn(msg, category=DeprecationWarning)
            elif local_conf.get("core", "airflow_home") == AIRFLOW_HOME:
                warnings.warn(
                    "Specifying airflow_home in the config file is deprecated. As you "
                    "have left it at the default value you should remove the setting "
                    "from your airflow.cfg and suffer no change in behaviour.",
                    category=DeprecationWarning,
                )
            else:
                # The AIRFLOW_HOME env var is not set and the configured value
                # differs from the current AIRFLOW_HOME: adopt the configured
                # value, but still warn.
                AIRFLOW_HOME = local_conf.get("core", "airflow_home")  # type: ignore[assignment]
                warnings.warn(msg, category=DeprecationWarning)

        # They _might_ have set unit_test_mode in the airflow.cfg, we still
        # want to respect that and then load the unittests.cfg
        if local_conf.getboolean("core", "unit_test_mode"):
            local_conf.load_test_config()

    WEBSERVER_CONFIG = local_conf.get("webserver", "config_file")
    if not os.path.isfile(WEBSERVER_CONFIG):
        import shutil

        log.info("Creating new FAB webserver config file in: %s", WEBSERVER_CONFIG)
        shutil.copy(_default_config_file_path("default_webserver_config.py"), WEBSERVER_CONFIG)
    return local_conf

1557 

1558 

def make_group_other_inaccessible(file_path: str):
    """
    Restrict a file's permissions to owner read/write at most.

    The current mode is masked with ``S_IRUSR | S_IWUSR``, which clears all
    group/other bits. This is best effort: any failure is logged as a
    warning and the file keeps its original permissions.

    :param file_path: path of the file to restrict
    """
    try:
        permissions = os.stat(file_path)
        os.chmod(file_path, permissions.st_mode & (stat.S_IRUSR | stat.S_IWUSR))
    except Exception as e:
        # The message needs a %s placeholder for the exception; without it
        # logging fails to format the record and the reason is lost.
        log.warning(
            "Could not change permissions of config file to be group/other inaccessible. "
            "Continuing with original permissions: %s",
            e,
        )

1569 

1570 

1571# Historical convenience functions to access config entries 

def load_test_config():
    """Deprecated module-level shortcut for ``conf.load_test_config``; warns, then delegates."""
    message = (
        "Accessing configuration method 'load_test_config' directly from the configuration module is "
        "deprecated. Please access the configuration from the 'configuration.conf' object via "
        "'conf.load_test_config'"
    )
    warnings.warn(message, DeprecationWarning, stacklevel=2)
    conf.load_test_config()

1582 

1583 

def get(*args, **kwargs) -> ConfigType | None:
    """Deprecated module-level shortcut for ``conf.get``; warns, then delegates."""
    message = (
        "Accessing configuration method 'get' directly from the configuration module is "
        "deprecated. Please access the configuration from the 'configuration.conf' object via "
        "'conf.get'"
    )
    warnings.warn(message, DeprecationWarning, stacklevel=2)
    return conf.get(*args, **kwargs)

1594 

1595 

def getboolean(*args, **kwargs) -> bool:
    """Deprecated module-level alias that forwards all arguments to ``conf.getboolean``."""
    message = (
        "Accessing configuration method 'getboolean' directly from the configuration module is "
        "deprecated. Please access the configuration from the 'configuration.conf' object via "
        "'conf.getboolean'"
    )
    # stacklevel=2 attributes the warning to the caller of this shim.
    warnings.warn(message, category=DeprecationWarning, stacklevel=2)
    value = conf.getboolean(*args, **kwargs)
    return value

1606 

1607 

def getfloat(*args, **kwargs) -> float:
    """Deprecated module-level alias that forwards all arguments to ``conf.getfloat``."""
    message = (
        "Accessing configuration method 'getfloat' directly from the configuration module is "
        "deprecated. Please access the configuration from the 'configuration.conf' object via "
        "'conf.getfloat'"
    )
    # stacklevel=2 attributes the warning to the caller of this shim.
    warnings.warn(message, category=DeprecationWarning, stacklevel=2)
    value = conf.getfloat(*args, **kwargs)
    return value

1618 

1619 

def getint(*args, **kwargs) -> int:
    """Deprecated module-level alias that forwards all arguments to ``conf.getint``."""
    message = (
        "Accessing configuration method 'getint' directly from the configuration module is "
        "deprecated. Please access the configuration from the 'configuration.conf' object via "
        "'conf.getint'"
    )
    # stacklevel=2 attributes the warning to the caller of this shim.
    warnings.warn(message, category=DeprecationWarning, stacklevel=2)
    value = conf.getint(*args, **kwargs)
    return value

1630 

1631 

def getsection(*args, **kwargs) -> ConfigOptionsDictType | None:
    """Deprecated module-level alias that forwards all arguments to ``conf.getsection``."""
    message = (
        "Accessing configuration method 'getsection' directly from the configuration module is "
        "deprecated. Please access the configuration from the 'configuration.conf' object via "
        "'conf.getsection'"
    )
    # stacklevel=2 attributes the warning to the caller of this shim.
    warnings.warn(message, category=DeprecationWarning, stacklevel=2)
    section = conf.getsection(*args, **kwargs)
    return section

1642 

1643 

def has_option(*args, **kwargs) -> bool:
    """Deprecated module-level alias that forwards all arguments to ``conf.has_option``."""
    message = (
        "Accessing configuration method 'has_option' directly from the configuration module is "
        "deprecated. Please access the configuration from the 'configuration.conf' object via "
        "'conf.has_option'"
    )
    # stacklevel=2 attributes the warning to the caller of this shim.
    warnings.warn(message, category=DeprecationWarning, stacklevel=2)
    found = conf.has_option(*args, **kwargs)
    return found

1654 

1655 

def remove_option(*args, **kwargs) -> bool:
    """Deprecated module-level alias that forwards all arguments to ``conf.remove_option``."""
    message = (
        "Accessing configuration method 'remove_option' directly from the configuration module is "
        "deprecated. Please access the configuration from the 'configuration.conf' object via "
        "'conf.remove_option'"
    )
    # stacklevel=2 attributes the warning to the caller of this shim.
    warnings.warn(message, category=DeprecationWarning, stacklevel=2)
    removed = conf.remove_option(*args, **kwargs)
    return removed

1666 

1667 

def as_dict(*args, **kwargs) -> ConfigSourcesType:
    """Deprecated module-level alias that forwards all arguments to ``conf.as_dict``."""
    message = (
        "Accessing configuration method 'as_dict' directly from the configuration module is "
        "deprecated. Please access the configuration from the 'configuration.conf' object via "
        "'conf.as_dict'"
    )
    # stacklevel=2 attributes the warning to the caller of this shim.
    warnings.warn(message, category=DeprecationWarning, stacklevel=2)
    sources = conf.as_dict(*args, **kwargs)
    return sources

1678 

1679 

def set(*args, **kwargs) -> None:
    """Deprecated module-level alias that forwards all arguments to ``conf.set``."""
    message = (
        "Accessing configuration method 'set' directly from the configuration module is "
        "deprecated. Please access the configuration from the 'configuration.conf' object via "
        "'conf.set'"
    )
    # stacklevel=2 attributes the warning to the caller of this shim.
    warnings.warn(message, category=DeprecationWarning, stacklevel=2)
    conf.set(*args, **kwargs)

1690 

1691 

def ensure_secrets_loaded() -> list[BaseSecretsBackend]:
    """
    Return the secrets backends, re-initializing them if only the defaults are present.

    A module-level ``secrets_backend_list`` of exactly two entries is taken to
    mean that only the two default backends are loaded. In that case a freshly
    initialized list is returned (the module-level list is not rebound here);
    otherwise the existing list is returned unchanged.
    """
    only_defaults_loaded = len(secrets_backend_list) == 2
    return initialize_secrets_backends() if only_defaults_loaded else secrets_backend_list

1701 

1702 

def get_custom_secret_backend() -> BaseSecretsBackend | None:
    """
    Instantiate the secrets backend configured under ``[secrets] backend``.

    Returns ``None`` when no backend class is configured. Keyword arguments
    come from ``[secrets] backend_kwargs``; if that value cannot be parsed as
    JSON or is not a dict, a warning is logged and the backend is built with
    no keyword arguments.
    """
    backend_cls = conf.getimport(section="secrets", key="backend")
    if not backend_cls:
        return None

    # Used unless a valid dict is parsed below.
    kwargs: dict = {}
    try:
        parsed = conf.getjson(section="secrets", key="backend_kwargs") or {}
        if not isinstance(parsed, dict):
            raise ValueError("not a dict")
        kwargs = parsed
    except AirflowConfigException:
        log.warning("Failed to parse [secrets] backend_kwargs as JSON, defaulting to no kwargs.")
    except ValueError:
        log.warning("Failed to parse [secrets] backend_kwargs into a dict, defaulting to no kwargs.")

    return backend_cls(**kwargs)

1724 

1725 

def initialize_secrets_backends() -> list[BaseSecretsBackend]:
    """
    Build the list of secrets backend instances.

    * the custom backend from the configuration comes first, when one is defined
    * each class in ``DEFAULT_SECRETS_SEARCH_PATH`` is then imported and
      instantiated, in order
    """
    backends = []

    custom_backend = get_custom_secret_backend()
    if custom_backend is not None:
        backends.append(custom_backend)

    # Import each default backend class by dotted path and instantiate it with no arguments.
    backends.extend(import_string(dotted_path)() for dotted_path in DEFAULT_SECRETS_SEARCH_PATH)
    return backends

1745 

1746 

@functools.lru_cache(maxsize=None)
def _DEFAULT_CONFIG() -> str:
    """Return the text of ``default_airflow.cfg``; the result is cached after the first call."""
    with open(_default_config_file_path("default_airflow.cfg")) as cfg_file:
        contents = cfg_file.read()
    return contents

1752 

1753 

@functools.lru_cache(maxsize=None)
def _TEST_CONFIG() -> str:
    """Return the text of ``default_test.cfg``; the result is cached after the first call."""
    with open(_default_config_file_path("default_test.cfg")) as cfg_file:
        contents = cfg_file.read()
    return contents

1759 

1760 

# Deprecated module-level attribute names mapped to zero-argument callables
# that compute the attribute's value on demand. The module-level
# ``__getattr__`` consults this mapping, so a value is only computed when one
# of these names is actually requested.
_deprecated = {
    "DEFAULT_CONFIG": _DEFAULT_CONFIG,
    "TEST_CONFIG": _TEST_CONFIG,
    "TEST_CONFIG_FILE_PATH": functools.partial(_default_config_file_path, "default_test.cfg"),
    "DEFAULT_CONFIG_FILE_PATH": functools.partial(_default_config_file_path, "default_airflow.cfg"),
}

1767 

1768 

def __getattr__(name):
    """
    Resolve deprecated module attributes lazily.

    Names listed in ``_deprecated`` emit a ``DeprecationWarning`` and return
    the result of calling their registered factory; any other name raises
    ``AttributeError``.
    """
    if name not in _deprecated:
        raise AttributeError(f"module {__name__} has no attribute {name}")

    # stacklevel=2 attributes the warning to the code that accessed the attribute.
    warnings.warn(
        f"{__name__}.{name} is deprecated and will be removed in future",
        category=DeprecationWarning,
        stacklevel=2,
    )
    factory = _deprecated[name]
    return factory()

1778 

1779 

# Setting AIRFLOW_HOME and AIRFLOW_CONFIG from environment variables, using
# "~/airflow" and "$AIRFLOW_HOME/airflow.cfg" respectively as defaults.
# AIRFLOW_CONFIG is computed from AIRFLOW_HOME, so the order of these two
# assignments matters.
AIRFLOW_HOME = get_airflow_home()
AIRFLOW_CONFIG = get_airflow_config(AIRFLOW_HOME)

1784 

# Set up dags folder for unit tests.
# The in-repo "tests/dags" directory won't exist if users install via pip;
# in that case fall back to the "dags" folder under AIRFLOW_HOME.
_TEST_DAGS_FOLDER = os.path.join(
    os.path.dirname(os.path.dirname(os.path.realpath(__file__))), "tests", "dags"
)
TEST_DAGS_FOLDER = (
    _TEST_DAGS_FOLDER if os.path.exists(_TEST_DAGS_FOLDER) else os.path.join(AIRFLOW_HOME, "dags")
)

# Set up plugins folder for unit tests, with the same fallback to AIRFLOW_HOME.
_TEST_PLUGINS_FOLDER = os.path.join(
    os.path.dirname(os.path.dirname(os.path.realpath(__file__))), "tests", "plugins"
)
TEST_PLUGINS_FOLDER = (
    _TEST_PLUGINS_FOLDER
    if os.path.exists(_TEST_PLUGINS_FOLDER)
    else os.path.join(AIRFLOW_HOME, "plugins")
)

1803 

# Location of the unit-test configuration file, derived from AIRFLOW_HOME.
TEST_CONFIG_FILE = get_airflow_test_config(AIRFLOW_HOME)

# Base64 text of 16 bytes from os.urandom; a new value is generated each time
# this module is imported.
SECRET_KEY = b64encode(os.urandom(16)).decode("utf-8")
FERNET_KEY = ""  # Set only if needed when generating a new file
WEBSERVER_CONFIG = ""  # Set by initialize_config

# ``conf`` must exist before the secrets backends are initialized, because
# get_custom_secret_backend() reads the [secrets] options from it. Validation
# runs last, once both module-level objects are in place.
conf = initialize_config()
secrets_backend_list = initialize_secrets_backends()
conf.validate()