Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/settings.py: 65%

342 statements

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18from __future__ import annotations 

19 

20import atexit 

21import json as json_lib 

22import logging 

23import os 

24import sys 

25import warnings 

26from collections.abc import Callable 

27from functools import cache, partial 

28from importlib import metadata 

29from typing import TYPE_CHECKING, Any, Literal 

30 

31import pluggy 

32from packaging.version import Version 

33from sqlalchemy import create_engine 

34from sqlalchemy.ext.asyncio import ( 

35 AsyncEngine, 

36 AsyncSession as SAAsyncSession, 

37 create_async_engine, 

38) 

39from sqlalchemy.orm import scoped_session, sessionmaker 

40 

41try: 

42 from sqlalchemy.ext.asyncio import async_sessionmaker 

43except ImportError: 

44 async_sessionmaker = sessionmaker # type: ignore[assignment,misc] 

45 

46from sqlalchemy.pool import NullPool 

47 

48from airflow import __version__ as airflow_version, policies 

49from airflow._shared.timezones.timezone import ( 

50 initialize as initialize_timezone, 

51 local_timezone, 

52 parse_timezone, 

53 utc, 

54) 

55from airflow.configuration import AIRFLOW_HOME, conf 

56from airflow.exceptions import AirflowInternalRuntimeError 

57from airflow.logging_config import configure_logging 

58from airflow.utils.orm_event_handlers import setup_event_handlers 

59 

60_USE_PSYCOPG3: bool 

61try: 

62 from importlib.util import find_spec 

63 

64 _USE_PSYCOPG3 = find_spec("psycopg") is not None 

65 

66except (ImportError, ModuleNotFoundError): 

67 _USE_PSYCOPG3 = False 

68 

69if TYPE_CHECKING: 

70 from sqlalchemy.engine import Engine 

71 

72 from airflow.api_fastapi.common.types import UIAlert 

73 

74log = logging.getLogger(__name__) 

75 

76try: 

77 tz_str = conf.get_mandatory_value("core", "default_timezone") 

78 initialize_timezone(tz_str) 

79 if tz_str != "system": 

80 TIMEZONE = parse_timezone(tz_str) 

81 else: 

82 TIMEZONE = local_timezone() 

83except Exception: 

84 TIMEZONE = utc 

85 initialize_timezone("UTC") 

86 

87log.info("Configured default timezone %s", TIMEZONE) 

88 

89if conf.has_option("database", "sql_alchemy_session_maker"): 

90 log.info( 

91 '[Warning] Found config "sql_alchemy_session_maker"; make sure you know what you are doing.\n' 

92 "[Warning] Improper configuration of sql_alchemy_session_maker can lead to serious issues, " 

93 "including data corruption and unrecoverable application crashes.\n" 

94 "[Warning] Please review the SQLAlchemy documentation for detailed guidance on " 

95 "proper configuration and best practices." 

96 ) 

97 

98HEADER = "\n".join( 

99 [ 

100 r" ____________ _____________", 

101 r" ____ |__( )_________ __/__ /________ __", 

102 r"____ /| |_ /__ ___/_ /_ __ /_ __ \_ | /| / /", 

103 r"___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ /", 

104 r" _/_/ |_/_/ /_/ /_/ /_/ \____/____/|__/", 

105 ] 

106) 

107 

108SIMPLE_LOG_FORMAT = conf.get("logging", "simple_log_format") 

109 

110SQL_ALCHEMY_CONN: str | None = None 

111SQL_ALCHEMY_CONN_ASYNC: str | None = None 

112PLUGINS_FOLDER: str | None = None 

113DAGS_FOLDER: str = os.path.expanduser(conf.get_mandatory_value("core", "DAGS_FOLDER")) 

114 

115engine: Engine | None = None 

116Session: scoped_session | None = None 

117# NonScopedSession creates global sessions and is not safe to use in a multi-threaded environment without 

118# additional precautions. The only use case is when the session lifecycle needs 

119# custom handling. Most of the time we only want one unique thread-local session object; 

120# this is achieved by the Session factory above. 

121NonScopedSession: sessionmaker | None = None 

122async_engine: AsyncEngine | None = None 

123AsyncSession: Callable[..., SAAsyncSession] | None = None 

124 

125 

126def get_engine(): 

127 """Get the configured engine, raising an error if not configured.""" 

128 if engine is None: 

129 raise RuntimeError("Engine not configured. Call configure_orm() first.") 

130 return engine 

131 

132 

133def get_session(): 

134 """Get the configured Session, raising an error if not configured.""" 

135 if Session is None: 

136 raise RuntimeError("Session not configured. Call configure_orm() first.") 

137 return Session 

138 

139 

140# The JSON library to use for DAG Serialization and De-Serialization 

141json = json_lib 

142 

143# Display alerts on the dashboard 

144# Useful for warning about setup issues or announcing changes to end users 

145# List of UIAlerts, which allows specifying the content and category 

146# of the message to be shown. For example: 

147# from airflow.api_fastapi.common.types import UIAlert 

148# 

149# DASHBOARD_UIALERTS = [ 

150# UIAlert(text="Welcome to Airflow", category="info"), 

151# UIAlert(text="Upgrade tomorrow [help](https://www.example.com)", category="warning"), #With markdown support 

152# ] 

153# 

154DASHBOARD_UIALERTS: list[UIAlert] = [] 

155 

156 

157@cache 

158def _get_rich_console(file): 

159 # Delay imports until we need it 

160 import rich.console 

161 

162 return rich.console.Console(file=file) 

163 

164 

165def custom_show_warning(message, category, filename, lineno, file=None, line=None): 

166 """Print rich and visible warnings.""" 

167 # Delay imports until we need it 

168 import re 

169 

170 from rich.markup import escape 

171 

172 re_escape_regex = re.compile(r"(\\*)(\[[a-z#/@][^[]*?])").sub 

173 msg = f"[bold]{line}" if line else f"[bold][yellow]{filename}:{lineno}" 

174 msg += f" {category.__name__}[/bold]: {escape(str(message), _escape=re_escape_regex)}[/yellow]" 

175 write_console = _get_rich_console(file or sys.stderr) 

176 write_console.print(msg, soft_wrap=True) 

177 

178 

179def replace_showwarning(replacement): 

180 """ 

181 Replace ``warnings.showwarning``, returning the original. 

182 

183 This is useful since we want to "reset" the ``showwarning`` hook on exit to 

184 avoid lazy-loading issues. If a warning is emitted after Python cleaned up 

185 the import system, we would no longer be able to import ``rich``. 

186 """ 

187 original = warnings.showwarning 

188 warnings.showwarning = replacement 

189 return original 

190 

191 

192original_show_warning = replace_showwarning(custom_show_warning) 

193atexit.register(partial(replace_showwarning, original_show_warning)) 

194 
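# Illustrative sketch, not part of the upstream module: with the hook above installed,
# any warning raised in the process is rendered through rich on stderr, e.g.:
#
#   import warnings
#   warnings.warn("connection pool nearly exhausted", UserWarning)
#   # printed as a bold "<filename>:<lineno> UserWarning: connection pool nearly exhausted"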

195 

196def task_policy(task): 

197 return get_policy_plugin_manager().hook.task_policy(task=task) 

198 

199 

200def dag_policy(dag): 

201 return get_policy_plugin_manager().hook.dag_policy(dag=dag) 

202 

203 

204def task_instance_mutation_hook(task_instance): 

205 return get_policy_plugin_manager().hook.task_instance_mutation_hook(task_instance=task_instance) 

206 

207 

208task_instance_mutation_hook.is_noop = True # type: ignore 

209 

210 

211def pod_mutation_hook(pod): 

212 return get_policy_plugin_manager().hook.pod_mutation_hook(pod=pod) 

213 

214 

215def get_airflow_context_vars(context): 

216 return get_policy_plugin_manager().hook.get_airflow_context_vars(context=context) 

217 

218 

219def get_dagbag_import_timeout(dag_file_path: str): 

220 return get_policy_plugin_manager().hook.get_dagbag_import_timeout(dag_file_path=dag_file_path) 

221 

222 

223@cache 

224def get_policy_plugin_manager() -> pluggy.PluginManager: 

225 plugin_mgr = pluggy.PluginManager(policies.local_settings_hookspec.project_name) 

226 plugin_mgr.add_hookspecs(policies) 

227 plugin_mgr.register(policies.DefaultPolicy) 

228 return plugin_mgr 

229 

230 

231def load_policy_plugins(pm: pluggy.PluginManager): 

232 # We can't log duration etc here, as logging hasn't yet been configured! 

233 pm.load_setuptools_entrypoints("airflow.policy") 

234 
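# Illustrative sketch, not part of the upstream module: a third-party package can
# contribute a cluster policy through the "airflow.policy" entry point loaded above.
# The project/module names are hypothetical; the `hookimpl` marker is assumed to be
# exported by airflow.policies (it pairs with the hookspec project name used above).
#
#   # my_company_policies.py (hypothetical)
#   from airflow.policies import hookimpl
#
#   @hookimpl
#   def task_policy(task):
#       # enforce a default owner on every task
#       task.owner = task.owner or "platform-team"
#
#   # pyproject.toml (hypothetical)
#   # [project.entry-points."airflow.policy"]
#   # my_company = "my_company_policies"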

235 

236def _get_async_conn_uri_from_sync(sync_uri): 

237 # Mapping of sync scheme to async scheme. 

238 AIO_LIBS_MAPPING = {"sqlite": "aiosqlite", "postgresql": "asyncpg", "mysql": "aiomysql"} 

239 

240 scheme, rest = sync_uri.split(":", maxsplit=1) 

241 scheme = scheme.split("+", maxsplit=1)[0] 

242 aiolib = AIO_LIBS_MAPPING.get(scheme) 

243 if aiolib: 

244 return f"{scheme}+{aiolib}:{rest}" 

245 return sync_uri 

246 
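# Illustrative examples, not part of the upstream module: how the scheme rewriting
# above behaves for typical connection strings (assuming the mapped async drivers):
#
#   _get_async_conn_uri_from_sync("postgresql+psycopg2://user:pass@localhost/airflow")
#   # -> "postgresql+asyncpg://user:pass@localhost/airflow"
#   _get_async_conn_uri_from_sync("mssql+pyodbc://user:pass@host/db")
#   # -> returned unchanged, no known async driver for this scheme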

247 

248def configure_vars(): 

249 """Configure Global Variables from airflow.cfg.""" 

250 global SQL_ALCHEMY_CONN 

251 global SQL_ALCHEMY_CONN_ASYNC 

252 global DAGS_FOLDER 

253 global PLUGINS_FOLDER 

254 

255 SQL_ALCHEMY_CONN = conf.get("database", "sql_alchemy_conn") 

256 if conf.has_option("database", "sql_alchemy_conn_async"): 

257 SQL_ALCHEMY_CONN_ASYNC = conf.get("database", "sql_alchemy_conn_async") 

258 else: 

259 SQL_ALCHEMY_CONN_ASYNC = _get_async_conn_uri_from_sync(sync_uri=SQL_ALCHEMY_CONN) 

260 

261 DAGS_FOLDER = os.path.expanduser(conf.get("core", "DAGS_FOLDER")) 

262 

263 PLUGINS_FOLDER = conf.get("core", "plugins_folder", fallback=os.path.join(AIRFLOW_HOME, "plugins")) 

264 

265 

266def _run_openlineage_runtime_check(): 

267 """ 

268 Ensure compatibility of OpenLineage provider package and Airflow version. 

269 

270 Airflow 2.10.0 introduced some core changes (#39336) that made versions <= 1.8.0 of OpenLineage 

271 provider incompatible with future Airflow versions (>= 2.10.0). 

272 """ 

273 ol_package = "apache-airflow-providers-openlineage" 

274 try: 

275 ol_version = metadata.version(ol_package) 

276 except metadata.PackageNotFoundError: 

277 return 

278 

279 if ol_version and Version(ol_version) < Version("1.8.0.dev0"): 

280 raise RuntimeError( 

281 f"You have installed `{ol_package}` == `{ol_version}` that is not compatible with " 

282 f"`apache-airflow` == `{airflow_version}`. " 

283 f"For `apache-airflow` >= `2.10.0` you must use `{ol_package}` >= `1.8.0`." 

284 ) 

285 
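# Illustrative sketch, not part of the upstream module: the check above relies on
# PEP 440 ordering, under which pre-releases sort before the final release:
#
#   from packaging.version import Version
#   Version("1.7.2") < Version("1.8.0.dev0")   # True  -> incompatible, RuntimeError raised
#   Version("1.8.0") < Version("1.8.0.dev0")   # False -> check passes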

286 

287def run_providers_custom_runtime_checks(): 

288 _run_openlineage_runtime_check() 

289 

290 

291class SkipDBTestsSession: 

292 """ 

293 This fake session is used to skip DB tests when `_AIRFLOW_SKIP_DB_TESTS` is set. 

294 

295 :meta private: 

296 """ 

297 

298 def __init__(self): 

299 raise AirflowInternalRuntimeError( 

300 "Your test accessed the DB but `_AIRFLOW_SKIP_DB_TESTS` is set.\n" 

301 "Either make sure your test does not use database or mark the test with `@pytest.mark.db_test`\n" 

302 "See https://github.com/apache/airflow/blob/main/contributing-docs/testing/unit_tests.rst#" 

303 "best-practices-for-db-tests on how " 

304 "to deal with it and consult examples." 

305 ) 

306 

307 def remove(*args, **kwargs): 

308 pass 

309 

310 def get_bind( 

311 self, 

312 mapper=None, 

313 clause=None, 

314 bind=None, 

315 _sa_skip_events=None, 

316 _sa_skip_for_implicit_returning=False, 

317 ): 

318 pass 

319 

320 

321def _is_sqlite_db_path_relative(sqla_conn_str: str) -> bool: 

322 """Determine whether the database connection URI specifies a relative path.""" 

323 # Check for non-empty connection string: 

324 if not sqla_conn_str: 

325 return False 

326 # Check for the right URI scheme: 

327 if not sqla_conn_str.startswith("sqlite"): 

328 return False 

329 # In-memory is not useful for production, but useful for writing tests against Airflow for extensions 

330 if sqla_conn_str == "sqlite://": 

331 return False 

332 # Check for absolute path: 

333 if sqla_conn_str.startswith(abs_prefix := "sqlite:///") and os.path.isabs( 

334 sqla_conn_str[len(abs_prefix) :] 

335 ): 

336 return False 

337 return True 

338 
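# Illustrative examples, not part of the upstream module: how the check above
# classifies common SQLite URIs:
#
#   _is_sqlite_db_path_relative("sqlite:///relative/airflow.db")   # True  -> rejected by configure_orm
#   _is_sqlite_db_path_relative("sqlite:////tmp/airflow.db")       # False -> absolute path
#   _is_sqlite_db_path_relative("sqlite://")                       # False -> in-memory database
#   _is_sqlite_db_path_relative("postgresql://user@host/airflow")  # False -> not sqlite at all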

339 

340def _get_connect_args(mode: Literal["sync", "async"]) -> Any: 

341 key = { 

342 "sync": "sql_alchemy_connect_args", 

343 "async": "sql_alchemy_connect_args_async", 

344 }[mode] 

345 if conf.has_option("database", key): 

346 return conf.getimport("database", key) 

347 return {} 

348 

349 

350def _configure_async_session() -> None: 

351 """ 

352 Configure async SQLAlchemy session. 

353 

354 This exists so tests can reconfigure the session. The way SQLAlchemy configures 

355 this does not work well with pytest, and you can end up with issues when the 

356 session runs in a different event loop from the test itself. 

357 """ 

358 global AsyncSession, async_engine 

359 

360 if not SQL_ALCHEMY_CONN_ASYNC: 

361 async_engine = None 

362 AsyncSession = None 

363 return 

364 

365 async_engine = create_async_engine( 

366 SQL_ALCHEMY_CONN_ASYNC, 

367 connect_args=_get_connect_args("async"), 

368 future=True, 

369 ) 

370 AsyncSession = async_sessionmaker( 

371 bind=async_engine, 

372 class_=SAAsyncSession, 

373 autoflush=False, 

374 expire_on_commit=False, 

375 ) 

376 
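# Illustrative sketch, not part of the upstream module: once configured, the async
# session factory can be used like any SQLAlchemy async_sessionmaker (assumes the
# async engine URI and driver are set up, e.g. via initialize()):
#
#   from sqlalchemy import text
#
#   async def count_dags() -> int:
#       async with AsyncSession() as session:
#           result = await session.execute(text("SELECT COUNT(*) FROM dag"))
#           return result.scalar()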

377 

378def configure_orm(disable_connection_pool=False, pool_class=None): 

379 """Configure ORM using SQLAlchemy.""" 

380 from airflow._shared.secrets_masker import mask_secret 

381 

382 if _is_sqlite_db_path_relative(SQL_ALCHEMY_CONN): 

383 from airflow.exceptions import AirflowConfigException 

384 

385 raise AirflowConfigException( 

386 f"Cannot use relative path: `{SQL_ALCHEMY_CONN}` to connect to sqlite. " 

387 "Please use absolute path such as `sqlite:////tmp/airflow.db`." 

388 ) 

389 

390 global NonScopedSession 

391 global Session 

392 global engine 

393 

394 if os.environ.get("_AIRFLOW_SKIP_DB_TESTS") == "true": 

395 # Skip DB initialization in unit tests, if DB tests are skipped 

396 Session = SkipDBTestsSession 

397 engine = None 

398 return 

399 log.debug("Setting up DB connection pool (PID %s)", os.getpid()) 

400 engine_args = prepare_engine_args(disable_connection_pool, pool_class) 

401 

402 connect_args = _get_connect_args("sync") 

403 if SQL_ALCHEMY_CONN.startswith("sqlite"): 

404 # FastAPI runs sync endpoints in a separate thread. By default, SQLite does not 

405 # allow objects created in one thread to be used in another. Allow that here so 

406 # the `test` thread and the tested endpoints can share common objects. 

407 connect_args["check_same_thread"] = False 

408 

409 engine = create_engine( 

410 SQL_ALCHEMY_CONN, 

411 connect_args=connect_args, 

412 **engine_args, 

413 future=True, 

414 ) 

415 _configure_async_session() 

416 mask_secret(engine.url.password) 

417 setup_event_handlers(engine) 

418 

419 if conf.has_option("database", "sql_alchemy_session_maker"): 

420 _session_maker = conf.getimport("database", "sql_alchemy_session_maker") 

421 else: 

422 _session_maker = partial( 

423 sessionmaker, 

424 autocommit=False, 

425 autoflush=False, 

426 expire_on_commit=False, 

427 ) 

428 if engine is None: 

429 raise RuntimeError("Engine must be initialized before creating a session") 

430 NonScopedSession = _session_maker(engine) 

431 Session = scoped_session(NonScopedSession) 

432 

433 if register_at_fork := getattr(os, "register_at_fork", None): 

434 # https://docs.sqlalchemy.org/en/20/core/pooling.html#using-connection-pools-with-multiprocessing-or-os-fork 

435 def clean_in_fork(): 

436 _globals = globals() 

437 if engine := _globals.get("engine"): 

438 engine.dispose(close=False) 

439 if async_engine := _globals.get("async_engine"): 

440 async_engine.sync_engine.dispose(close=False) 

441 

442 # Won't work on Windows 

443 register_at_fork(after_in_child=clean_in_fork) 

444 
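# Illustrative sketch, not part of the upstream module: typical usage after the ORM
# has been configured; get_session() raises a clear error if configure_orm() was
# never called.
#
#   from sqlalchemy import text
#   from airflow import settings
#
#   settings.configure_orm()
#   session = settings.get_session()()   # the scoped_session factory returns a Session
#   try:
#       print(session.execute(text("SELECT 1")).scalar())
#   finally:
#       session.close()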

445 

446def prepare_engine_args(disable_connection_pool=False, pool_class=None): 

447 """Prepare SQLAlchemy engine args.""" 

448 DEFAULT_ENGINE_ARGS: dict[str, dict[str, Any]] = { 

449 "postgresql": ( 

450 { 

451 "insertmanyvalues_page_size": 10000, 

452 } 

453 | ( 

454 {} 

455 if _USE_PSYCOPG3 

456 else {"executemany_mode": "values_plus_batch", "executemany_batch_page_size": 2000} 

457 ) 

458 ) 

459 } 

460 

461 default_args = {} 

462 for dialect, default in DEFAULT_ENGINE_ARGS.items(): 

463 if SQL_ALCHEMY_CONN.startswith(dialect): 

464 default_args = default.copy() 

465 break 

466 

467 engine_args: dict = conf.getjson("database", "sql_alchemy_engine_args", fallback=default_args) 

468 

469 if pool_class: 

470 # Don't use separate settings for size etc, only those from sql_alchemy_engine_args 

471 engine_args["poolclass"] = pool_class 

472 elif disable_connection_pool or not conf.getboolean("database", "SQL_ALCHEMY_POOL_ENABLED"): 

473 engine_args["poolclass"] = NullPool 

474 log.debug("settings.prepare_engine_args(): Using NullPool") 

475 elif not SQL_ALCHEMY_CONN.startswith("sqlite"): 

476 # Pool size engine args not supported by sqlite. 

477 # If no config value is defined for the pool size, select a reasonable value. 

478 # 0 means no limit, which could lead to exceeding the Database connection limit. 

479 pool_size = conf.getint("database", "SQL_ALCHEMY_POOL_SIZE", fallback=5) 

480 

481 # The maximum overflow size of the pool. 

482 # When the number of checked-out connections reaches the size set in pool_size, 

483 # additional connections will be returned up to this limit. 

484 # When those additional connections are returned to the pool, they are disconnected and discarded. 

485 # It follows then that the total number of simultaneous connections 

486 # the pool will allow is pool_size + max_overflow, 

487 # and the total number of "sleeping" connections the pool will allow is pool_size. 

488 # max_overflow can be set to -1 to indicate no overflow limit; 

489 # no limit will be placed on the total number 

490 # of concurrent connections. Defaults to 10. 

491 max_overflow = conf.getint("database", "SQL_ALCHEMY_MAX_OVERFLOW", fallback=10) 

492 

493 # The DB server already has a value for wait_timeout (number of seconds after 

494 # which an idle sleeping connection should be killed). Since other DBs may 

495 # co-exist on the same server, SQLAlchemy should set its 

496 # pool_recycle to an equal or smaller value. 

497 pool_recycle = conf.getint("database", "SQL_ALCHEMY_POOL_RECYCLE", fallback=1800) 

498 

499 # Check connection at the start of each connection pool checkout. 

500 # Typically, this is a simple statement like "SELECT 1", but may also make use 

501 # of some DBAPI-specific method to test the connection for liveness. 

502 # More information here: 

503 # https://docs.sqlalchemy.org/en/14/core/pooling.html#disconnect-handling-pessimistic 

504 pool_pre_ping = conf.getboolean("database", "SQL_ALCHEMY_POOL_PRE_PING", fallback=True) 

505 

506 log.debug( 

507 "settings.prepare_engine_args(): Using pool settings. pool_size=%d, max_overflow=%d, " 

508 "pool_recycle=%d, pid=%d", 

509 pool_size, 

510 max_overflow, 

511 pool_recycle, 

512 os.getpid(), 

513 ) 

514 engine_args["pool_size"] = pool_size 

515 engine_args["pool_recycle"] = pool_recycle 

516 engine_args["pool_pre_ping"] = pool_pre_ping 

517 engine_args["max_overflow"] = max_overflow 

518 

519 # The default isolation level for MySQL (REPEATABLE READ) can introduce inconsistencies when 

520 # running multiple schedulers, as repeated queries on the same session may read from stale snapshots. 

521 # 'READ COMMITTED' is the default value for PostgreSQL. 

522 # More information here: 

523 # https://dev.mysql.com/doc/refman/8.0/en/innodb-transaction-isolation-levels.html 

524 if SQL_ALCHEMY_CONN.startswith("mysql"): 

525 engine_args["isolation_level"] = "READ COMMITTED" 

526 

527 return engine_args 

528 
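# Illustrative sketch, not part of the upstream module: for a PostgreSQL connection
# with no [database] overrides, the function above typically returns something like
# the dict below (the executemany_* keys apply only to the psycopg2 driver):
#
#   {
#       "insertmanyvalues_page_size": 10000,
#       "executemany_mode": "values_plus_batch",
#       "executemany_batch_page_size": 2000,
#       "pool_size": 5,
#       "pool_recycle": 1800,
#       "pool_pre_ping": True,
#       "max_overflow": 10,
#   }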

529 

530def dispose_orm(do_log: bool = True): 

531 """Properly close pooled database connections.""" 

532 global Session, engine, NonScopedSession 

533 

534 _globals = globals() 

535 if _globals.get("engine") is None and _globals.get("Session") is None: 

536 return 

537 

538 if do_log: 

539 log.debug("Disposing DB connection pool (PID %s)", os.getpid()) 

540 

541 if "Session" in _globals and Session is not None: 

542 from sqlalchemy.orm.session import close_all_sessions 

543 

544 Session.remove() 

545 Session = None 

546 NonScopedSession = None 

547 close_all_sessions() 

548 

549 if "engine" in _globals and engine is not None: 

550 engine.dispose() 

551 engine = None 

552 

553 

554def reconfigure_orm(disable_connection_pool=False, pool_class=None): 

555 """Properly close database connections and re-configure ORM.""" 

556 dispose_orm() 

557 configure_orm(disable_connection_pool=disable_connection_pool, pool_class=pool_class) 

558 
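# Illustrative sketch, not part of the upstream module: tests or forked processes can
# rebuild the ORM state instead of inheriting pooled connections, e.g.:
#
#   from sqlalchemy.pool import NullPool
#   from airflow import settings
#
#   settings.reconfigure_orm(disable_connection_pool=True)
#   # or keep the pooling configuration but force a specific pool class:
#   settings.reconfigure_orm(pool_class=NullPool)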

559 

560def configure_adapters(): 

561 """Register Adapters and DB Converters.""" 

562 from pendulum import DateTime as Pendulum 

563 

564 if SQL_ALCHEMY_CONN.startswith("sqlite"): 

565 from sqlite3 import register_adapter 

566 

567 register_adapter(Pendulum, lambda val: val.isoformat(" ")) 

568 

569 if SQL_ALCHEMY_CONN.startswith("mysql"): 

570 try: 

571 try: 

572 import MySQLdb.converters 

573 except ImportError: 

574 raise RuntimeError( 

575 "You do not have `mysqlclient` package installed. " 

576 "Please install it with `pip install mysqlclient` and make sure you have system " 

577 "mysql libraries installed, as well as well as `pkg-config` system package " 

578 "installed in case you see compilation error during installation." 

579 ) 

580 

581 MySQLdb.converters.conversions[Pendulum] = MySQLdb.converters.DateTime2literal 

582 except ImportError: 

583 pass 

584 try: 

585 import pymysql.converters 

586 

587 pymysql.converters.conversions[Pendulum] = pymysql.converters.escape_datetime 

588 except ImportError: 

589 pass 

590 

591 

592def _configure_secrets_masker(): 

593 """Configure the secrets masker with values from config.""" 

594 from airflow._shared.secrets_masker import ( 

595 DEFAULT_SENSITIVE_FIELDS, 

596 _secrets_masker as secrets_masker_core, 

597 ) 

598 from airflow.configuration import conf 

599 

600 min_length_to_mask = conf.getint("logging", "min_length_masked_secret", fallback=5) 

601 secret_mask_adapter = conf.getimport("logging", "secret_mask_adapter", fallback=None) 

602 sensitive_fields = DEFAULT_SENSITIVE_FIELDS.copy() 

603 sensitive_variable_fields = conf.get("core", "sensitive_var_conn_names") 

604 if sensitive_variable_fields: 

605 sensitive_fields |= frozenset({field.strip() for field in sensitive_variable_fields.split(",")}) 

606 

607 hide_sensitive_var_conn_fields = conf.getboolean("core", "hide_sensitive_var_conn_fields") 

608 

609 core_masker = secrets_masker_core() 

610 core_masker.min_length_to_mask = min_length_to_mask 

611 core_masker.sensitive_variables_fields = list(sensitive_fields) 

612 core_masker.secret_mask_adapter = secret_mask_adapter 

613 core_masker.hide_sensitive_var_conn_fields = hide_sensitive_var_conn_fields 

614 

615 from airflow.sdk._shared.secrets_masker import _secrets_masker as sdk_secrets_masker 

616 

617 sdk_masker = sdk_secrets_masker() 

618 sdk_masker.min_length_to_mask = min_length_to_mask 

619 sdk_masker.sensitive_variables_fields = list(sensitive_fields) 

620 sdk_masker.secret_mask_adapter = secret_mask_adapter 

621 sdk_masker.hide_sensitive_var_conn_fields = hide_sensitive_var_conn_fields 

622 

623 

624def configure_action_logging() -> None: 

625 """Any additional configuration (register callback) for airflow.utils.action_loggers module.""" 

626 

627 

628def prepare_syspath_for_config_and_plugins(): 

629 """Update sys.path for the config and plugins directories.""" 

630 # Add ./config/ for loading custom log parsers etc, or 

631 # airflow_local_settings etc. 

632 config_path = os.path.join(AIRFLOW_HOME, "config") 

633 if config_path not in sys.path: 

634 sys.path.append(config_path) 

635 

636 if PLUGINS_FOLDER not in sys.path: 

637 sys.path.append(PLUGINS_FOLDER) 

638 

639 

640def __getattr__(name: str): 

641 """Handle deprecated module attributes.""" 

642 import warnings 

643 

644 from airflow.exceptions import RemovedInAirflow4Warning 

645 

646 if name == "MASK_SECRETS_IN_LOGS": 

647 warnings.warn( 

648 "settings.MASK_SECRETS_IN_LOGS has been removed. This shim returns default value of False. " 

649 "Use SecretsMasker.enable_log_masking(), disable_log_masking(), or is_log_masking_enabled() instead.", 

650 RemovedInAirflow4Warning, 

651 stacklevel=2, 

652 ) 

653 return False 

654 if name == "WEB_COLORS": 

655 warnings.warn( 

656 "settings.WEB_COLORS has been removed. This shim returns default value. " 

657 "Please upgrade your provider or integration.", 

658 RemovedInAirflow4Warning, 

659 stacklevel=2, 

660 ) 

661 return {"LIGHTBLUE": "#4d9de0", "LIGHTORANGE": "#FF9933"} 

662 if name == "EXECUTE_TASKS_NEW_PYTHON_INTERPRETER": 

663 warnings.warn( 

664 "settings.EXECUTE_TASKS_NEW_PYTHON_INTERPRETER has been removed. This shim returns default value. " 

665 "Please upgrade your provider or integration.", 

666 RemovedInAirflow4Warning, 

667 stacklevel=2, 

668 ) 

669 return not hasattr(os, "fork") or conf.getboolean( 

670 "core", 

671 "execute_tasks_new_python_interpreter", 

672 fallback=False, 

673 ) 

674 

675 raise AttributeError(f"module '{__name__}' has no attribute '{name}'") 

676 
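# Illustrative sketch, not part of the upstream module: the module-level __getattr__
# above (PEP 562) keeps old attribute accesses working while warning callers:
#
#   import warnings
#   from airflow import settings
#
#   with warnings.catch_warnings(record=True) as caught:
#       warnings.simplefilter("always")
#       colors = settings.WEB_COLORS   # returns the shim's default dict
#   assert any("WEB_COLORS" in str(w.message) for w in caught)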

677 

678def import_local_settings(): 

679 """Import airflow_local_settings.py files to allow overriding any configs in settings.py file.""" 

680 try: 

681 import airflow_local_settings 

682 except ModuleNotFoundError as e: 

683 if e.name == "airflow_local_settings": 

684 log.debug("No airflow_local_settings to import.", exc_info=True) 

685 else: 

686 log.critical( 

687 "Failed to import airflow_local_settings due to a transitive module not found error.", 

688 exc_info=True, 

689 ) 

690 raise 

691 except ImportError: 

692 log.critical("Failed to import airflow_local_settings.", exc_info=True) 

693 raise 

694 else: 

695 if hasattr(airflow_local_settings, "__all__"): 

696 names = set(airflow_local_settings.__all__) 

697 else: 

698 names = {n for n in airflow_local_settings.__dict__ if not n.startswith("__")} 

699 

700 plugin_functions = policies.make_plugin_from_local_settings( 

701 get_policy_plugin_manager(), airflow_local_settings, names 

702 ) 

703 

704 # If we have already handled a function by adding it to the plugin, 

705 # then don't clobber the global function 

706 for name in names - plugin_functions: 

707 globals()[name] = getattr(airflow_local_settings, name) 

708 

709 if get_policy_plugin_manager().hook.task_instance_mutation_hook.get_hookimpls(): 

710 task_instance_mutation_hook.is_noop = False 

711 

712 log.info("Loaded airflow_local_settings from %s .", airflow_local_settings.__file__) 

713 
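# Illustrative sketch, not part of the upstream module: a minimal airflow_local_settings.py
# (e.g. dropped into $AIRFLOW_HOME/config, which prepare_syspath_for_config_and_plugins()
# puts on sys.path) is picked up by the import above; registering a
# task_instance_mutation_hook flips is_noop to False. The routing rule is hypothetical.
#
#   # airflow_local_settings.py
#   def task_instance_mutation_hook(task_instance):
#       # send every retry to a dedicated queue
#       if task_instance.try_number > 1:
#           task_instance.queue = "retries"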

714 

715def initialize(): 

716 """Initialize Airflow with all the settings from this file.""" 

717 configure_vars() 

718 prepare_syspath_for_config_and_plugins() 

719 policy_mgr = get_policy_plugin_manager() 

720 # Load policy plugins _before_ importing airflow_local_settings, as Pluggy uses LIFO and we want anything 

721 # in airflow_local_settings to take precedence 

722 load_policy_plugins(policy_mgr) 

723 import_local_settings() 

724 configure_logging() 

725 

726 configure_adapters() 

727 # The webservers import this file from models.py with the default settings. 

728 

729 # Configure secrets masker before masking secrets 

730 _configure_secrets_masker() 

731 

732 is_worker = os.environ.get("_AIRFLOW__REEXECUTED_PROCESS") == "1" 

733 if not os.environ.get("PYTHON_OPERATORS_VIRTUAL_ENV_MODE", None) and not is_worker: 

734 configure_orm() 

735 

736 # mask the sensitive_config_values 

737 conf.mask_secrets() 

738 configure_action_logging() 

739 

740 # Run any custom runtime checks that need to be executed for providers 

741 run_providers_custom_runtime_checks() 

742 

743 # Ensure we close DB connections at scheduler and gunicorn worker termination 

744 atexit.register(dispose_orm) 

745 

746 

747# By default, Airflow plugins are lazily loaded (only loaded when required). Set it to False 

748# if you want to load plugins whenever 'airflow' is invoked via the CLI or loaded from a module. 

749LAZY_LOAD_PLUGINS: bool = conf.getboolean("core", "lazy_load_plugins", fallback=True) 

750 

751# By default, Airflow providers are lazily discovered (discovery and imports happen only when required). 

752# Set it to False if you want to discover providers whenever 'airflow' is invoked via the CLI or 

753# loaded from a module. 

754LAZY_LOAD_PROVIDERS: bool = conf.getboolean("core", "lazy_discover_providers", fallback=True) 

755 

756DAEMON_UMASK: str = conf.get("core", "daemon_umask", fallback="0o077") 

757 

758# Prefix used by gunicorn workers to indicate they are ready to serve requests 

759# Used by GunicornMonitor to track worker readiness via process titles 

760GUNICORN_WORKER_READY_PREFIX: str = "[ready] "