Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/configuration.py: 48%

769 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-25 06:11 +0000

1# Licensed to the Apache Software Foundation (ASF) under one 

2# or more contributor license agreements. See the NOTICE file 

3# distributed with this work for additional information 

4# regarding copyright ownership. The ASF licenses this file 

5# to you under the Apache License, Version 2.0 (the 

6# "License"); you may not use this file except in compliance 

7# with the License. You may obtain a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, 

12# software distributed under the License is distributed on an 

13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

14# KIND, either express or implied. See the License for the 

15# specific language governing permissions and limitations 

16# under the License. 

17from __future__ import annotations 

18 

19import datetime 

20import functools 

21import json 

22import logging 

23import multiprocessing 

24import os 

25import pathlib 

26import re 

27import shlex 

28import subprocess 

29import sys 

30import warnings 

31from base64 import b64encode 

32from collections import OrderedDict 

33 

34# Ignored Mypy on configparser because it thinks the configparser module has no _UNSET attribute 

35from configparser import _UNSET, ConfigParser, NoOptionError, NoSectionError # type: ignore 

36from contextlib import contextmanager, suppress 

37from json.decoder import JSONDecodeError 

38from re import Pattern 

39from typing import IO, Any, Dict, Iterable, Tuple, Union 

40from urllib.parse import urlsplit 

41 

42from typing_extensions import overload 

43 

44from airflow.compat.functools import cached_property 

45from airflow.exceptions import AirflowConfigException 

46from airflow.secrets import DEFAULT_SECRETS_SEARCH_PATH, BaseSecretsBackend 

47from airflow.utils import yaml 

48from airflow.utils.module_loading import import_string 

49from airflow.utils.weight_rule import WeightRule 

50 

51log = logging.getLogger(__name__) 

52 

53# show Airflow's deprecation warnings 

54if not sys.warnoptions: 

55 warnings.filterwarnings(action="default", category=DeprecationWarning, module="airflow") 

56 warnings.filterwarnings(action="default", category=PendingDeprecationWarning, module="airflow") 

57 

58_SQLITE3_VERSION_PATTERN = re.compile(r"(?P<version>^\d+(?:\.\d+)*)\D?.*$") 

59 

60ConfigType = Union[str, int, float, bool] 

61ConfigOptionsDictType = Dict[str, ConfigType] 

62ConfigSectionSourcesType = Dict[str, Union[str, Tuple[str, str]]] 

63ConfigSourcesType = Dict[str, ConfigSectionSourcesType] 

64 

65ENV_VAR_PREFIX = "AIRFLOW__" 

66 

67 

68def _parse_sqlite_version(s: str) -> tuple[int, ...]: 

69 match = _SQLITE3_VERSION_PATTERN.match(s) 

70 if match is None: 

71 return () 

72 return tuple(int(p) for p in match.group("version").split(".")) 

73 

74 

75@overload 

76def expand_env_var(env_var: None) -> None: 

77 ... 

78 

79 

80@overload 

81def expand_env_var(env_var: str) -> str: 

82 ... 

83 

84 

85def expand_env_var(env_var: str | None) -> str | None: 

86 """ 

87 Expands (potentially nested) env vars. 

88 

89 Repeat and apply `expandvars` and `expanduser` until 

90 interpolation stops having any effect. 

91 """ 

92 if not env_var: 

93 return env_var 

94 while True: 

95 interpolated = os.path.expanduser(os.path.expandvars(str(env_var))) 

96 if interpolated == env_var: 

97 return interpolated 

98 else: 

99 env_var = interpolated 

100 

101 

102def run_command(command: str) -> str: 

103 """Runs command and returns stdout.""" 

104 process = subprocess.Popen( 

105 shlex.split(command), stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True 

106 ) 

107 output, stderr = (stream.decode(sys.getdefaultencoding(), "ignore") for stream in process.communicate()) 

108 

109 if process.returncode != 0: 

110 raise AirflowConfigException( 

111 f"Cannot execute {command}. Error code is: {process.returncode}. " 

112 f"Output: {output}, Stderr: {stderr}" 

113 ) 

114 

115 return output 

116 

117 

118def _get_config_value_from_secret_backend(config_key: str) -> str | None: 

119 """Get Config option values from Secret Backend.""" 

120 try: 

121 secrets_client = get_custom_secret_backend() 

122 if not secrets_client: 

123 return None 

124 return secrets_client.get_config(config_key) 

125 except Exception as e: 

126 raise AirflowConfigException( 

127 "Cannot retrieve config from alternative secrets backend. " 

128 "Make sure it is configured properly and that the Backend " 

129 "is accessible.\n" 

130 f"{e}" 

131 ) 

132 

133 

134def _default_config_file_path(file_name: str) -> str: 

135 templates_dir = os.path.join(os.path.dirname(__file__), "config_templates") 

136 return os.path.join(templates_dir, file_name) 

137 

138 

139def default_config_yaml() -> dict[str, Any]: 

140 """ 

141 Read Airflow configs from YAML file. 

142 

143 :return: Python dictionary containing configs & their info 

144 """ 

145 with open(_default_config_file_path("config.yml")) as config_file: 

146 return yaml.safe_load(config_file) 

147 

148 

149SENSITIVE_CONFIG_VALUES = { 

150 ("database", "sql_alchemy_conn"), 

151 ("core", "fernet_key"), 

152 ("celery", "broker_url"), 

153 ("celery", "flower_basic_auth"), 

154 ("celery", "result_backend"), 

155 ("atlas", "password"), 

156 ("smtp", "smtp_password"), 

157 ("webserver", "secret_key"), 

158 # The following options are deprecated 

159 ("core", "sql_alchemy_conn"), 

160} 

161 

162 

163class AirflowConfigParser(ConfigParser): 

164 """Custom Airflow Configparser supporting defaults and deprecated options.""" 

165 

166 # These configuration elements can be fetched as the stdout of commands 

167 # following the "{section}__{name}_cmd" pattern, the idea behind this 

168 # is to not store password on boxes in text files. 

169 # These configs can also be fetched from Secrets backend 

170 # following the "{section}__{name}__secret" pattern 

171 

172 sensitive_config_values: set[tuple[str, str]] = SENSITIVE_CONFIG_VALUES 

173 

174 # A mapping of (new section, new option) -> (old section, old option, since_version). 

175 # When reading new option, the old option will be checked to see if it exists. If it does a 

176 # DeprecationWarning will be issued and the old option will be used instead 

177 deprecated_options: dict[tuple[str, str], tuple[str, str, str]] = { 

178 ("celery", "worker_precheck"): ("core", "worker_precheck", "2.0.0"), 

179 ("logging", "base_log_folder"): ("core", "base_log_folder", "2.0.0"), 

180 ("logging", "remote_logging"): ("core", "remote_logging", "2.0.0"), 

181 ("logging", "remote_log_conn_id"): ("core", "remote_log_conn_id", "2.0.0"), 

182 ("logging", "remote_base_log_folder"): ("core", "remote_base_log_folder", "2.0.0"), 

183 ("logging", "encrypt_s3_logs"): ("core", "encrypt_s3_logs", "2.0.0"), 

184 ("logging", "logging_level"): ("core", "logging_level", "2.0.0"), 

185 ("logging", "fab_logging_level"): ("core", "fab_logging_level", "2.0.0"), 

186 ("logging", "logging_config_class"): ("core", "logging_config_class", "2.0.0"), 

187 ("logging", "colored_console_log"): ("core", "colored_console_log", "2.0.0"), 

188 ("logging", "colored_log_format"): ("core", "colored_log_format", "2.0.0"), 

189 ("logging", "colored_formatter_class"): ("core", "colored_formatter_class", "2.0.0"), 

190 ("logging", "log_format"): ("core", "log_format", "2.0.0"), 

191 ("logging", "simple_log_format"): ("core", "simple_log_format", "2.0.0"), 

192 ("logging", "task_log_prefix_template"): ("core", "task_log_prefix_template", "2.0.0"), 

193 ("logging", "log_filename_template"): ("core", "log_filename_template", "2.0.0"), 

194 ("logging", "log_processor_filename_template"): ("core", "log_processor_filename_template", "2.0.0"), 

195 ("logging", "dag_processor_manager_log_location"): ( 

196 "core", 

197 "dag_processor_manager_log_location", 

198 "2.0.0", 

199 ), 

200 ("logging", "task_log_reader"): ("core", "task_log_reader", "2.0.0"), 

201 ("metrics", "statsd_on"): ("scheduler", "statsd_on", "2.0.0"), 

202 ("metrics", "statsd_host"): ("scheduler", "statsd_host", "2.0.0"), 

203 ("metrics", "statsd_port"): ("scheduler", "statsd_port", "2.0.0"), 

204 ("metrics", "statsd_prefix"): ("scheduler", "statsd_prefix", "2.0.0"), 

205 ("metrics", "statsd_allow_list"): ("scheduler", "statsd_allow_list", "2.0.0"), 

206 ("metrics", "stat_name_handler"): ("scheduler", "stat_name_handler", "2.0.0"), 

207 ("metrics", "statsd_datadog_enabled"): ("scheduler", "statsd_datadog_enabled", "2.0.0"), 

208 ("metrics", "statsd_datadog_tags"): ("scheduler", "statsd_datadog_tags", "2.0.0"), 

209 ("metrics", "statsd_custom_client_path"): ("scheduler", "statsd_custom_client_path", "2.0.0"), 

210 ("scheduler", "parsing_processes"): ("scheduler", "max_threads", "1.10.14"), 

211 ("scheduler", "scheduler_idle_sleep_time"): ("scheduler", "processor_poll_interval", "2.2.0"), 

212 ("operators", "default_queue"): ("celery", "default_queue", "2.1.0"), 

213 ("core", "hide_sensitive_var_conn_fields"): ("admin", "hide_sensitive_variable_fields", "2.1.0"), 

214 ("core", "sensitive_var_conn_names"): ("admin", "sensitive_variable_fields", "2.1.0"), 

215 ("core", "default_pool_task_slot_count"): ("core", "non_pooled_task_slot_count", "1.10.4"), 

216 ("core", "max_active_tasks_per_dag"): ("core", "dag_concurrency", "2.2.0"), 

217 ("logging", "worker_log_server_port"): ("celery", "worker_log_server_port", "2.2.0"), 

218 ("api", "access_control_allow_origins"): ("api", "access_control_allow_origin", "2.2.0"), 

219 ("api", "auth_backends"): ("api", "auth_backend", "2.3.0"), 

220 ("database", "sql_alchemy_conn"): ("core", "sql_alchemy_conn", "2.3.0"), 

221 ("database", "sql_engine_encoding"): ("core", "sql_engine_encoding", "2.3.0"), 

222 ("database", "sql_engine_collation_for_ids"): ("core", "sql_engine_collation_for_ids", "2.3.0"), 

223 ("database", "sql_alchemy_pool_enabled"): ("core", "sql_alchemy_pool_enabled", "2.3.0"), 

224 ("database", "sql_alchemy_pool_size"): ("core", "sql_alchemy_pool_size", "2.3.0"), 

225 ("database", "sql_alchemy_max_overflow"): ("core", "sql_alchemy_max_overflow", "2.3.0"), 

226 ("database", "sql_alchemy_pool_recycle"): ("core", "sql_alchemy_pool_recycle", "2.3.0"), 

227 ("database", "sql_alchemy_pool_pre_ping"): ("core", "sql_alchemy_pool_pre_ping", "2.3.0"), 

228 ("database", "sql_alchemy_schema"): ("core", "sql_alchemy_schema", "2.3.0"), 

229 ("database", "sql_alchemy_connect_args"): ("core", "sql_alchemy_connect_args", "2.3.0"), 

230 ("database", "load_default_connections"): ("core", "load_default_connections", "2.3.0"), 

231 ("database", "max_db_retries"): ("core", "max_db_retries", "2.3.0"), 

232 ("scheduler", "parsing_cleanup_interval"): ("scheduler", "deactivate_stale_dags_interval", "2.5.0"), 

233 } 

234 

235 # A mapping of new section -> (old section, since_version). 

236 deprecated_sections: dict[str, tuple[str, str]] = {"kubernetes_executor": ("kubernetes", "2.5.0")} 

237 

238 # Now build the inverse so we can go from old_section/old_key to new_section/new_key 

239 # if someone tries to retrieve it based on old_section/old_key 

240 @cached_property 

241 def inversed_deprecated_options(self): 

242 return {(sec, name): key for key, (sec, name, ver) in self.deprecated_options.items()} 

243 

244 @cached_property 

245 def inversed_deprecated_sections(self): 

246 return { 

247 old_section: new_section for new_section, (old_section, ver) in self.deprecated_sections.items() 

248 } 

249 

250 # A mapping of old default values that we want to change and warn the user 

251 # about. Mapping of section -> setting -> { old, replace, by_version } 

252 deprecated_values: dict[str, dict[str, tuple[Pattern, str, str]]] = { 

253 "core": { 

254 "hostname_callable": (re.compile(r":"), r".", "2.1"), 

255 }, 

256 "webserver": { 

257 "navbar_color": (re.compile(r"\A#007A87\Z", re.IGNORECASE), "#fff", "2.1"), 

258 "dag_default_view": (re.compile(r"^tree$"), "grid", "3.0"), 

259 }, 

260 "email": { 

261 "email_backend": ( 

262 re.compile(r"^airflow\.contrib\.utils\.sendgrid\.send_email$"), 

263 r"airflow.providers.sendgrid.utils.emailer.send_email", 

264 "2.1", 

265 ), 

266 }, 

267 "logging": { 

268 "log_filename_template": ( 

269 re.compile(re.escape("{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log")), 

270 "XX-set-after-default-config-loaded-XX", 

271 "3.0", 

272 ), 

273 }, 

274 "api": { 

275 "auth_backends": ( 

276 re.compile(r"^airflow\.api\.auth\.backend\.deny_all$|^$"), 

277 "airflow.api.auth.backend.session", 

278 "3.0", 

279 ), 

280 }, 

281 "elasticsearch": { 

282 "log_id_template": ( 

283 re.compile("^" + re.escape("{dag_id}-{task_id}-{execution_date}-{try_number}") + "$"), 

284 "{dag_id}-{task_id}-{run_id}-{map_index}-{try_number}", 

285 "3.0", 

286 ) 

287 }, 

288 } 

289 

290 _available_logging_levels = ["CRITICAL", "FATAL", "ERROR", "WARN", "WARNING", "INFO", "DEBUG"] 

291 enums_options = { 

292 ("core", "default_task_weight_rule"): sorted(WeightRule.all_weight_rules()), 

293 ("core", "dag_ignore_file_syntax"): ["regexp", "glob"], 

294 ("core", "mp_start_method"): multiprocessing.get_all_start_methods(), 

295 ("scheduler", "file_parsing_sort_mode"): ["modified_time", "random_seeded_by_host", "alphabetical"], 

296 ("logging", "logging_level"): _available_logging_levels, 

297 ("logging", "fab_logging_level"): _available_logging_levels, 

298 # celery_logging_level can be empty, which uses logging_level as fallback 

299 ("logging", "celery_logging_level"): _available_logging_levels + [""], 

300 ("webserver", "analytical_tool"): ["google_analytics", "metarouter", "segment", ""], 

301 } 

302 

303 upgraded_values: dict[tuple[str, str], str] 

304 """Mapping of (section,option) to the old value that was upgraded""" 

305 

306 # This method transforms option names on every read, get, or set operation. 

307 # This changes from the default behaviour of ConfigParser from lower-casing 

308 # to instead be case-preserving 

309 def optionxform(self, optionstr: str) -> str: 

310 return optionstr 

311 

312 def __init__(self, default_config: str | None = None, *args, **kwargs): 

313 super().__init__(*args, **kwargs) 

314 self.upgraded_values = {} 

315 

316 self.airflow_defaults = ConfigParser(*args, **kwargs) 

317 if default_config is not None: 

318 self.airflow_defaults.read_string(default_config) 

319 # Set the upgrade value based on the current loaded default 

320 default = self.airflow_defaults.get("logging", "log_filename_template", fallback=None) 

321 if default: 

322 replacement = self.deprecated_values["logging"]["log_filename_template"] 

323 self.deprecated_values["logging"]["log_filename_template"] = ( 

324 replacement[0], 

325 default, 

326 replacement[2], 

327 ) 

328 else: 

329 # In case of tests it might not exist 

330 with suppress(KeyError): 

331 del self.deprecated_values["logging"]["log_filename_template"] 

332 else: 

333 with suppress(KeyError): 

334 del self.deprecated_values["logging"]["log_filename_template"] 

335 

336 self.is_validated = False 

337 self._suppress_future_warnings = False 

338 

339 def validate(self): 

340 self._validate_config_dependencies() 

341 self._validate_enums() 

342 

343 for section, replacement in self.deprecated_values.items(): 

344 for name, info in replacement.items(): 

345 old, new, version = info 

346 current_value = self.get(section, name, fallback="") 

347 if self._using_old_value(old, current_value): 

348 self.upgraded_values[(section, name)] = current_value 

349 new_value = old.sub(new, current_value) 

350 self._update_env_var(section=section, name=name, new_value=new_value) 

351 self._create_future_warning( 

352 name=name, 

353 section=section, 

354 current_value=current_value, 

355 new_value=new_value, 

356 version=version, 

357 ) 

358 

359 self._upgrade_auth_backends() 

360 self._upgrade_postgres_metastore_conn() 

361 self.is_validated = True 

362 

363 def _upgrade_auth_backends(self): 

364 """ 

365 Ensure a custom auth_backends setting contains session. 

366 

367 This is required by the UI for ajax queries. 

368 """ 

369 old_value = self.get("api", "auth_backends", fallback="") 

370 if old_value in ("airflow.api.auth.backend.default", ""): 

371 # handled by deprecated_values 

372 pass 

373 elif old_value.find("airflow.api.auth.backend.session") == -1: 

374 new_value = old_value + ",airflow.api.auth.backend.session" 

375 self._update_env_var(section="api", name="auth_backends", new_value=new_value) 

376 self.upgraded_values[("api", "auth_backends")] = old_value 

377 

378 # if the old value is set via env var, we need to wipe it 

379 # otherwise, it'll "win" over our adjusted value 

380 old_env_var = self._env_var_name("api", "auth_backend") 

381 os.environ.pop(old_env_var, None) 

382 

383 warnings.warn( 

384 "The auth_backends setting in [api] has had airflow.api.auth.backend.session added " 

385 "in the running config, which is needed by the UI. Please update your config before " 

386 "Apache Airflow 3.0.", 

387 FutureWarning, 

388 ) 

389 

390 def _upgrade_postgres_metastore_conn(self): 

391 """ 

392 Upgrade SQL schemas. 

393 

394 As of SQLAlchemy 1.4, schemes `postgres+psycopg2` and `postgres` 

395 must be replaced with `postgresql`. 

396 """ 

397 section, key = "database", "sql_alchemy_conn" 

398 old_value = self.get(section, key) 

399 bad_schemes = ["postgres+psycopg2", "postgres"] 

400 good_scheme = "postgresql" 

401 parsed = urlsplit(old_value) 

402 if parsed.scheme in bad_schemes: 

403 warnings.warn( 

404 f"Bad scheme in Airflow configuration core > sql_alchemy_conn: `{parsed.scheme}`. " 

405 "As of SQLAlchemy 1.4 (adopted in Airflow 2.3) this is no longer supported. You must " 

406 f"change to `{good_scheme}` before the next Airflow release.", 

407 FutureWarning, 

408 ) 

409 self.upgraded_values[(section, key)] = old_value 

410 new_value = re.sub("^" + re.escape(f"{parsed.scheme}://"), f"{good_scheme}://", old_value) 

411 self._update_env_var(section=section, name=key, new_value=new_value) 

412 

413 # if the old value is set via env var, we need to wipe it 

414 # otherwise, it'll "win" over our adjusted value 

415 old_env_var = self._env_var_name("core", key) 

416 os.environ.pop(old_env_var, None) 

417 

418 def _validate_enums(self): 

419 """Validate that enum type config has an accepted value.""" 

420 for (section_key, option_key), enum_options in self.enums_options.items(): 

421 if self.has_option(section_key, option_key): 

422 value = self.get(section_key, option_key) 

423 if value not in enum_options: 

424 raise AirflowConfigException( 

425 f"`[{section_key}] {option_key}` should not be " 

426 f"{value!r}. Possible values: {', '.join(enum_options)}." 

427 ) 

428 

429 def _validate_config_dependencies(self): 

430 """ 

431 Validate that config based on condition. 

432 

433 Values are considered invalid when they conflict with other config values 

434 or system-level limitations and requirements. 

435 """ 

436 is_executor_without_sqlite_support = self.get("core", "executor") not in ( 

437 "DebugExecutor", 

438 "SequentialExecutor", 

439 ) 

440 is_sqlite = "sqlite" in self.get("database", "sql_alchemy_conn") 

441 if is_sqlite and is_executor_without_sqlite_support: 

442 raise AirflowConfigException(f"error: cannot use sqlite with the {self.get('core', 'executor')}") 

443 if is_sqlite: 

444 import sqlite3 

445 

446 from airflow.utils.docs import get_docs_url 

447 

448 # Some features in storing rendered fields require sqlite version >= 3.15.0 

449 min_sqlite_version = (3, 15, 0) 

450 if _parse_sqlite_version(sqlite3.sqlite_version) < min_sqlite_version: 

451 min_sqlite_version_str = ".".join(str(s) for s in min_sqlite_version) 

452 raise AirflowConfigException( 

453 f"error: sqlite C library version too old (< {min_sqlite_version_str}). " 

454 f"See {get_docs_url('howto/set-up-database.html#setting-up-a-sqlite-database')}" 

455 ) 

456 

457 def _using_old_value(self, old: Pattern, current_value: str) -> bool: 

458 return old.search(current_value) is not None 

459 

460 def _update_env_var(self, section: str, name: str, new_value: str): 

461 env_var = self._env_var_name(section, name) 

462 # Set it as an env var so that any subprocesses keep the same override! 

463 os.environ[env_var] = new_value 

464 

465 @staticmethod 

466 def _create_future_warning(name: str, section: str, current_value: Any, new_value: Any, version: str): 

467 warnings.warn( 

468 f"The {name!r} setting in [{section}] has the old default value of {current_value!r}. " 

469 f"This value has been changed to {new_value!r} in the running config, but " 

470 f"please update your config before Apache Airflow {version}.", 

471 FutureWarning, 

472 ) 

473 

474 def _env_var_name(self, section: str, key: str) -> str: 

475 return f"{ENV_VAR_PREFIX}{section.upper()}__{key.upper()}" 

476 

477 def _get_env_var_option(self, section: str, key: str): 

478 # must have format AIRFLOW__{SECTION}__{KEY} (note double underscore) 

479 env_var = self._env_var_name(section, key) 

480 if env_var in os.environ: 

481 return expand_env_var(os.environ[env_var]) 

482 # alternatively AIRFLOW__{SECTION}__{KEY}_CMD (for a command) 

483 env_var_cmd = env_var + "_CMD" 

484 if env_var_cmd in os.environ: 

485 # if this is a valid command key... 

486 if (section, key) in self.sensitive_config_values: 

487 return run_command(os.environ[env_var_cmd]) 

488 # alternatively AIRFLOW__{SECTION}__{KEY}_SECRET (to get from Secrets Backend) 

489 env_var_secret_path = env_var + "_SECRET" 

490 if env_var_secret_path in os.environ: 

491 # if this is a valid secret path... 

492 if (section, key) in self.sensitive_config_values: 

493 return _get_config_value_from_secret_backend(os.environ[env_var_secret_path]) 

494 return None 

495 

496 def _get_cmd_option(self, section: str, key: str): 

497 fallback_key = key + "_cmd" 

498 if (section, key) in self.sensitive_config_values: 

499 if super().has_option(section, fallback_key): 

500 command = super().get(section, fallback_key) 

501 return run_command(command) 

502 return None 

503 

504 def _get_cmd_option_from_config_sources( 

505 self, config_sources: ConfigSourcesType, section: str, key: str 

506 ) -> str | None: 

507 fallback_key = key + "_cmd" 

508 if (section, key) in self.sensitive_config_values: 

509 section_dict = config_sources.get(section) 

510 if section_dict is not None: 

511 command_value = section_dict.get(fallback_key) 

512 if command_value is not None: 

513 if isinstance(command_value, str): 

514 command = command_value 

515 else: 

516 command = command_value[0] 

517 return run_command(command) 

518 return None 

519 

520 def _get_secret_option(self, section: str, key: str) -> str | None: 

521 """Get Config option values from Secret Backend.""" 

522 fallback_key = key + "_secret" 

523 if (section, key) in self.sensitive_config_values: 

524 if super().has_option(section, fallback_key): 

525 secrets_path = super().get(section, fallback_key) 

526 return _get_config_value_from_secret_backend(secrets_path) 

527 return None 

528 

529 def _get_secret_option_from_config_sources( 

530 self, config_sources: ConfigSourcesType, section: str, key: str 

531 ) -> str | None: 

532 fallback_key = key + "_secret" 

533 if (section, key) in self.sensitive_config_values: 

534 section_dict = config_sources.get(section) 

535 if section_dict is not None: 

536 secrets_path_value = section_dict.get(fallback_key) 

537 if secrets_path_value is not None: 

538 if isinstance(secrets_path_value, str): 

539 secrets_path = secrets_path_value 

540 else: 

541 secrets_path = secrets_path_value[0] 

542 return _get_config_value_from_secret_backend(secrets_path) 

543 return None 

544 

545 def get_mandatory_value(self, section: str, key: str, **kwargs) -> str: 

546 value = self.get(section, key, **kwargs) 

547 if value is None: 

548 raise ValueError(f"The value {section}/{key} should be set!") 

549 return value 

550 

551 @overload # type: ignore[override] 

552 def get(self, section: str, key: str, fallback: str = ..., **kwargs) -> str: # type: ignore[override] 

553 

554 ... 

555 

556 @overload # type: ignore[override] 

557 def get(self, section: str, key: str, **kwargs) -> str | None: # type: ignore[override] 

558 

559 ... 

560 

561 def get(self, section: str, key: str, **kwargs) -> str | None: # type: ignore[override, misc] 

562 section = str(section).lower() 

563 key = str(key).lower() 

564 warning_emitted = False 

565 deprecated_section: str | None 

566 deprecated_key: str | None 

567 

568 # For when we rename whole sections 

569 if section in self.inversed_deprecated_sections: 

570 deprecated_section, deprecated_key = (section, key) 

571 section = self.inversed_deprecated_sections[section] 

572 if not self._suppress_future_warnings: 

573 warnings.warn( 

574 f"The config section [{deprecated_section}] has been renamed to " 

575 f"[{section}]. Please update your `conf.get*` call to use the new name", 

576 FutureWarning, 

577 stacklevel=2, 

578 ) 

579 # Don't warn about individual rename if the whole section is renamed 

580 warning_emitted = True 

581 elif (section, key) in self.inversed_deprecated_options: 

582 # Handle using deprecated section/key instead of the new section/key 

583 new_section, new_key = self.inversed_deprecated_options[(section, key)] 

584 if not self._suppress_future_warnings and not warning_emitted: 

585 warnings.warn( 

586 f"section/key [{section}/{key}] has been deprecated, you should use" 

587 f"[{new_section}/{new_key}] instead. Please update your `conf.get*` call to use the " 

588 "new name", 

589 FutureWarning, 

590 stacklevel=2, 

591 ) 

592 warning_emitted = True 

593 deprecated_section, deprecated_key = section, key 

594 section, key = (new_section, new_key) 

595 elif section in self.deprecated_sections: 

596 # When accessing the new section name, make sure we check under the old config name 

597 deprecated_key = key 

598 deprecated_section = self.deprecated_sections[section][0] 

599 else: 

600 deprecated_section, deprecated_key, _ = self.deprecated_options.get( 

601 (section, key), (None, None, None) 

602 ) 

603 

604 # first check environment variables 

605 option = self._get_environment_variables( 

606 deprecated_key, deprecated_section, key, section, issue_warning=not warning_emitted 

607 ) 

608 if option is not None: 

609 return option 

610 

611 # ...then the config file 

612 option = self._get_option_from_config_file( 

613 deprecated_key, deprecated_section, key, kwargs, section, issue_warning=not warning_emitted 

614 ) 

615 if option is not None: 

616 return option 

617 

618 # ...then commands 

619 option = self._get_option_from_commands( 

620 deprecated_key, deprecated_section, key, section, issue_warning=not warning_emitted 

621 ) 

622 if option is not None: 

623 return option 

624 

625 # ...then from secret backends 

626 option = self._get_option_from_secrets( 

627 deprecated_key, deprecated_section, key, section, issue_warning=not warning_emitted 

628 ) 

629 if option is not None: 

630 return option 

631 

632 # ...then the default config 

633 if self.airflow_defaults.has_option(section, key) or "fallback" in kwargs: 

634 return expand_env_var(self.airflow_defaults.get(section, key, **kwargs)) 

635 

636 log.warning("section/key [%s/%s] not found in config", section, key) 

637 

638 raise AirflowConfigException(f"section/key [{section}/{key}] not found in config") 

639 

640 def _get_option_from_secrets( 

641 self, 

642 deprecated_key: str | None, 

643 deprecated_section: str | None, 

644 key: str, 

645 section: str, 

646 issue_warning: bool = True, 

647 ) -> str | None: 

648 option = self._get_secret_option(section, key) 

649 if option: 

650 return option 

651 if deprecated_section and deprecated_key: 

652 with self.suppress_future_warnings(): 

653 option = self._get_secret_option(deprecated_section, deprecated_key) 

654 if option: 

655 if issue_warning: 

656 self._warn_deprecate(section, key, deprecated_section, deprecated_key) 

657 return option 

658 return None 

659 

660 def _get_option_from_commands( 

661 self, 

662 deprecated_key: str | None, 

663 deprecated_section: str | None, 

664 key: str, 

665 section: str, 

666 issue_warning: bool = True, 

667 ) -> str | None: 

668 option = self._get_cmd_option(section, key) 

669 if option: 

670 return option 

671 if deprecated_section and deprecated_key: 

672 with self.suppress_future_warnings(): 

673 option = self._get_cmd_option(deprecated_section, deprecated_key) 

674 if option: 

675 if issue_warning: 

676 self._warn_deprecate(section, key, deprecated_section, deprecated_key) 

677 return option 

678 return None 

679 

680 def _get_option_from_config_file( 

681 self, 

682 deprecated_key: str | None, 

683 deprecated_section: str | None, 

684 key: str, 

685 kwargs: dict[str, Any], 

686 section: str, 

687 issue_warning: bool = True, 

688 ) -> str | None: 

689 if super().has_option(section, key): 

690 # Use the parent's methods to get the actual config here to be able to 

691 # separate the config from default config. 

692 return expand_env_var(super().get(section, key, **kwargs)) 

693 if deprecated_section and deprecated_key: 

694 if super().has_option(deprecated_section, deprecated_key): 

695 if issue_warning: 

696 self._warn_deprecate(section, key, deprecated_section, deprecated_key) 

697 with self.suppress_future_warnings(): 

698 return expand_env_var(super().get(deprecated_section, deprecated_key, **kwargs)) 

699 return None 

700 

701 def _get_environment_variables( 

702 self, 

703 deprecated_key: str | None, 

704 deprecated_section: str | None, 

705 key: str, 

706 section: str, 

707 issue_warning: bool = True, 

708 ) -> str | None: 

709 option = self._get_env_var_option(section, key) 

710 if option is not None: 

711 return option 

712 if deprecated_section and deprecated_key: 

713 with self.suppress_future_warnings(): 

714 option = self._get_env_var_option(deprecated_section, deprecated_key) 

715 if option is not None: 

716 if issue_warning: 

717 self._warn_deprecate(section, key, deprecated_section, deprecated_key) 

718 return option 

719 return None 

720 

721 def getboolean(self, section: str, key: str, **kwargs) -> bool: # type: ignore[override] 

722 val = str(self.get(section, key, **kwargs)).lower().strip() 

723 if "#" in val: 

724 val = val.split("#")[0].strip() 

725 if val in ("t", "true", "1"): 

726 return True 

727 elif val in ("f", "false", "0"): 

728 return False 

729 else: 

730 raise AirflowConfigException( 

731 f'Failed to convert value to bool. Please check "{key}" key in "{section}" section. ' 

732 f'Current value: "{val}".' 

733 ) 

734 

735 def getint(self, section: str, key: str, **kwargs) -> int: # type: ignore[override] 

736 val = self.get(section, key, **kwargs) 

737 if val is None: 

738 raise AirflowConfigException( 

739 f"Failed to convert value None to int. " 

740 f'Please check "{key}" key in "{section}" section is set.' 

741 ) 

742 try: 

743 return int(val) 

744 except ValueError: 

745 raise AirflowConfigException( 

746 f'Failed to convert value to int. Please check "{key}" key in "{section}" section. ' 

747 f'Current value: "{val}".' 

748 ) 

749 

750 def getfloat(self, section: str, key: str, **kwargs) -> float: # type: ignore[override] 

751 val = self.get(section, key, **kwargs) 

752 if val is None: 

753 raise AirflowConfigException( 

754 f"Failed to convert value None to float. " 

755 f'Please check "{key}" key in "{section}" section is set.' 

756 ) 

757 try: 

758 return float(val) 

759 except ValueError: 

760 raise AirflowConfigException( 

761 f'Failed to convert value to float. Please check "{key}" key in "{section}" section. ' 

762 f'Current value: "{val}".' 

763 ) 

764 

765 def getimport(self, section: str, key: str, **kwargs) -> Any: 

766 """ 

767 Reads options, imports the full qualified name, and returns the object. 

768 

769 In case of failure, it throws an exception with the key and section names 

770 

771 :return: The object or None, if the option is empty 

772 """ 

773 full_qualified_path = conf.get(section=section, key=key, **kwargs) 

774 if not full_qualified_path: 

775 return None 

776 

777 try: 

778 return import_string(full_qualified_path) 

779 except ImportError as e: 

780 log.error(e) 

781 raise AirflowConfigException( 

782 f'The object could not be loaded. Please check "{key}" key in "{section}" section. ' 

783 f'Current value: "{full_qualified_path}".' 

784 ) 

785 

786 def getjson( 

787 self, section: str, key: str, fallback=_UNSET, **kwargs 

788 ) -> dict | list | str | int | float | None: 

789 """ 

790 Return a config value parsed from a JSON string. 

791 

792 ``fallback`` is *not* JSON parsed but used verbatim when no config value is given. 

793 """ 

794 # get always returns the fallback value as a string, so for this if 

795 # someone gives us an object we want to keep that 

796 default = _UNSET 

797 if fallback is not _UNSET: 

798 default = fallback 

799 fallback = _UNSET 

800 

801 try: 

802 data = self.get(section=section, key=key, fallback=fallback, **kwargs) 

803 except (NoSectionError, NoOptionError): 

804 return default 

805 

806 if not data: 

807 return default if default is not _UNSET else None 

808 

809 try: 

810 return json.loads(data) 

811 except JSONDecodeError as e: 

812 raise AirflowConfigException(f"Unable to parse [{section}] {key!r} as valid json") from e 

813 

814 def gettimedelta( 

815 self, section: str, key: str, fallback: Any = None, **kwargs 

816 ) -> datetime.timedelta | None: 

817 """ 

818 Gets the config value for the given section and key, and converts it into datetime.timedelta object. 

819 

820 If the key is missing, then it is considered as `None`. 

821 

822 :param section: the section from the config 

823 :param key: the key defined in the given section 

824 :param fallback: fallback value when no config value is given, defaults to None 

825 :raises AirflowConfigException: raised because ValueError or OverflowError 

826 :return: datetime.timedelta(seconds=<config_value>) or None 

827 """ 

828 val = self.get(section, key, fallback=fallback, **kwargs) 

829 

830 if val: 

831 # the given value must be convertible to integer 

832 try: 

833 int_val = int(val) 

834 except ValueError: 

835 raise AirflowConfigException( 

836 f'Failed to convert value to int. Please check "{key}" key in "{section}" section. ' 

837 f'Current value: "{val}".' 

838 ) 

839 

840 try: 

841 return datetime.timedelta(seconds=int_val) 

842 except OverflowError as err: 

843 raise AirflowConfigException( 

844 f"Failed to convert value to timedelta in `seconds`. " 

845 f"{err}. " 

846 f'Please check "{key}" key in "{section}" section. Current value: "{val}".' 

847 ) 

848 

849 return fallback 

850 

851 def read( 

852 self, 

853 filenames: (str | bytes | os.PathLike | Iterable[str | bytes | os.PathLike]), 

854 encoding=None, 

855 ): 

856 super().read(filenames=filenames, encoding=encoding) 

857 

858 # The RawConfigParser defines "Mapping" from abc.collections is not subscriptable - so we have 

859 # to use Dict here. 

860 def read_dict( # type: ignore[override] 

861 self, dictionary: dict[str, dict[str, Any]], source: str = "<dict>" 

862 ): 

863 super().read_dict(dictionary=dictionary, source=source) 

864 

865 def has_option(self, section: str, option: str) -> bool: 

866 try: 

867 # Using self.get() to avoid reimplementing the priority order 

868 # of config variables (env, config, cmd, defaults) 

869 # UNSET to avoid logging a warning about missing values 

870 self.get(section, option, fallback=_UNSET) 

871 return True 

872 except (NoOptionError, NoSectionError): 

873 return False 

874 

875 def remove_option(self, section: str, option: str, remove_default: bool = True): 

876 """ 

877 Remove an option if it exists in config from a file or default config. 

878 

879 If both of config have the same option, this removes the option 

880 in both configs unless remove_default=False. 

881 """ 

882 if super().has_option(section, option): 

883 super().remove_option(section, option) 

884 

885 if self.airflow_defaults.has_option(section, option) and remove_default: 

886 self.airflow_defaults.remove_option(section, option) 

887 

888 def getsection(self, section: str) -> ConfigOptionsDictType | None: 

889 """ 

890 Returns the section as a dict. 

891 

892 Values are converted to int, float, bool as required. 

893 

894 :param section: section from the config 

895 """ 

896 if not self.has_section(section) and not self.airflow_defaults.has_section(section): 

897 return None 

898 if self.airflow_defaults.has_section(section): 

899 _section: ConfigOptionsDictType = OrderedDict(self.airflow_defaults.items(section)) 

900 else: 

901 _section = OrderedDict() 

902 

903 if self.has_section(section): 

904 _section.update(OrderedDict(self.items(section))) 

905 

906 section_prefix = self._env_var_name(section, "") 

907 for env_var in sorted(os.environ.keys()): 

908 if env_var.startswith(section_prefix): 

909 key = env_var.replace(section_prefix, "") 

910 if key.endswith("_CMD"): 

911 key = key[:-4] 

912 key = key.lower() 

913 _section[key] = self._get_env_var_option(section, key) 

914 

915 for key, val in _section.items(): 

916 if val is None: 

917 raise AirflowConfigException( 

918 f"Failed to convert value automatically. " 

919 f'Please check "{key}" key in "{section}" section is set.' 

920 ) 

921 try: 

922 _section[key] = int(val) 

923 except ValueError: 

924 try: 

925 _section[key] = float(val) 

926 except ValueError: 

927 if isinstance(val, str) and val.lower() in ("t", "true"): 

928 _section[key] = True 

929 elif isinstance(val, str) and val.lower() in ("f", "false"): 

930 _section[key] = False 

931 return _section 

932 

933 def write( # type: ignore[override] 

934 self, fp: IO, space_around_delimiters: bool = True, section: str | None = None 

935 ) -> None: 

936 # This is based on the configparser.RawConfigParser.write method code to add support for 

937 # reading options from environment variables. 

938 # Various type ignores below deal with less-than-perfect RawConfigParser superclass typing 

939 if space_around_delimiters: 

940 delimiter = f" {self._delimiters[0]} " # type: ignore[attr-defined] 

941 else: 

942 delimiter = self._delimiters[0] # type: ignore[attr-defined] 

943 if self._defaults: # type: ignore 

944 self._write_section( # type: ignore[attr-defined] 

945 fp, self.default_section, self._defaults.items(), delimiter # type: ignore[attr-defined] 

946 ) 

947 sections = ( 

948 {section: dict(self.getsection(section))} # type: ignore[arg-type] 

949 if section 

950 else self._sections # type: ignore[attr-defined] 

951 ) 

952 for sect in sections: 

953 item_section: ConfigOptionsDictType = self.getsection(sect) # type: ignore[assignment] 

954 self._write_section(fp, sect, item_section.items(), delimiter) # type: ignore[attr-defined] 

955 

956 def as_dict( 

957 self, 

958 display_source: bool = False, 

959 display_sensitive: bool = False, 

960 raw: bool = False, 

961 include_env: bool = True, 

962 include_cmds: bool = True, 

963 include_secret: bool = True, 

964 ) -> ConfigSourcesType: 

965 """ 

966 Returns the current configuration as an OrderedDict of OrderedDicts. 

967 

968 When materializing current configuration Airflow defaults are 

969 materialized along with user set configs. If any of the `include_*` 

970 options are False then the result of calling command or secret key 

971 configs do not override Airflow defaults and instead are passed through. 

972 In order to then avoid Airflow defaults from overwriting user set 

973 command or secret key configs we filter out bare sensitive_config_values 

974 that are set to Airflow defaults when command or secret key configs 

975 produce different values. 

976 

977 :param display_source: If False, the option value is returned. If True, 

978 a tuple of (option_value, source) is returned. Source is either 

979 'airflow.cfg', 'default', 'env var', or 'cmd'. 

980 :param display_sensitive: If True, the values of options set by env 

981 vars and bash commands will be displayed. If False, those options 

982 are shown as '< hidden >' 

983 :param raw: Should the values be output as interpolated values, or the 

984 "raw" form that can be fed back in to ConfigParser 

985 :param include_env: Should the value of configuration from AIRFLOW__ 

986 environment variables be included or not 

987 :param include_cmds: Should the result of calling any *_cmd config be 

988 set (True, default), or should the _cmd options be left as the 

989 command to run (False) 

990 :param include_secret: Should the result of calling any *_secret config be 

991 set (True, default), or should the _secret options be left as the 

992 path to get the secret from (False) 

993 :return: Dictionary, where the key is the name of the section and the content is 

994 the dictionary with the name of the parameter and its value. 

995 """ 

996 if not display_sensitive: 

997 # We want to hide the sensitive values at the appropriate methods 

998 # since envs from cmds, secrets can be read at _include_envs method 

999 if not all([include_env, include_cmds, include_secret]): 

1000 raise ValueError( 

1001 "If display_sensitive is false, then include_env, " 

1002 "include_cmds, include_secret must all be set as True" 

1003 ) 

1004 

1005 config_sources: ConfigSourcesType = {} 

1006 configs = [ 

1007 ("default", self.airflow_defaults), 

1008 ("airflow.cfg", self), 

1009 ] 

1010 

1011 self._replace_config_with_display_sources( 

1012 config_sources, 

1013 configs, 

1014 display_source, 

1015 raw, 

1016 self.deprecated_options, 

1017 include_cmds=include_cmds, 

1018 include_env=include_env, 

1019 include_secret=include_secret, 

1020 ) 

1021 

1022 # add env vars and overwrite because they have priority 

1023 if include_env: 

1024 self._include_envs(config_sources, display_sensitive, display_source, raw) 

1025 else: 

1026 self._filter_by_source(config_sources, display_source, self._get_env_var_option) 

1027 

1028 # add bash commands 

1029 if include_cmds: 

1030 self._include_commands(config_sources, display_sensitive, display_source, raw) 

1031 else: 

1032 self._filter_by_source(config_sources, display_source, self._get_cmd_option) 

1033 

1034 # add config from secret backends 

1035 if include_secret: 

1036 self._include_secrets(config_sources, display_sensitive, display_source, raw) 

1037 else: 

1038 self._filter_by_source(config_sources, display_source, self._get_secret_option) 

1039 

1040 if not display_sensitive: 

1041 # This ensures the ones from config file is hidden too 

1042 # if they are not provided through env, cmd and secret 

1043 hidden = "< hidden >" 

1044 for (section, key) in self.sensitive_config_values: 

1045 if not config_sources.get(section): 

1046 continue 

1047 if config_sources[section].get(key, None): 

1048 if display_source: 

1049 source = config_sources[section][key][1] 

1050 config_sources[section][key] = (hidden, source) 

1051 else: 

1052 config_sources[section][key] = hidden 

1053 

1054 return config_sources 

1055 

1056 def _include_secrets( 

1057 self, 

1058 config_sources: ConfigSourcesType, 

1059 display_sensitive: bool, 

1060 display_source: bool, 

1061 raw: bool, 

1062 ): 

1063 for (section, key) in self.sensitive_config_values: 

1064 value: str | None = self._get_secret_option_from_config_sources(config_sources, section, key) 

1065 if value: 

1066 if not display_sensitive: 

1067 value = "< hidden >" 

1068 if display_source: 

1069 opt: str | tuple[str, str] = (value, "secret") 

1070 elif raw: 

1071 opt = value.replace("%", "%%") 

1072 else: 

1073 opt = value 

1074 config_sources.setdefault(section, OrderedDict()).update({key: opt}) 

1075 del config_sources[section][key + "_secret"] 

1076 

1077 def _include_commands( 

1078 self, 

1079 config_sources: ConfigSourcesType, 

1080 display_sensitive: bool, 

1081 display_source: bool, 

1082 raw: bool, 

1083 ): 

1084 for (section, key) in self.sensitive_config_values: 

1085 opt = self._get_cmd_option_from_config_sources(config_sources, section, key) 

1086 if not opt: 

1087 continue 

1088 opt_to_set: str | tuple[str, str] | None = opt 

1089 if not display_sensitive: 

1090 opt_to_set = "< hidden >" 

1091 if display_source: 

1092 opt_to_set = (str(opt_to_set), "cmd") 

1093 elif raw: 

1094 opt_to_set = str(opt_to_set).replace("%", "%%") 

1095 if opt_to_set is not None: 

1096 dict_to_update: dict[str, str | tuple[str, str]] = {key: opt_to_set} 

1097 config_sources.setdefault(section, OrderedDict()).update(dict_to_update) 

1098 del config_sources[section][key + "_cmd"] 

1099 

1100 def _include_envs( 

1101 self, 

1102 config_sources: ConfigSourcesType, 

1103 display_sensitive: bool, 

1104 display_source: bool, 

1105 raw: bool, 

1106 ): 

1107 for env_var in [ 

1108 os_environment for os_environment in os.environ if os_environment.startswith(ENV_VAR_PREFIX) 

1109 ]: 

1110 try: 

1111 _, section, key = env_var.split("__", 2) 

1112 opt = self._get_env_var_option(section, key) 

1113 except ValueError: 

1114 continue 

1115 if opt is None: 

1116 log.warning("Ignoring unknown env var '%s'", env_var) 

1117 continue 

1118 if not display_sensitive and env_var != self._env_var_name("core", "unit_test_mode"): 

1119 # Don't hide cmd/secret values here 

1120 if not env_var.lower().endswith("cmd") and not env_var.lower().endswith("secret"): 

1121 opt = "< hidden >" 

1122 

1123 elif raw: 

1124 opt = opt.replace("%", "%%") 

1125 if display_source: 

1126 opt = (opt, "env var") 

1127 

1128 section = section.lower() 

1129 # if we lower key for kubernetes_environment_variables section, 

1130 # then we won't be able to set any Airflow environment 

1131 # variables. Airflow only parse environment variables starts 

1132 # with AIRFLOW_. Therefore, we need to make it a special case. 

1133 if section != "kubernetes_environment_variables": 

1134 key = key.lower() 

1135 config_sources.setdefault(section, OrderedDict()).update({key: opt}) 

1136 

1137 def _filter_by_source( 

1138 self, 

1139 config_sources: ConfigSourcesType, 

1140 display_source: bool, 

1141 getter_func, 

1142 ): 

1143 """ 

1144 Deletes default configs from current configuration. 

1145 

1146 An OrderedDict of OrderedDicts, if it would conflict with special sensitive_config_values. 

1147 

1148 This is necessary because bare configs take precedence over the command 

1149 or secret key equivalents so if the current running config is 

1150 materialized with Airflow defaults they in turn override user set 

1151 command or secret key configs. 

1152 

1153 :param config_sources: The current configuration to operate on 

1154 :param display_source: If False, configuration options contain raw 

1155 values. If True, options are a tuple of (option_value, source). 

1156 Source is either 'airflow.cfg', 'default', 'env var', or 'cmd'. 

1157 :param getter_func: A callback function that gets the user configured 

1158 override value for a particular sensitive_config_values config. 

1159 :return: None, the given config_sources is filtered if necessary, 

1160 otherwise untouched. 

1161 """ 

1162 for (section, key) in self.sensitive_config_values: 

1163 # Don't bother if we don't have section / key 

1164 if section not in config_sources or key not in config_sources[section]: 

1165 continue 

1166 # Check that there is something to override defaults 

1167 try: 

1168 getter_opt = getter_func(section, key) 

1169 except ValueError: 

1170 continue 

1171 if not getter_opt: 

1172 continue 

1173 # Check to see that there is a default value 

1174 if not self.airflow_defaults.has_option(section, key): 

1175 continue 

1176 # Check to see if bare setting is the same as defaults 

1177 if display_source: 

1178 # when display_source = true, we know that the config_sources contains tuple 

1179 opt, source = config_sources[section][key] # type: ignore 

1180 else: 

1181 opt = config_sources[section][key] 

1182 if opt == self.airflow_defaults.get(section, key): 

1183 del config_sources[section][key] 

1184 

1185 @staticmethod 

1186 def _replace_config_with_display_sources( 

1187 config_sources: ConfigSourcesType, 

1188 configs: Iterable[tuple[str, ConfigParser]], 

1189 display_source: bool, 

1190 raw: bool, 

1191 deprecated_options: dict[tuple[str, str], tuple[str, str, str]], 

1192 include_env: bool, 

1193 include_cmds: bool, 

1194 include_secret: bool, 

1195 ): 

1196 for (source_name, config) in configs: 

1197 for section in config.sections(): 

1198 AirflowConfigParser._replace_section_config_with_display_sources( 

1199 config, 

1200 config_sources, 

1201 display_source, 

1202 raw, 

1203 section, 

1204 source_name, 

1205 deprecated_options, 

1206 configs, 

1207 include_env=include_env, 

1208 include_cmds=include_cmds, 

1209 include_secret=include_secret, 

1210 ) 

1211 

1212 @staticmethod 

1213 def _deprecated_value_is_set_in_config( 

1214 deprecated_section: str, 

1215 deprecated_key: str, 

1216 configs: Iterable[tuple[str, ConfigParser]], 

1217 ) -> bool: 

1218 for config_type, config in configs: 

1219 if config_type == "default": 

1220 continue 

1221 try: 

1222 deprecated_section_array = config.items(section=deprecated_section, raw=True) 

1223 for (key_candidate, _) in deprecated_section_array: 

1224 if key_candidate == deprecated_key: 

1225 return True 

1226 except NoSectionError: 

1227 pass 

1228 return False 

1229 

1230 @staticmethod 

1231 def _deprecated_variable_is_set(deprecated_section: str, deprecated_key: str) -> bool: 

1232 return ( 

1233 os.environ.get(f"{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}") 

1234 is not None 

1235 ) 

1236 

1237 @staticmethod 

1238 def _deprecated_command_is_set_in_config( 

1239 deprecated_section: str, deprecated_key: str, configs: Iterable[tuple[str, ConfigParser]] 

1240 ) -> bool: 

1241 return AirflowConfigParser._deprecated_value_is_set_in_config( 

1242 deprecated_section=deprecated_section, deprecated_key=deprecated_key + "_cmd", configs=configs 

1243 ) 

1244 

1245 @staticmethod 

1246 def _deprecated_variable_command_is_set(deprecated_section: str, deprecated_key: str) -> bool: 

1247 return ( 

1248 os.environ.get(f"{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}_CMD") 

1249 is not None 

1250 ) 

1251 

1252 @staticmethod 

1253 def _deprecated_secret_is_set_in_config( 

1254 deprecated_section: str, deprecated_key: str, configs: Iterable[tuple[str, ConfigParser]] 

1255 ) -> bool: 

1256 return AirflowConfigParser._deprecated_value_is_set_in_config( 

1257 deprecated_section=deprecated_section, deprecated_key=deprecated_key + "_secret", configs=configs 

1258 ) 

1259 

1260 @staticmethod 

1261 def _deprecated_variable_secret_is_set(deprecated_section: str, deprecated_key: str) -> bool: 

1262 return ( 

1263 os.environ.get(f"{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}_SECRET") 

1264 is not None 

1265 ) 

1266 

1267 @contextmanager 

1268 def suppress_future_warnings(self): 

1269 suppress_future_warnings = self._suppress_future_warnings 

1270 self._suppress_future_warnings = True 

1271 yield self 

1272 self._suppress_future_warnings = suppress_future_warnings 

1273 

1274 @staticmethod 

1275 def _replace_section_config_with_display_sources( 

1276 config: ConfigParser, 

1277 config_sources: ConfigSourcesType, 

1278 display_source: bool, 

1279 raw: bool, 

1280 section: str, 

1281 source_name: str, 

1282 deprecated_options: dict[tuple[str, str], tuple[str, str, str]], 

1283 configs: Iterable[tuple[str, ConfigParser]], 

1284 include_env: bool, 

1285 include_cmds: bool, 

1286 include_secret: bool, 

1287 ): 

1288 sect = config_sources.setdefault(section, OrderedDict()) 

1289 if isinstance(config, AirflowConfigParser): 

1290 with config.suppress_future_warnings(): 

1291 items = config.items(section=section, raw=raw) 

1292 else: 

1293 items = config.items(section=section, raw=raw) 

1294 for k, val in items: 

1295 deprecated_section, deprecated_key, _ = deprecated_options.get((section, k), (None, None, None)) 

1296 if deprecated_section and deprecated_key: 

1297 if source_name == "default": 

1298 # If deprecated entry has some non-default value set for any of the sources requested, 

1299 # We should NOT set default for the new entry (because it will override anything 

1300 # coming from the deprecated ones) 

1301 if AirflowConfigParser._deprecated_value_is_set_in_config( 

1302 deprecated_section, deprecated_key, configs 

1303 ): 

1304 continue 

1305 if include_env and AirflowConfigParser._deprecated_variable_is_set( 

1306 deprecated_section, deprecated_key 

1307 ): 

1308 continue 

1309 if include_cmds and ( 

1310 AirflowConfigParser._deprecated_variable_command_is_set( 

1311 deprecated_section, deprecated_key 

1312 ) 

1313 or AirflowConfigParser._deprecated_command_is_set_in_config( 

1314 deprecated_section, deprecated_key, configs 

1315 ) 

1316 ): 

1317 continue 

1318 if include_secret and ( 

1319 AirflowConfigParser._deprecated_variable_secret_is_set( 

1320 deprecated_section, deprecated_key 

1321 ) 

1322 or AirflowConfigParser._deprecated_secret_is_set_in_config( 

1323 deprecated_section, deprecated_key, configs 

1324 ) 

1325 ): 

1326 continue 

1327 if display_source: 

1328 sect[k] = (val, source_name) 

1329 else: 

1330 sect[k] = val 

1331 

1332 def load_test_config(self): 

1333 """ 

1334 Load the unit test configuration. 

1335 

1336 Note: this is not reversible. 

1337 """ 

1338 # remove all sections, falling back to defaults 

1339 for section in self.sections(): 

1340 self.remove_section(section) 

1341 

1342 # then read test config 

1343 

1344 path = _default_config_file_path("default_test.cfg") 

1345 log.info("Reading default test configuration from %s", path) 

1346 self.read_string(_parameterized_config_from_template("default_test.cfg")) 

1347 # then read any "custom" test settings 

1348 log.info("Reading test configuration from %s", TEST_CONFIG_FILE) 

1349 self.read(TEST_CONFIG_FILE) 

1350 

1351 @staticmethod 

1352 def _warn_deprecate(section: str, key: str, deprecated_section: str, deprecated_name: str): 

1353 if section == deprecated_section: 

1354 warnings.warn( 

1355 f"The {deprecated_name} option in [{section}] has been renamed to {key} - " 

1356 f"the old setting has been used, but please update your config.", 

1357 DeprecationWarning, 

1358 stacklevel=4, 

1359 ) 

1360 else: 

1361 warnings.warn( 

1362 f"The {deprecated_name} option in [{deprecated_section}] has been moved to the {key} option " 

1363 f"in [{section}] - the old setting has been used, but please update your config.", 

1364 DeprecationWarning, 

1365 stacklevel=4, 

1366 ) 

1367 

1368 def __getstate__(self): 

1369 return { 

1370 name: getattr(self, name) 

1371 for name in [ 

1372 "_sections", 

1373 "is_validated", 

1374 "airflow_defaults", 

1375 ] 

1376 } 

1377 

1378 def __setstate__(self, state): 

1379 self.__init__() 

1380 config = state.pop("_sections") 

1381 self.read_dict(config) 

1382 self.__dict__.update(state) 

1383 

1384 

1385def get_airflow_home() -> str: 

1386 """Get path to Airflow Home.""" 

1387 return expand_env_var(os.environ.get("AIRFLOW_HOME", "~/airflow")) 

1388 

1389 

1390def get_airflow_config(airflow_home) -> str: 

1391 """Get Path to airflow.cfg path.""" 

1392 airflow_config_var = os.environ.get("AIRFLOW_CONFIG") 

1393 if airflow_config_var is None: 

1394 return os.path.join(airflow_home, "airflow.cfg") 

1395 return expand_env_var(airflow_config_var) 

1396 

1397 

1398def _parameterized_config_from_template(filename) -> str: 

1399 TEMPLATE_START = "# ----------------------- TEMPLATE BEGINS HERE -----------------------\n" 

1400 

1401 path = _default_config_file_path(filename) 

1402 with open(path) as fh: 

1403 for line in fh: 

1404 if line != TEMPLATE_START: 

1405 continue 

1406 return parameterized_config(fh.read().strip()) 

1407 raise RuntimeError(f"Template marker not found in {path!r}") 

1408 

1409 

1410def parameterized_config(template) -> str: 

1411 """ 

1412 Generates configuration from provided template & variables defined in current scope. 

1413 

1414 :param template: a config content templated with {{variables}} 

1415 """ 

1416 all_vars = {k: v for d in [globals(), locals()] for k, v in d.items()} 

1417 return template.format(**all_vars) 

1418 

1419 

1420def get_airflow_test_config(airflow_home) -> str: 

1421 """Get path to unittests.cfg.""" 

1422 if "AIRFLOW_TEST_CONFIG" not in os.environ: 

1423 return os.path.join(airflow_home, "unittests.cfg") 

1424 # It will never return None 

1425 return expand_env_var(os.environ["AIRFLOW_TEST_CONFIG"]) # type: ignore[return-value] 

1426 

1427 

1428def _generate_fernet_key() -> str: 

1429 from cryptography.fernet import Fernet 

1430 

1431 return Fernet.generate_key().decode() 

1432 

1433 

1434def initialize_config() -> AirflowConfigParser: 

1435 """ 

1436 Load the Airflow config files. 

1437 

1438 Called for you automatically as part of the Airflow boot process. 

1439 """ 

1440 global FERNET_KEY, AIRFLOW_HOME 

1441 

1442 default_config = _parameterized_config_from_template("default_airflow.cfg") 

1443 

1444 local_conf = AirflowConfigParser(default_config=default_config) 

1445 

1446 if local_conf.getboolean("core", "unit_test_mode"): 

1447 # Load test config only 

1448 if not os.path.isfile(TEST_CONFIG_FILE): 

1449 from cryptography.fernet import Fernet 

1450 

1451 log.info("Creating new Airflow config file for unit tests in: %s", TEST_CONFIG_FILE) 

1452 pathlib.Path(AIRFLOW_HOME).mkdir(parents=True, exist_ok=True) 

1453 

1454 FERNET_KEY = Fernet.generate_key().decode() 

1455 

1456 with open(TEST_CONFIG_FILE, "w") as file: 

1457 cfg = _parameterized_config_from_template("default_test.cfg") 

1458 file.write(cfg) 

1459 

1460 local_conf.load_test_config() 

1461 else: 

1462 # Load normal config 

1463 if not os.path.isfile(AIRFLOW_CONFIG): 

1464 from cryptography.fernet import Fernet 

1465 

1466 log.info("Creating new Airflow config file in: %s", AIRFLOW_CONFIG) 

1467 pathlib.Path(AIRFLOW_HOME).mkdir(parents=True, exist_ok=True) 

1468 

1469 FERNET_KEY = Fernet.generate_key().decode() 

1470 

1471 with open(AIRFLOW_CONFIG, "w") as file: 

1472 file.write(default_config) 

1473 

1474 log.info("Reading the config from %s", AIRFLOW_CONFIG) 

1475 

1476 local_conf.read(AIRFLOW_CONFIG) 

1477 

1478 if local_conf.has_option("core", "AIRFLOW_HOME"): 

1479 msg = ( 

1480 "Specifying both AIRFLOW_HOME environment variable and airflow_home " 

1481 "in the config file is deprecated. Please use only the AIRFLOW_HOME " 

1482 "environment variable and remove the config file entry." 

1483 ) 

1484 if "AIRFLOW_HOME" in os.environ: 

1485 warnings.warn(msg, category=DeprecationWarning) 

1486 elif local_conf.get("core", "airflow_home") == AIRFLOW_HOME: 

1487 warnings.warn( 

1488 "Specifying airflow_home in the config file is deprecated. As you " 

1489 "have left it at the default value you should remove the setting " 

1490 "from your airflow.cfg and suffer no change in behaviour.", 

1491 category=DeprecationWarning, 

1492 ) 

1493 else: 

1494 # there 

1495 AIRFLOW_HOME = local_conf.get("core", "airflow_home") # type: ignore[assignment] 

1496 warnings.warn(msg, category=DeprecationWarning) 

1497 

1498 # They _might_ have set unit_test_mode in the airflow.cfg, we still 

1499 # want to respect that and then load the unittests.cfg 

1500 if local_conf.getboolean("core", "unit_test_mode"): 

1501 local_conf.load_test_config() 

1502 

1503 # Make it no longer a proxy variable, just set it to an actual string 

1504 global WEBSERVER_CONFIG 

1505 WEBSERVER_CONFIG = AIRFLOW_HOME + "/webserver_config.py" 

1506 

1507 if not os.path.isfile(WEBSERVER_CONFIG): 

1508 import shutil 

1509 

1510 log.info("Creating new FAB webserver config file in: %s", WEBSERVER_CONFIG) 

1511 shutil.copy(_default_config_file_path("default_webserver_config.py"), WEBSERVER_CONFIG) 

1512 return local_conf 

1513 

1514 

1515# Historical convenience functions to access config entries 

1516def load_test_config(): 

1517 """Historical load_test_config.""" 

1518 warnings.warn( 

1519 "Accessing configuration method 'load_test_config' directly from the configuration module is " 

1520 "deprecated. Please access the configuration from the 'configuration.conf' object via " 

1521 "'conf.load_test_config'", 

1522 DeprecationWarning, 

1523 stacklevel=2, 

1524 ) 

1525 conf.load_test_config() 

1526 

1527 

1528def get(*args, **kwargs) -> ConfigType | None: 

1529 """Historical get.""" 

1530 warnings.warn( 

1531 "Accessing configuration method 'get' directly from the configuration module is " 

1532 "deprecated. Please access the configuration from the 'configuration.conf' object via " 

1533 "'conf.get'", 

1534 DeprecationWarning, 

1535 stacklevel=2, 

1536 ) 

1537 return conf.get(*args, **kwargs) 

1538 

1539 

1540def getboolean(*args, **kwargs) -> bool: 

1541 """Historical getboolean.""" 

1542 warnings.warn( 

1543 "Accessing configuration method 'getboolean' directly from the configuration module is " 

1544 "deprecated. Please access the configuration from the 'configuration.conf' object via " 

1545 "'conf.getboolean'", 

1546 DeprecationWarning, 

1547 stacklevel=2, 

1548 ) 

1549 return conf.getboolean(*args, **kwargs) 

1550 

1551 

1552def getfloat(*args, **kwargs) -> float: 

1553 """Historical getfloat.""" 

1554 warnings.warn( 

1555 "Accessing configuration method 'getfloat' directly from the configuration module is " 

1556 "deprecated. Please access the configuration from the 'configuration.conf' object via " 

1557 "'conf.getfloat'", 

1558 DeprecationWarning, 

1559 stacklevel=2, 

1560 ) 

1561 return conf.getfloat(*args, **kwargs) 

1562 

1563 

1564def getint(*args, **kwargs) -> int: 

1565 """Historical getint.""" 

1566 warnings.warn( 

1567 "Accessing configuration method 'getint' directly from the configuration module is " 

1568 "deprecated. Please access the configuration from the 'configuration.conf' object via " 

1569 "'conf.getint'", 

1570 DeprecationWarning, 

1571 stacklevel=2, 

1572 ) 

1573 return conf.getint(*args, **kwargs) 

1574 

1575 

1576def getsection(*args, **kwargs) -> ConfigOptionsDictType | None: 

1577 """Historical getsection.""" 

1578 warnings.warn( 

1579 "Accessing configuration method 'getsection' directly from the configuration module is " 

1580 "deprecated. Please access the configuration from the 'configuration.conf' object via " 

1581 "'conf.getsection'", 

1582 DeprecationWarning, 

1583 stacklevel=2, 

1584 ) 

1585 return conf.getsection(*args, **kwargs) 

1586 

1587 

1588def has_option(*args, **kwargs) -> bool: 

1589 """Historical has_option.""" 

1590 warnings.warn( 

1591 "Accessing configuration method 'has_option' directly from the configuration module is " 

1592 "deprecated. Please access the configuration from the 'configuration.conf' object via " 

1593 "'conf.has_option'", 

1594 DeprecationWarning, 

1595 stacklevel=2, 

1596 ) 

1597 return conf.has_option(*args, **kwargs) 

1598 

1599 

1600def remove_option(*args, **kwargs) -> bool: 

1601 """Historical remove_option.""" 

1602 warnings.warn( 

1603 "Accessing configuration method 'remove_option' directly from the configuration module is " 

1604 "deprecated. Please access the configuration from the 'configuration.conf' object via " 

1605 "'conf.remove_option'", 

1606 DeprecationWarning, 

1607 stacklevel=2, 

1608 ) 

1609 return conf.remove_option(*args, **kwargs) 

1610 

1611 

1612def as_dict(*args, **kwargs) -> ConfigSourcesType: 

1613 """Historical as_dict.""" 

1614 warnings.warn( 

1615 "Accessing configuration method 'as_dict' directly from the configuration module is " 

1616 "deprecated. Please access the configuration from the 'configuration.conf' object via " 

1617 "'conf.as_dict'", 

1618 DeprecationWarning, 

1619 stacklevel=2, 

1620 ) 

1621 return conf.as_dict(*args, **kwargs) 

1622 

1623 

1624def set(*args, **kwargs) -> None: 

1625 """Historical set.""" 

1626 warnings.warn( 

1627 "Accessing configuration method 'set' directly from the configuration module is " 

1628 "deprecated. Please access the configuration from the 'configuration.conf' object via " 

1629 "'conf.set'", 

1630 DeprecationWarning, 

1631 stacklevel=2, 

1632 ) 

1633 conf.set(*args, **kwargs) 

1634 

1635 

1636def ensure_secrets_loaded() -> list[BaseSecretsBackend]: 

1637 """ 

1638 Ensure that all secrets backends are loaded. 

1639 If the secrets_backend_list contains only 2 default backends, reload it. 

1640 """ 

1641 # Check if the secrets_backend_list contains only 2 default backends 

1642 if len(secrets_backend_list) == 2: 

1643 return initialize_secrets_backends() 

1644 return secrets_backend_list 

1645 

1646 

1647def get_custom_secret_backend() -> BaseSecretsBackend | None: 

1648 """Get Secret Backend if defined in airflow.cfg.""" 

1649 secrets_backend_cls = conf.getimport(section="secrets", key="backend") 

1650 

1651 if not secrets_backend_cls: 

1652 return None 

1653 

1654 try: 

1655 backend_kwargs = conf.getjson(section="secrets", key="backend_kwargs") 

1656 if not backend_kwargs: 

1657 backend_kwargs = {} 

1658 elif not isinstance(backend_kwargs, dict): 

1659 raise ValueError("not a dict") 

1660 except AirflowConfigException: 

1661 log.warning("Failed to parse [secrets] backend_kwargs as JSON, defaulting to no kwargs.") 

1662 backend_kwargs = {} 

1663 except ValueError: 

1664 log.warning("Failed to parse [secrets] backend_kwargs into a dict, defaulting to no kwargs.") 

1665 backend_kwargs = {} 

1666 

1667 return secrets_backend_cls(**backend_kwargs) 

1668 

1669 

1670def initialize_secrets_backends() -> list[BaseSecretsBackend]: 

1671 """ 

1672 Initialize secrets backend. 

1673 

1674 * import secrets backend classes 

1675 * instantiate them and return them in a list 

1676 """ 

1677 backend_list = [] 

1678 

1679 custom_secret_backend = get_custom_secret_backend() 

1680 

1681 if custom_secret_backend is not None: 

1682 backend_list.append(custom_secret_backend) 

1683 

1684 for class_name in DEFAULT_SECRETS_SEARCH_PATH: 

1685 secrets_backend_cls = import_string(class_name) 

1686 backend_list.append(secrets_backend_cls()) 

1687 

1688 return backend_list 

1689 

1690 

1691@functools.lru_cache(maxsize=None) 

1692def _DEFAULT_CONFIG() -> str: 

1693 path = _default_config_file_path("default_airflow.cfg") 

1694 with open(path) as fh: 

1695 return fh.read() 

1696 

1697 

1698@functools.lru_cache(maxsize=None) 

1699def _TEST_CONFIG() -> str: 

1700 path = _default_config_file_path("default_test.cfg") 

1701 with open(path) as fh: 

1702 return fh.read() 

1703 

1704 

1705_deprecated = { 

1706 "DEFAULT_CONFIG": _DEFAULT_CONFIG, 

1707 "TEST_CONFIG": _TEST_CONFIG, 

1708 "TEST_CONFIG_FILE_PATH": functools.partial(_default_config_file_path, "default_test.cfg"), 

1709 "DEFAULT_CONFIG_FILE_PATH": functools.partial(_default_config_file_path, "default_airflow.cfg"), 

1710} 

1711 

1712 

1713def __getattr__(name): 

1714 if name in _deprecated: 

1715 warnings.warn( 

1716 f"{__name__}.{name} is deprecated and will be removed in future", 

1717 DeprecationWarning, 

1718 stacklevel=2, 

1719 ) 

1720 return _deprecated[name]() 

1721 raise AttributeError(f"module {__name__} has no attribute {name}") 

1722 

1723 

1724# Setting AIRFLOW_HOME and AIRFLOW_CONFIG from environment variables, using 

1725# "~/airflow" and "$AIRFLOW_HOME/airflow.cfg" respectively as defaults. 

1726AIRFLOW_HOME = get_airflow_home() 

1727AIRFLOW_CONFIG = get_airflow_config(AIRFLOW_HOME) 

1728 

1729 

1730# Set up dags folder for unit tests 

1731# this directory won't exist if users install via pip 

1732_TEST_DAGS_FOLDER = os.path.join( 

1733 os.path.dirname(os.path.dirname(os.path.realpath(__file__))), "tests", "dags" 

1734) 

1735if os.path.exists(_TEST_DAGS_FOLDER): 

1736 TEST_DAGS_FOLDER = _TEST_DAGS_FOLDER 

1737else: 

1738 TEST_DAGS_FOLDER = os.path.join(AIRFLOW_HOME, "dags") 

1739 

1740# Set up plugins folder for unit tests 

1741_TEST_PLUGINS_FOLDER = os.path.join( 

1742 os.path.dirname(os.path.dirname(os.path.realpath(__file__))), "tests", "plugins" 

1743) 

1744if os.path.exists(_TEST_PLUGINS_FOLDER): 

1745 TEST_PLUGINS_FOLDER = _TEST_PLUGINS_FOLDER 

1746else: 

1747 TEST_PLUGINS_FOLDER = os.path.join(AIRFLOW_HOME, "plugins") 

1748 

1749 

1750TEST_CONFIG_FILE = get_airflow_test_config(AIRFLOW_HOME) 

1751 

1752SECRET_KEY = b64encode(os.urandom(16)).decode("utf-8") 

1753FERNET_KEY = "" # Set only if needed when generating a new file 

1754WEBSERVER_CONFIG = "" # Set by initialize_config 

1755 

1756conf = initialize_config() 

1757secrets_backend_list = initialize_secrets_backends() 

1758conf.validate()