Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/settings.py: 65%

342 statements  

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import atexit
import json as json_lib
import logging
import os
import sys
import warnings
from collections.abc import Callable
from functools import cache, partial
from importlib import metadata
from typing import TYPE_CHECKING, Any, Literal

import pluggy
from packaging.version import Version
from sqlalchemy import create_engine
from sqlalchemy.ext.asyncio import (
    AsyncEngine,
    AsyncSession as SAAsyncSession,
    create_async_engine,
)
from sqlalchemy.orm import scoped_session, sessionmaker

try:
    from sqlalchemy.ext.asyncio import async_sessionmaker
except ImportError:
    async_sessionmaker = sessionmaker  # type: ignore[assignment,misc]

from sqlalchemy.pool import NullPool

from airflow import __version__ as airflow_version, policies
from airflow._shared.timezones.timezone import (
    initialize as initialize_timezone,
    local_timezone,
    parse_timezone,
    utc,
)
from airflow.configuration import AIRFLOW_HOME, conf
from airflow.exceptions import AirflowInternalRuntimeError
from airflow.logging_config import configure_logging
from airflow.utils.orm_event_handlers import setup_event_handlers
from airflow.utils.sqlalchemy import is_sqlalchemy_v1

_USE_PSYCOPG3: bool
try:
    from importlib.util import find_spec

    is_psycopg3 = find_spec("psycopg") is not None

    _USE_PSYCOPG3 = is_psycopg3 and not is_sqlalchemy_v1()
except (ImportError, ModuleNotFoundError):
    _USE_PSYCOPG3 = False

if TYPE_CHECKING:
    from sqlalchemy.engine import Engine

    from airflow.api_fastapi.common.types import UIAlert

log = logging.getLogger(__name__)

try:
    tz_str = conf.get_mandatory_value("core", "default_timezone")
    initialize_timezone(tz_str)
    if tz_str != "system":
        TIMEZONE = parse_timezone(tz_str)
    else:
        TIMEZONE = local_timezone()
except Exception:
    TIMEZONE = utc
    initialize_timezone("UTC")

log.info("Configured default timezone %s", TIMEZONE)

if conf.has_option("database", "sql_alchemy_session_maker"):
    log.info(
        '[Warning] Found config "sql_alchemy_session_maker", make sure you know what you are doing.\n'
        "[Warning] Improper configuration of sql_alchemy_session_maker can lead to serious issues, "
        "including data corruption or unrecoverable application crashes.\n"
        "[Warning] Please review the SQLAlchemy documentation for detailed guidance on "
        "proper configuration and best practices."
    )

HEADER = "\n".join(
    [
        r"  ____________       _____________",
        r" ____    |__( )_________  __/__  /________      __",
        r"____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /",
        r"___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /",
        r" _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/",
    ]
)

SIMPLE_LOG_FORMAT = conf.get("logging", "simple_log_format")

SQL_ALCHEMY_CONN: str | None = None
SQL_ALCHEMY_CONN_ASYNC: str | None = None
PLUGINS_FOLDER: str | None = None
DAGS_FOLDER: str = os.path.expanduser(conf.get_mandatory_value("core", "DAGS_FOLDER"))

engine: Engine | None = None
Session: scoped_session | None = None
# NonScopedSession creates global sessions and is not safe to use in a multi-threaded environment
# without additional precautions. The only use case is when the session lifecycle needs
# custom handling. Most of the time we only want one unique thread-local session object;
# this is achieved by the Session factory above.
NonScopedSession: sessionmaker | None = None
async_engine: AsyncEngine | None = None
AsyncSession: Callable[..., SAAsyncSession] | None = None

def get_engine():
    """Get the configured engine, raising an error if not configured."""
    if engine is None:
        raise RuntimeError("Engine not configured. Call configure_orm() first.")
    return engine


def get_session():
    """Get the configured Session, raising an error if not configured."""
    if Session is None:
        raise RuntimeError("Session not configured. Call configure_orm() first.")
    return Session
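
# Illustrative usage of the accessors above (not part of the original module); it assumes
# configure_orm() has already run, e.g. via initialize() further down in this file:
#
#     from sqlalchemy import text
#
#     session = get_session()()  # the scoped_session factory returns a thread-local Session
#     try:
#         session.execute(text("SELECT 1"))
#         session.commit()
#     finally:
#         session.close()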


# The JSON library to use for DAG Serialization and De-Serialization
json = json_lib

# Display alerts on the dashboard
# Useful for warning about setup issues or announcing changes to end users
# List of UIAlerts, which allows for specifying the content and category
# of the message to be shown. For example:
#   from airflow.api_fastapi.common.types import UIAlert
#
#   DASHBOARD_UIALERTS = [
#       UIAlert(text="Welcome to Airflow", category="info"),
#       UIAlert(text="Upgrade tomorrow [help](https://www.example.com)", category="warning"),  # With markdown support
#   ]
#
DASHBOARD_UIALERTS: list[UIAlert] = []

@cache
def _get_rich_console(file):
    # Delay imports until we need them
    import rich.console

    return rich.console.Console(file=file)


def custom_show_warning(message, category, filename, lineno, file=None, line=None):
    """Print rich and visible warnings."""
    # Delay imports until we need them
    import re

    from rich.markup import escape

    re_escape_regex = re.compile(r"(\\*)(\[[a-z#/@][^[]*?])").sub
    msg = f"[bold]{line}" if line else f"[bold][yellow]{filename}:{lineno}"
    msg += f" {category.__name__}[/bold]: {escape(str(message), _escape=re_escape_regex)}[/yellow]"
    write_console = _get_rich_console(file or sys.stderr)
    write_console.print(msg, soft_wrap=True)

def replace_showwarning(replacement):
    """
    Replace ``warnings.showwarning``, returning the original.

    This is useful since we want to "reset" the ``showwarning`` hook on exit to
    avoid lazy-loading issues. If a warning is emitted after Python cleaned up
    the import system, we would no longer be able to import ``rich``.
    """
    original = warnings.showwarning
    warnings.showwarning = replacement
    return original


original_show_warning = replace_showwarning(custom_show_warning)
atexit.register(partial(replace_showwarning, original_show_warning))

def task_policy(task):
    return get_policy_plugin_manager().hook.task_policy(task=task)


def dag_policy(dag):
    return get_policy_plugin_manager().hook.dag_policy(dag=dag)


def task_instance_mutation_hook(task_instance):
    return get_policy_plugin_manager().hook.task_instance_mutation_hook(task_instance=task_instance)


task_instance_mutation_hook.is_noop = True  # type: ignore


def pod_mutation_hook(pod):
    return get_policy_plugin_manager().hook.pod_mutation_hook(pod=pod)


def get_airflow_context_vars(context):
    return get_policy_plugin_manager().hook.get_airflow_context_vars(context=context)


def get_dagbag_import_timeout(dag_file_path: str):
    return get_policy_plugin_manager().hook.get_dagbag_import_timeout(dag_file_path=dag_file_path)


@cache
def get_policy_plugin_manager() -> pluggy.PluginManager:
    plugin_mgr = pluggy.PluginManager(policies.local_settings_hookspec.project_name)
    plugin_mgr.add_hookspecs(policies)
    plugin_mgr.register(policies.DefaultPolicy)
    return plugin_mgr


def load_policy_plugins(pm: pluggy.PluginManager):
    # We can't log duration etc here, as logging hasn't yet been configured!
    pm.load_setuptools_entrypoints("airflow.policy")
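
# Illustrative only (not part of the original module): deployments typically provide cluster
# policies by defining same-named functions in airflow_local_settings.py, which
# import_local_settings() below registers as pluggy hook implementations. A hypothetical example:
#
#     def task_policy(task):
#         # Route all BashOperator tasks to a dedicated queue.
#         if task.task_type == "BashOperator":
#             task.queue = "bash_queue"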


def _get_async_conn_uri_from_sync(sync_uri):
    AIO_LIBS_MAPPING = {"sqlite": "aiosqlite", "postgresql": "asyncpg", "mysql": "aiomysql"}
    """Mapping of sync scheme to async scheme."""

    scheme, rest = sync_uri.split(":", maxsplit=1)
    scheme = scheme.split("+", maxsplit=1)[0]
    aiolib = AIO_LIBS_MAPPING.get(scheme)
    if aiolib:
        return f"{scheme}+{aiolib}:{rest}"
    return sync_uri
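
# Expected behaviour of the helper above, shown as an illustration (examples assumed,
# derived from AIO_LIBS_MAPPING):
#   "postgresql+psycopg2://user:pass@host/db"  ->  "postgresql+asyncpg://user:pass@host/db"
#   "mysql://user@host/db"                     ->  "mysql+aiomysql://user@host/db"
# Schemes without a mapping entry (e.g. "mssql") are returned unchanged.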


def configure_vars():
    """Configure Global Variables from airflow.cfg."""
    global SQL_ALCHEMY_CONN
    global SQL_ALCHEMY_CONN_ASYNC
    global DAGS_FOLDER
    global PLUGINS_FOLDER

    SQL_ALCHEMY_CONN = conf.get("database", "sql_alchemy_conn")
    if conf.has_option("database", "sql_alchemy_conn_async"):
        SQL_ALCHEMY_CONN_ASYNC = conf.get("database", "sql_alchemy_conn_async")
    else:
        SQL_ALCHEMY_CONN_ASYNC = _get_async_conn_uri_from_sync(sync_uri=SQL_ALCHEMY_CONN)

    DAGS_FOLDER = os.path.expanduser(conf.get("core", "DAGS_FOLDER"))

    PLUGINS_FOLDER = conf.get("core", "plugins_folder", fallback=os.path.join(AIRFLOW_HOME, "plugins"))


def _run_openlineage_runtime_check():
    """
    Ensure compatibility of the OpenLineage provider package and the Airflow version.

    Airflow 2.10.0 introduced some core changes (#39336) that made versions <= 1.8.0 of the
    OpenLineage provider incompatible with future Airflow versions (>= 2.10.0).
    """
    ol_package = "apache-airflow-providers-openlineage"
    try:
        ol_version = metadata.version(ol_package)
    except metadata.PackageNotFoundError:
        return

    if ol_version and Version(ol_version) < Version("1.8.0.dev0"):
        raise RuntimeError(
            f"You have installed `{ol_package}` == `{ol_version}`, which is not compatible with "
            f"`apache-airflow` == `{airflow_version}`. "
            f"For `apache-airflow` >= `2.10.0` you must use `{ol_package}` >= `1.8.0`."
        )


def run_providers_custom_runtime_checks():
    _run_openlineage_runtime_check()


class SkipDBTestsSession:
    """
    This fake session is used to skip DB tests when `_AIRFLOW_SKIP_DB_TESTS` is set.

    :meta private:
    """

    def __init__(self):
        raise AirflowInternalRuntimeError(
            "Your test accessed the DB but `_AIRFLOW_SKIP_DB_TESTS` is set.\n"
            "Either make sure your test does not use the database or mark the test with `@pytest.mark.db_test`\n"
            "See https://github.com/apache/airflow/blob/main/contributing-docs/testing/unit_tests.rst#"
            "best-practices-for-db-tests on how "
            "to deal with it and consult examples."
        )

    def remove(*args, **kwargs):
        pass

    def get_bind(
        self,
        mapper=None,
        clause=None,
        bind=None,
        _sa_skip_events=None,
        _sa_skip_for_implicit_returning=False,
    ):
        pass


def _is_sqlite_db_path_relative(sqla_conn_str: str) -> bool:
    """Determine whether the database connection URI specifies a relative path."""
    # Check for non-empty connection string:
    if not sqla_conn_str:
        return False
    # Check for the right URI scheme:
    if not sqla_conn_str.startswith("sqlite"):
        return False
    # In-memory is not useful for production, but useful for writing tests against Airflow for extensions
    if sqla_conn_str == "sqlite://":
        return False
    # Check for absolute path:
    if sqla_conn_str.startswith(abs_prefix := "sqlite:///") and os.path.isabs(
        sqla_conn_str[len(abs_prefix) :]
    ):
        return False
    return True
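
# Illustrative examples of the check above (assumed, derived from the logic):
#   _is_sqlite_db_path_relative("sqlite:///relative/airflow.db")  -> True  (rejected later)
#   _is_sqlite_db_path_relative("sqlite:////tmp/airflow.db")      -> False (absolute path)
#   _is_sqlite_db_path_relative("sqlite://")                      -> False (in-memory)
#   _is_sqlite_db_path_relative("postgresql://host/db")           -> False (not sqlite)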


def _get_connect_args(mode: Literal["sync", "async"]) -> Any:
    key = {
        "sync": "sql_alchemy_connect_args",
        "async": "sql_alchemy_connect_args_async",
    }[mode]
    if conf.has_option("database", key):
        return conf.getimport("database", key)
    return {}
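
# Illustrative configuration for the helper above (assumed example; the option value is an
# import path resolved by conf.getimport()):
#
#   [database]
#   sql_alchemy_connect_args = my_company.airflow_settings.CONNECT_ARGS
#
# where my_company/airflow_settings.py would define, e.g.:
#
#   CONNECT_ARGS = {"connect_timeout": 30}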


def _configure_async_session() -> None:
    """
    Configure the async SQLAlchemy session.

    This exists so tests can reconfigure the session. How SQLAlchemy configures
    this does not work well with pytest, and you can end up with issues when the
    session runs in a different event loop from the test itself.
    """
    global AsyncSession, async_engine

    if not SQL_ALCHEMY_CONN_ASYNC:
        async_engine = None
        AsyncSession = None
        return

    async_engine = create_async_engine(
        SQL_ALCHEMY_CONN_ASYNC,
        connect_args=_get_connect_args("async"),
        future=True,
    )
    AsyncSession = async_sessionmaker(
        bind=async_engine,
        class_=SAAsyncSession,
        autoflush=False,
        expire_on_commit=False,
    )


def configure_orm(disable_connection_pool=False, pool_class=None):
    """Configure ORM using SQLAlchemy."""
    from airflow._shared.secrets_masker import mask_secret

    if _is_sqlite_db_path_relative(SQL_ALCHEMY_CONN):
        from airflow.exceptions import AirflowConfigException

        raise AirflowConfigException(
            f"Cannot use relative path: `{SQL_ALCHEMY_CONN}` to connect to sqlite. "
            "Please use an absolute path such as `sqlite:////tmp/airflow.db`."
        )

    global NonScopedSession
    global Session
    global engine

    if os.environ.get("_AIRFLOW_SKIP_DB_TESTS") == "true":
        # Skip DB initialization in unit tests, if DB tests are skipped
        Session = SkipDBTestsSession
        engine = None
        return
    log.debug("Setting up DB connection pool (PID %s)", os.getpid())
    engine_args = prepare_engine_args(disable_connection_pool, pool_class)

    connect_args = _get_connect_args("sync")
    if SQL_ALCHEMY_CONN.startswith("sqlite"):
        # FastAPI runs sync endpoints in a separate thread. By default, SQLite does not
        # allow objects created in one thread to be used from another. Allow that in tests
        # so the `test` thread and the tested endpoints can use common objects.
        connect_args["check_same_thread"] = False

    engine = create_engine(
        SQL_ALCHEMY_CONN,
        connect_args=connect_args,
        **engine_args,
        future=True,
    )
    _configure_async_session()
    mask_secret(engine.url.password)
    setup_event_handlers(engine)

    if conf.has_option("database", "sql_alchemy_session_maker"):
        _session_maker = conf.getimport("database", "sql_alchemy_session_maker")
    else:
        _session_maker = partial(
            sessionmaker,
            autocommit=False,
            autoflush=False,
            expire_on_commit=False,
        )
    if engine is None:
        raise RuntimeError("Engine must be initialized before creating a session")
    NonScopedSession = _session_maker(engine)
    Session = scoped_session(NonScopedSession)

    if register_at_fork := getattr(os, "register_at_fork", None):
        # https://docs.sqlalchemy.org/en/20/core/pooling.html#using-connection-pools-with-multiprocessing-or-os-fork
        def clean_in_fork():
            _globals = globals()
            if engine := _globals.get("engine"):
                engine.dispose(close=False)
            if async_engine := _globals.get("async_engine"):
                async_engine.sync_engine.dispose(close=False)

        # Won't work on Windows
        register_at_fork(after_in_child=clean_in_fork)


def prepare_engine_args(disable_connection_pool=False, pool_class=None):
    """Prepare SQLAlchemy engine args."""
    DEFAULT_ENGINE_ARGS: dict[str, dict[str, Any]] = {
        "postgresql": (
            {
                "executemany_values_page_size" if is_sqlalchemy_v1() else "insertmanyvalues_page_size": 10000,
            }
            | (
                {}
                if _USE_PSYCOPG3
                else {"executemany_mode": "values_plus_batch", "executemany_batch_page_size": 2000}
            )
        )
    }

    default_args = {}
    for dialect, default in DEFAULT_ENGINE_ARGS.items():
        if SQL_ALCHEMY_CONN.startswith(dialect):
            default_args = default.copy()
            break

    engine_args: dict = conf.getjson("database", "sql_alchemy_engine_args", fallback=default_args)

    if pool_class:
        # Don't use separate settings for size etc, only those from sql_alchemy_engine_args
        engine_args["poolclass"] = pool_class
    elif disable_connection_pool or not conf.getboolean("database", "SQL_ALCHEMY_POOL_ENABLED"):
        engine_args["poolclass"] = NullPool
        log.debug("settings.prepare_engine_args(): Using NullPool")
    elif not SQL_ALCHEMY_CONN.startswith("sqlite"):
        # Pool size engine args are not supported by sqlite.
        # If no config value is defined for the pool size, select a reasonable value.
        # 0 means no limit, which could lead to exceeding the Database connection limit.
        pool_size = conf.getint("database", "SQL_ALCHEMY_POOL_SIZE", fallback=5)

        # The maximum overflow size of the pool.
        # When the number of checked-out connections reaches the size set in pool_size,
        # additional connections will be returned up to this limit.
        # When those additional connections are returned to the pool, they are disconnected and discarded.
        # It follows then that the total number of simultaneous connections
        # the pool will allow is pool_size + max_overflow,
        # and the total number of "sleeping" connections the pool will allow is pool_size.
        # max_overflow can be set to -1 to indicate no overflow limit;
        # no limit will be placed on the total number
        # of concurrent connections. Defaults to 10.
        max_overflow = conf.getint("database", "SQL_ALCHEMY_MAX_OVERFLOW", fallback=10)

        # The DB server already has a value for wait_timeout (number of seconds after
        # which an idle sleeping connection should be killed). Since other DBs may
        # co-exist on the same server, SQLAlchemy should set its
        # pool_recycle to an equal or smaller value.
        pool_recycle = conf.getint("database", "SQL_ALCHEMY_POOL_RECYCLE", fallback=1800)

        # Check connection at the start of each connection pool checkout.
        # Typically, this is a simple statement like "SELECT 1", but may also make use
        # of some DBAPI-specific method to test the connection for liveness.
        # More information here:
        # https://docs.sqlalchemy.org/en/14/core/pooling.html#disconnect-handling-pessimistic
        pool_pre_ping = conf.getboolean("database", "SQL_ALCHEMY_POOL_PRE_PING", fallback=True)

        log.debug(
            "settings.prepare_engine_args(): Using pool settings. pool_size=%d, max_overflow=%d, "
            "pool_recycle=%d, pid=%d",
            pool_size,
            max_overflow,
            pool_recycle,
            os.getpid(),
        )
        engine_args["pool_size"] = pool_size
        engine_args["pool_recycle"] = pool_recycle
        engine_args["pool_pre_ping"] = pool_pre_ping
        engine_args["max_overflow"] = max_overflow

    # The default isolation level for MySQL (REPEATABLE READ) can introduce inconsistencies when
    # running multiple schedulers, as repeated queries on the same session may read from stale snapshots.
    # 'READ COMMITTED' is the default value for PostgreSQL.
    # More information here:
    # https://dev.mysql.com/doc/refman/8.0/en/innodb-transaction-isolation-levels.html
    if SQL_ALCHEMY_CONN.startswith("mysql"):
        engine_args["isolation_level"] = "READ COMMITTED"

    if is_sqlalchemy_v1():
        # Allow the user to specify an encoding for their DB otherwise default
        # to utf-8 so jobs & users with non-latin1 characters can still use us.
        # This parameter was removed in SQLAlchemy 2.x.
        engine_args["encoding"] = conf.get("database", "SQL_ENGINE_ENCODING", fallback="utf-8")

    return engine_args
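
# Illustrative configuration (assumed example): engine arguments can also be supplied wholesale
# as JSON via the option read by conf.getjson() above, e.g.
#
#   [database]
#   sql_alchemy_engine_args = {"pool_size": 10, "max_overflow": 20, "pool_pre_ping": true}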


def dispose_orm(do_log: bool = True):
    """Properly close pooled database connections."""
    global Session, engine, NonScopedSession

    _globals = globals()
    if _globals.get("engine") is None and _globals.get("Session") is None:
        return

    if do_log:
        log.debug("Disposing DB connection pool (PID %s)", os.getpid())

    if "Session" in _globals and Session is not None:
        from sqlalchemy.orm.session import close_all_sessions

        Session.remove()
        Session = None
        NonScopedSession = None
        close_all_sessions()

    if "engine" in _globals and engine is not None:
        engine.dispose()
        engine = None


def reconfigure_orm(disable_connection_pool=False, pool_class=None):
    """Properly close database connections and re-configure ORM."""
    dispose_orm()
    configure_orm(disable_connection_pool=disable_connection_pool, pool_class=pool_class)


def configure_adapters():
    """Register Adapters and DB Converters."""
    from pendulum import DateTime as Pendulum

    if SQL_ALCHEMY_CONN.startswith("sqlite"):
        from sqlite3 import register_adapter

        register_adapter(Pendulum, lambda val: val.isoformat(" "))

    if SQL_ALCHEMY_CONN.startswith("mysql"):
        try:
            try:
                import MySQLdb.converters
            except ImportError:
                raise RuntimeError(
                    "You do not have the `mysqlclient` package installed. "
                    "Please install it with `pip install mysqlclient` and make sure you have the system "
                    "mysql libraries installed, as well as the `pkg-config` system package, "
                    "in case you see a compilation error during installation."
                )

            MySQLdb.converters.conversions[Pendulum] = MySQLdb.converters.DateTime2literal
        except ImportError:
            pass
        try:
            import pymysql.converters

            pymysql.converters.conversions[Pendulum] = pymysql.converters.escape_datetime
        except ImportError:
            pass


def _configure_secrets_masker():
    """Configure the secrets masker with values from config."""
    from airflow._shared.secrets_masker import (
        DEFAULT_SENSITIVE_FIELDS,
        _secrets_masker as secrets_masker_core,
    )
    from airflow.configuration import conf

    min_length_to_mask = conf.getint("logging", "min_length_masked_secret", fallback=5)
    secret_mask_adapter = conf.getimport("logging", "secret_mask_adapter", fallback=None)
    sensitive_fields = DEFAULT_SENSITIVE_FIELDS.copy()
    sensitive_variable_fields = conf.get("core", "sensitive_var_conn_names")
    if sensitive_variable_fields:
        sensitive_fields |= frozenset({field.strip() for field in sensitive_variable_fields.split(",")})

    core_masker = secrets_masker_core()
    core_masker.min_length_to_mask = min_length_to_mask
    core_masker.sensitive_variables_fields = list(sensitive_fields)
    core_masker.secret_mask_adapter = secret_mask_adapter

    from airflow.sdk._shared.secrets_masker import _secrets_masker as sdk_secrets_masker

    sdk_masker = sdk_secrets_masker()
    sdk_masker.min_length_to_mask = min_length_to_mask
    sdk_masker.sensitive_variables_fields = list(sensitive_fields)
    sdk_masker.secret_mask_adapter = secret_mask_adapter


def configure_action_logging() -> None:
    """Any additional configuration (register callback) for airflow.utils.action_loggers module."""


def prepare_syspath_for_config_and_plugins():
    """Update sys.path for the config and plugins directories."""
    # Add ./config/ for loading custom log parsers etc, or
    # airflow_local_settings etc.
    config_path = os.path.join(AIRFLOW_HOME, "config")
    if config_path not in sys.path:
        sys.path.append(config_path)

    if PLUGINS_FOLDER not in sys.path:
        sys.path.append(PLUGINS_FOLDER)


def __getattr__(name: str):
    """Handle deprecated module attributes."""
    import warnings

    from airflow.exceptions import RemovedInAirflow4Warning

    if name == "MASK_SECRETS_IN_LOGS":
        warnings.warn(
            "settings.MASK_SECRETS_IN_LOGS has been removed. This shim returns the default value of False. "
            "Use SecretsMasker.enable_log_masking(), disable_log_masking(), or is_log_masking_enabled() instead.",
            RemovedInAirflow4Warning,
            stacklevel=2,
        )
        return False
    if name == "WEB_COLORS":
        warnings.warn(
            "settings.WEB_COLORS has been removed. This shim returns the default value. "
            "Please upgrade your provider or integration.",
            RemovedInAirflow4Warning,
            stacklevel=2,
        )
        return {"LIGHTBLUE": "#4d9de0", "LIGHTORANGE": "#FF9933"}
    if name == "EXECUTE_TASKS_NEW_PYTHON_INTERPRETER":
        warnings.warn(
            "settings.EXECUTE_TASKS_NEW_PYTHON_INTERPRETER has been removed. This shim returns the default "
            "value. Please upgrade your provider or integration.",
            RemovedInAirflow4Warning,
            stacklevel=2,
        )
        return not hasattr(os, "fork") or conf.getboolean(
            "core",
            "execute_tasks_new_python_interpreter",
            fallback=False,
        )

    raise AttributeError(f"module '{__name__}' has no attribute '{name}'")


def import_local_settings():
    """Import airflow_local_settings.py files to allow overriding any configs in this settings.py file."""
    try:
        import airflow_local_settings
    except ModuleNotFoundError as e:
        if e.name == "airflow_local_settings":
            log.debug("No airflow_local_settings to import.", exc_info=True)
        else:
            log.critical(
                "Failed to import airflow_local_settings due to a transitive module not found error.",
                exc_info=True,
            )
            raise
    except ImportError:
        log.critical("Failed to import airflow_local_settings.", exc_info=True)
        raise
    else:
        if hasattr(airflow_local_settings, "__all__"):
            names = set(airflow_local_settings.__all__)
        else:
            names = {n for n in airflow_local_settings.__dict__ if not n.startswith("__")}

        plugin_functions = policies.make_plugin_from_local_settings(
            get_policy_plugin_manager(), airflow_local_settings, names
        )

        # If we have already handled a function by adding it to the plugin,
        # then don't clobber the global function
        for name in names - plugin_functions:
            globals()[name] = getattr(airflow_local_settings, name)

        if get_policy_plugin_manager().hook.task_instance_mutation_hook.get_hookimpls():
            task_instance_mutation_hook.is_noop = False

        log.info("Loaded airflow_local_settings from %s.", airflow_local_settings.__file__)


def initialize():
    """Initialize Airflow with all the settings from this file."""
    configure_vars()
    prepare_syspath_for_config_and_plugins()
    policy_mgr = get_policy_plugin_manager()
    # Load policy plugins _before_ importing airflow_local_settings, as pluggy uses LIFO and we want anything
    # in airflow_local_settings to take precedence
    load_policy_plugins(policy_mgr)
    import_local_settings()
    configure_logging()

    configure_adapters()
    # The webservers import this file from models.py with the default settings.

    # Configure secrets masker before masking secrets
    _configure_secrets_masker()

    is_worker = os.environ.get("_AIRFLOW__REEXECUTED_PROCESS") == "1"
    if not os.environ.get("PYTHON_OPERATORS_VIRTUAL_ENV_MODE", None) and not is_worker:
        configure_orm()

    # mask the sensitive_config_values
    conf.mask_secrets()
    configure_action_logging()

    # Run any custom runtime checks that need to be executed for providers
    run_providers_custom_runtime_checks()

    # Ensure we close DB connections at scheduler and gunicorn worker terminations
    atexit.register(dispose_orm)


# By default Airflow plugins are lazily loaded (only loaded when required). Set it to False
# if you want to load plugins whenever 'airflow' is invoked via cli or loaded from a module.
LAZY_LOAD_PLUGINS: bool = conf.getboolean("core", "lazy_load_plugins", fallback=True)

# By default Airflow providers are lazily discovered (discovery and imports happen only when required).
# Set it to False if you want to discover providers whenever 'airflow' is invoked via cli or
# loaded from a module.
LAZY_LOAD_PROVIDERS: bool = conf.getboolean("core", "lazy_discover_providers", fallback=True)

DAEMON_UMASK: str = conf.get("core", "daemon_umask", fallback="0o077")