Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/settings.py: 69%

359 statements  

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import atexit
import json as json_lib
import logging
import os
import sys
import warnings
from collections.abc import Callable
from functools import cache, partial
from importlib import metadata
from typing import TYPE_CHECKING, Any, Literal

import pluggy
from packaging.version import Version
from sqlalchemy import create_engine
from sqlalchemy.ext.asyncio import (
    AsyncEngine,
    AsyncSession as SAAsyncSession,
    create_async_engine,
)
from sqlalchemy.orm import scoped_session, sessionmaker

try:
    from sqlalchemy.ext.asyncio import async_sessionmaker
except ImportError:
    async_sessionmaker = sessionmaker  # type: ignore[assignment,misc]

from sqlalchemy.pool import NullPool

from airflow import __version__ as airflow_version, policies
from airflow._shared.timezones.timezone import (
    initialize as initialize_timezone,
    local_timezone,
    parse_timezone,
    utc,
)
from airflow.configuration import AIRFLOW_HOME, conf
from airflow.exceptions import AirflowInternalRuntimeError
from airflow.logging_config import configure_logging
from airflow.utils.orm_event_handlers import setup_event_handlers
from airflow.utils.sqlalchemy import is_sqlalchemy_v1

USE_PSYCOPG3: bool
try:
    from importlib.util import find_spec

    is_psycopg3 = find_spec("psycopg") is not None

    USE_PSYCOPG3 = is_psycopg3 and not is_sqlalchemy_v1()
except (ImportError, ModuleNotFoundError):
    USE_PSYCOPG3 = False

if TYPE_CHECKING:
    from sqlalchemy.engine import Engine

    from airflow.api_fastapi.common.types import UIAlert

log = logging.getLogger(__name__)

try:
    tz_str = conf.get_mandatory_value("core", "default_timezone")
    initialize_timezone(tz_str)
    if tz_str != "system":
        TIMEZONE = parse_timezone(tz_str)
    else:
        TIMEZONE = local_timezone()
except Exception:
    TIMEZONE = utc
    initialize_timezone("UTC")

log.info("Configured default timezone %s", TIMEZONE)
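# For illustration, the block above resolves TIMEZONE roughly as follows
# (the exact value depends on airflow.cfg):
#   [core] default_timezone = utc              -> TIMEZONE = parse_timezone("utc")
#   [core] default_timezone = Europe/Amsterdam -> TIMEZONE = parse_timezone("Europe/Amsterdam")
#   [core] default_timezone = system           -> TIMEZONE = local_timezone()
# If the option is missing or cannot be parsed, TIMEZONE falls back to UTC.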

if conf.has_option("database", "sql_alchemy_session_maker"):
    log.info(
        '[Warning] Found config "sql_alchemy_session_maker", make sure you know what you are doing.\n'
        "[Warning] Improper configuration of sql_alchemy_session_maker can lead to serious issues, "
        "including data corruption and unrecoverable application crashes.\n"
        "[Warning] Please review the SQLAlchemy documentation for detailed guidance on "
        "proper configuration and best practices."
    )

HEADER = "\n".join(
    [
        r"  ____________       _____________",
        r" ____    |__( )_________  __/__  /________      __",
        r"____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /",
        r"___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /",
        r" _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/",
    ]
)

LOGGING_LEVEL = logging.INFO

# the prefix to append to gunicorn worker processes after init
GUNICORN_WORKER_READY_PREFIX = "[ready] "

LOG_FORMAT = conf.get("logging", "log_format")
SIMPLE_LOG_FORMAT = conf.get("logging", "simple_log_format")

SQL_ALCHEMY_CONN: str | None = None
SQL_ALCHEMY_CONN_ASYNC: str | None = None
PLUGINS_FOLDER: str | None = None
DONOT_MODIFY_HANDLERS: bool | None = None
DAGS_FOLDER: str = os.path.expanduser(conf.get_mandatory_value("core", "DAGS_FOLDER"))

AIO_LIBS_MAPPING = {"sqlite": "aiosqlite", "postgresql": "asyncpg", "mysql": "aiomysql"}
"""
Mapping of sync scheme to async scheme.

:meta private:
"""

engine: Engine | None = None
Session: scoped_session | None = None

# NonScopedSession creates global sessions and is not safe to use in a multi-threaded environment
# without additional precautions. The only use case is when the session lifecycle needs
# custom handling. Most of the time we only want one unique thread-local session object;
# this is achieved by the Session factory above.
NonScopedSession: sessionmaker | None = None
async_engine: AsyncEngine | None = None
AsyncSession: Callable[..., SAAsyncSession] | None = None


def get_engine():
    """Get the configured engine, raising an error if not configured."""
    if engine is None:
        raise RuntimeError("Engine not configured. Call configure_orm() first.")
    return engine


def get_session():
    """Get the configured Session, raising an error if not configured."""
    if Session is None:
        raise RuntimeError("Session not configured. Call configure_orm() first.")
    return Session
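# A minimal usage sketch (assuming initialize()/configure_orm() has already run in this process):
#
#     engine = get_engine()             # sqlalchemy Engine bound to SQL_ALCHEMY_CONN
#     with get_session()() as session:  # the scoped_session factory returns a thread-local Session
#         session.execute(...)
#
# Before configure_orm() is called, both helpers raise RuntimeError instead of returning None.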


# The JSON library to use for DAG Serialization and De-Serialization
json = json_lib

# Display alerts on the dashboard
# Useful for warning about setup issues or announcing changes to end users
# List of UIAlerts, which allows for specifying the content and category
# message to be shown. For example:
# from airflow.api_fastapi.common.types import UIAlert
#
# DASHBOARD_UIALERTS = [
#     UIAlert(text="Welcome to Airflow", category="info"),
#     UIAlert(text="Upgrade tomorrow [help](https://www.example.com)", category="warning"),  # With markdown support
# ]
#
DASHBOARD_UIALERTS: list[UIAlert] = []


@cache
def _get_rich_console(file):
    # Delay imports until we need it
    import rich.console

    return rich.console.Console(file=file)


def custom_show_warning(message, category, filename, lineno, file=None, line=None):
    """Print rich and visible warnings."""
    # Delay imports until we need it
    import re

    from rich.markup import escape

    re_escape_regex = re.compile(r"(\\*)(\[[a-z#/@][^[]*?])").sub
    msg = f"[bold]{line}" if line else f"[bold][yellow]{filename}:{lineno}"
    msg += f" {category.__name__}[/bold]: {escape(str(message), _escape=re_escape_regex)}[/yellow]"
    write_console = _get_rich_console(file or sys.stderr)
    write_console.print(msg, soft_wrap=True)


def replace_showwarning(replacement):
    """
    Replace ``warnings.showwarning``, returning the original.

    This is useful since we want to "reset" the ``showwarning`` hook on exit to
    avoid lazy-loading issues. If a warning is emitted after Python cleaned up
    the import system, we would no longer be able to import ``rich``.
    """
    original = warnings.showwarning
    warnings.showwarning = replacement
    return original


original_show_warning = replace_showwarning(custom_show_warning)
atexit.register(partial(replace_showwarning, original_show_warning))


def task_policy(task):
    return get_policy_plugin_manager().hook.task_policy(task=task)


def dag_policy(dag):
    return get_policy_plugin_manager().hook.dag_policy(dag=dag)


def task_instance_mutation_hook(task_instance):
    return get_policy_plugin_manager().hook.task_instance_mutation_hook(task_instance=task_instance)


task_instance_mutation_hook.is_noop = True  # type: ignore


def pod_mutation_hook(pod):
    return get_policy_plugin_manager().hook.pod_mutation_hook(pod=pod)


def get_airflow_context_vars(context):
    return get_policy_plugin_manager().hook.get_airflow_context_vars(context=context)


def get_dagbag_import_timeout(dag_file_path: str):
    return get_policy_plugin_manager().hook.get_dagbag_import_timeout(dag_file_path=dag_file_path)


@cache
def get_policy_plugin_manager() -> pluggy.PluginManager:
    plugin_mgr = pluggy.PluginManager(policies.local_settings_hookspec.project_name)
    plugin_mgr.add_hookspecs(policies)
    plugin_mgr.register(policies.DefaultPolicy)
    return plugin_mgr


def load_policy_plugins(pm: pluggy.PluginManager):
    # We can't log duration etc here, as logging hasn't yet been configured!
    pm.load_setuptools_entrypoints("airflow.policy")
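# For illustration, a cluster policy can be contributed either through the "airflow.policy"
# entrypoint loaded above, or by defining a matching function in airflow_local_settings.py,
# e.g. (hypothetical example):
#
#     def task_policy(task):
#         if task.task_type == "BashOperator":
#             task.queue = "bash_queue"
#
# import_local_settings() further down registers such functions with the plugin manager.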


def _get_async_conn_uri_from_sync(sync_uri):
    scheme, rest = sync_uri.split(":", maxsplit=1)
    scheme = scheme.split("+", maxsplit=1)[0]
    aiolib = AIO_LIBS_MAPPING.get(scheme)
    if aiolib:
        return f"{scheme}+{aiolib}:{rest}"
    return sync_uri
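# Illustrative behaviour of the helper above (given the AIO_LIBS_MAPPING defined earlier):
#   "postgresql+psycopg2://user:pass@host/db" -> "postgresql+asyncpg://user:pass@host/db"
#   "sqlite:////tmp/airflow.db"               -> "sqlite+aiosqlite:////tmp/airflow.db"
#   "mssql://host/db"                         -> "mssql://host/db"  (no async driver mapped, returned unchanged)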


def configure_vars():
    """Configure Global Variables from airflow.cfg."""
    global SQL_ALCHEMY_CONN
    global SQL_ALCHEMY_CONN_ASYNC
    global DAGS_FOLDER
    global PLUGINS_FOLDER
    global DONOT_MODIFY_HANDLERS

    SQL_ALCHEMY_CONN = conf.get("database", "sql_alchemy_conn")
    if conf.has_option("database", "sql_alchemy_conn_async"):
        SQL_ALCHEMY_CONN_ASYNC = conf.get("database", "sql_alchemy_conn_async")
    else:
        SQL_ALCHEMY_CONN_ASYNC = _get_async_conn_uri_from_sync(sync_uri=SQL_ALCHEMY_CONN)

    DAGS_FOLDER = os.path.expanduser(conf.get("core", "DAGS_FOLDER"))

    PLUGINS_FOLDER = conf.get("core", "plugins_folder", fallback=os.path.join(AIRFLOW_HOME, "plugins"))

    # If donot_modify_handlers=True, we do not modify logging handlers in task_run command
    # If the flag is set to False, we remove all handlers from the root logger
    # and add all handlers from 'airflow.task' logger to the root Logger. This is done
    # to get all the logs from the print & log statements in the DAG files before a task is run
    # The handlers are restored after the task completes execution.
    DONOT_MODIFY_HANDLERS = conf.getboolean("logging", "donot_modify_handlers", fallback=False)


def _run_openlineage_runtime_check():
    """
    Ensure compatibility of OpenLineage provider package and Airflow version.

    Airflow 2.10.0 introduced some core changes (#39336) that made versions <= 1.8.0 of OpenLineage
    provider incompatible with future Airflow versions (>= 2.10.0).
    """
    ol_package = "apache-airflow-providers-openlineage"
    try:
        ol_version = metadata.version(ol_package)
    except metadata.PackageNotFoundError:
        return

    if ol_version and Version(ol_version) < Version("1.8.0.dev0"):
        raise RuntimeError(
            f"You have installed `{ol_package}` == `{ol_version}` that is not compatible with "
            f"`apache-airflow` == `{airflow_version}`. "
            f"For `apache-airflow` >= `2.10.0` you must use `{ol_package}` >= `1.8.0`."
        )


def run_providers_custom_runtime_checks():
    _run_openlineage_runtime_check()


class SkipDBTestsSession:
    """
    This fake session is used to skip DB tests when `_AIRFLOW_SKIP_DB_TESTS` is set.

    :meta private:
    """

    def __init__(self):
        raise AirflowInternalRuntimeError(
            "Your test accessed the DB but `_AIRFLOW_SKIP_DB_TESTS` is set.\n"
            "Either make sure your test does not use database or mark the test with `@pytest.mark.db_test`\n"
            "See https://github.com/apache/airflow/blob/main/contributing-docs/testing/unit_tests.rst#"
            "best-practices-for-db-tests on how "
            "to deal with it and consult examples."
        )

    def remove(*args, **kwargs):
        pass

    def get_bind(
        self,
        mapper=None,
        clause=None,
        bind=None,
        _sa_skip_events=None,
        _sa_skip_for_implicit_returning=False,
    ):
        pass


AIRFLOW_PATH = os.path.dirname(os.path.dirname(__file__))
AIRFLOW_TESTS_PATH = os.path.join(AIRFLOW_PATH, "tests")
AIRFLOW_SETTINGS_PATH = os.path.join(AIRFLOW_PATH, "airflow", "settings.py")
AIRFLOW_UTILS_SESSION_PATH = os.path.join(AIRFLOW_PATH, "airflow", "utils", "session.py")
AIRFLOW_MODELS_BASEOPERATOR_PATH = os.path.join(AIRFLOW_PATH, "airflow", "models", "baseoperator.py")


def _is_sqlite_db_path_relative(sqla_conn_str: str) -> bool:
    """Determine whether the database connection URI specifies a relative path."""
    # Check for non-empty connection string:
    if not sqla_conn_str:
        return False
    # Check for the right URI scheme:
    if not sqla_conn_str.startswith("sqlite"):
        return False
    # In-memory is not useful for production, but useful for writing tests against Airflow for extensions
    if sqla_conn_str == "sqlite://":
        return False
    # Check for absolute path:
    if sqla_conn_str.startswith(abs_prefix := "sqlite:///") and os.path.isabs(
        sqla_conn_str[len(abs_prefix) :]
    ):
        return False
    return True
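# For illustration, the check above classifies connection strings roughly as:
#   "sqlite:///relative/path/airflow.db"  -> True  (relative path, rejected later by configure_orm)
#   "sqlite:////absolute/path/airflow.db" -> False (absolute path, allowed)
#   "sqlite://"                           -> False (in-memory database, allowed)
#   "postgresql://host/db"                -> False (not sqlite, allowed)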


def _get_connect_args(mode: Literal["sync", "async"]) -> Any:
    key = {
        "sync": "sql_alchemy_connect_args",
        "async": "sql_alchemy_connect_args_async",
    }[mode]
    if conf.has_option("database", key):
        return conf.getimport("database", key)
    return {}


def _configure_async_session() -> None:
    """
    Configure async SQLAlchemy session.

    This exists so tests can reconfigure the session. How SQLAlchemy configures
    this does not work well with pytest, and you can end up with issues when the
    session runs in a different event loop from the test itself.
    """

    global AsyncSession, async_engine

    if not SQL_ALCHEMY_CONN_ASYNC:
        async_engine = None
        AsyncSession = None
        return

    async_engine = create_async_engine(
        SQL_ALCHEMY_CONN_ASYNC,
        connect_args=_get_connect_args("async"),
        future=True,
    )
    AsyncSession = async_sessionmaker(
        bind=async_engine,
        class_=SAAsyncSession,
        autoflush=False,
        expire_on_commit=False,
    )


def configure_orm(disable_connection_pool=False, pool_class=None):
    """Configure ORM using SQLAlchemy."""
    from airflow._shared.secrets_masker import mask_secret

    if _is_sqlite_db_path_relative(SQL_ALCHEMY_CONN):
        from airflow.exceptions import AirflowConfigException

        raise AirflowConfigException(
            f"Cannot use relative path: `{SQL_ALCHEMY_CONN}` to connect to sqlite. "
            "Please use absolute path such as `sqlite:////tmp/airflow.db`."
        )

    global NonScopedSession
    global Session
    global engine

    if os.environ.get("_AIRFLOW_SKIP_DB_TESTS") == "true":
        # Skip DB initialization in unit tests, if DB tests are skipped
        Session = SkipDBTestsSession
        engine = None
        return
    log.debug("Setting up DB connection pool (PID %s)", os.getpid())
    engine_args = prepare_engine_args(disable_connection_pool, pool_class)

    connect_args = _get_connect_args("sync")
    if SQL_ALCHEMY_CONN.startswith("sqlite"):

        # FastAPI runs sync endpoints in a separate thread. By default SQLite does not
        # allow objects created in one thread to be used in another. Allow that in tests
        # so the `test` thread and the tested endpoints can use common objects.

        connect_args["check_same_thread"] = False

    engine = create_engine(
        SQL_ALCHEMY_CONN,
        connect_args=connect_args,
        **engine_args,
        future=True,
    )
    _configure_async_session()
    mask_secret(engine.url.password)
    setup_event_handlers(engine)

    if conf.has_option("database", "sql_alchemy_session_maker"):
        _session_maker = conf.getimport("database", "sql_alchemy_session_maker")
    else:
        _session_maker = partial(
            sessionmaker,
            autocommit=False,
            autoflush=False,
            expire_on_commit=False,
        )
    if engine is None:
        raise RuntimeError("Engine must be initialized before creating a session")
    NonScopedSession = _session_maker(engine)
    Session = scoped_session(NonScopedSession)

    if register_at_fork := getattr(os, "register_at_fork", None):
        # https://docs.sqlalchemy.org/en/20/core/pooling.html#using-connection-pools-with-multiprocessing-or-os-fork
        def clean_in_fork():
            _globals = globals()
            if engine := _globals.get("engine"):
                engine.dispose(close=False)
            if async_engine := _globals.get("async_engine"):
                async_engine.sync_engine.dispose(close=False)

        # Won't work on Windows
        register_at_fork(after_in_child=clean_in_fork)


DEFAULT_ENGINE_ARGS: dict[str, dict[str, Any]] = {
    "postgresql": (
        {
            "executemany_values_page_size" if is_sqlalchemy_v1() else "insertmanyvalues_page_size": 10000,
        }
        | (
            {}
            if USE_PSYCOPG3
            else {"executemany_mode": "values_plus_batch", "executemany_batch_page_size": 2000}
        )
    )
}


def prepare_engine_args(disable_connection_pool=False, pool_class=None):
    """Prepare SQLAlchemy engine args."""
    default_args = {}
    for dialect, default in DEFAULT_ENGINE_ARGS.items():
        if SQL_ALCHEMY_CONN.startswith(dialect):
            default_args = default.copy()
            break

    engine_args: dict = conf.getjson("database", "sql_alchemy_engine_args", fallback=default_args)

    if pool_class:
        # Don't use separate settings for size etc, only those from sql_alchemy_engine_args
        engine_args["poolclass"] = pool_class
    elif disable_connection_pool or not conf.getboolean("database", "SQL_ALCHEMY_POOL_ENABLED"):
        engine_args["poolclass"] = NullPool
        log.debug("settings.prepare_engine_args(): Using NullPool")
    elif not SQL_ALCHEMY_CONN.startswith("sqlite"):
        # Pool size engine args not supported by sqlite.
        # If no config value is defined for the pool size, select a reasonable value.
        # 0 means no limit, which could lead to exceeding the Database connection limit.
        pool_size = conf.getint("database", "SQL_ALCHEMY_POOL_SIZE", fallback=5)

        # The maximum overflow size of the pool.
        # When the number of checked-out connections reaches the size set in pool_size,
        # additional connections will be returned up to this limit.
        # When those additional connections are returned to the pool, they are disconnected and discarded.
        # It follows then that the total number of simultaneous connections
        # the pool will allow is pool_size + max_overflow,
        # and the total number of "sleeping" connections the pool will allow is pool_size.
        # max_overflow can be set to -1 to indicate no overflow limit;
        # no limit will be placed on the total number
        # of concurrent connections. Defaults to 10.
        max_overflow = conf.getint("database", "SQL_ALCHEMY_MAX_OVERFLOW", fallback=10)

        # The DB server already has a value for wait_timeout (number of seconds after
        # which an idle sleeping connection should be killed). Since other DBs may
        # co-exist on the same server, SQLAlchemy should set its
        # pool_recycle to an equal or smaller value.
        pool_recycle = conf.getint("database", "SQL_ALCHEMY_POOL_RECYCLE", fallback=1800)

        # Check connection at the start of each connection pool checkout.
        # Typically, this is a simple statement like "SELECT 1", but may also make use
        # of some DBAPI-specific method to test the connection for liveness.
        # More information here:
        # https://docs.sqlalchemy.org/en/14/core/pooling.html#disconnect-handling-pessimistic
        pool_pre_ping = conf.getboolean("database", "SQL_ALCHEMY_POOL_PRE_PING", fallback=True)

        log.debug(
            "settings.prepare_engine_args(): Using pool settings. pool_size=%d, max_overflow=%d, "
            "pool_recycle=%d, pid=%d",
            pool_size,
            max_overflow,
            pool_recycle,
            os.getpid(),
        )
        engine_args["pool_size"] = pool_size
        engine_args["pool_recycle"] = pool_recycle
        engine_args["pool_pre_ping"] = pool_pre_ping
        engine_args["max_overflow"] = max_overflow

    # The default isolation level for MySQL (REPEATABLE READ) can introduce inconsistencies when
    # running multiple schedulers, as repeated queries on the same session may read from stale snapshots.
    # 'READ COMMITTED' is the default value for PostgreSQL.
    # More information here:
    # https://dev.mysql.com/doc/refman/8.0/en/innodb-transaction-isolation-levels.html
    if SQL_ALCHEMY_CONN.startswith("mysql"):
        engine_args["isolation_level"] = "READ COMMITTED"

    if is_sqlalchemy_v1():
        # Allow the user to specify an encoding for their DB otherwise default
        # to utf-8 so jobs & users with non-latin1 characters can still use us.
        # This parameter was removed in SQLAlchemy 2.x.
        engine_args["encoding"] = conf.get("database", "SQL_ENGINE_ENCODING", fallback="utf-8")

    return engine_args
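# Worked example with the fallback values above: pool_size=5 and max_overflow=10 allow at most
# 5 + 10 = 15 simultaneous connections per process, while at most 5 idle connections are kept
# open. The effective numbers come from airflow.cfg, so treat these figures as illustrative.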


def dispose_orm(do_log: bool = True):
    """Properly close pooled database connections."""
    global Session, engine, NonScopedSession

    _globals = globals()
    if _globals.get("engine") is None and _globals.get("Session") is None:
        return

    if do_log:
        log.debug("Disposing DB connection pool (PID %s)", os.getpid())

    if "Session" in _globals and Session is not None:
        from sqlalchemy.orm.session import close_all_sessions

        Session.remove()
        Session = None
        NonScopedSession = None
        close_all_sessions()

    if "engine" in _globals and engine is not None:
        engine.dispose()
        engine = None


def reconfigure_orm(disable_connection_pool=False, pool_class=None):
    """Properly close database connections and re-configure ORM."""
    dispose_orm()
    configure_orm(disable_connection_pool=disable_connection_pool, pool_class=pool_class)


def configure_adapters():
    """Register Adapters and DB Converters."""
    from pendulum import DateTime as Pendulum

    if SQL_ALCHEMY_CONN.startswith("sqlite"):
        from sqlite3 import register_adapter

        register_adapter(Pendulum, lambda val: val.isoformat(" "))

    if SQL_ALCHEMY_CONN.startswith("mysql"):
        try:
            try:
                import MySQLdb.converters
            except ImportError:

                raise RuntimeError(
                    "You do not have the `mysqlclient` package installed. "
                    "Please install it with `pip install mysqlclient` and make sure you have the system "
                    "mysql libraries installed, as well as the `pkg-config` system package, "
                    "in case you see a compilation error during installation."
                )


            MySQLdb.converters.conversions[Pendulum] = MySQLdb.converters.DateTime2literal
        except ImportError:
            pass
        try:
            import pymysql.converters

            pymysql.converters.conversions[Pendulum] = pymysql.converters.escape_datetime
        except ImportError:
            pass


def _configure_secrets_masker():
    """Configure the secrets masker with values from config."""
    from airflow._shared.secrets_masker import (
        DEFAULT_SENSITIVE_FIELDS,
        _secrets_masker as secrets_masker_core,
    )
    from airflow.configuration import conf

    min_length_to_mask = conf.getint("logging", "min_length_masked_secret", fallback=5)
    secret_mask_adapter = conf.getimport("logging", "secret_mask_adapter", fallback=None)
    sensitive_fields = DEFAULT_SENSITIVE_FIELDS.copy()
    sensitive_variable_fields = conf.get("core", "sensitive_var_conn_names")
    if sensitive_variable_fields:
        sensitive_fields |= frozenset({field.strip() for field in sensitive_variable_fields.split(",")})

    core_masker = secrets_masker_core()
    core_masker.min_length_to_mask = min_length_to_mask
    core_masker.sensitive_variables_fields = list(sensitive_fields)
    core_masker.secret_mask_adapter = secret_mask_adapter

    from airflow.sdk._shared.secrets_masker import _secrets_masker as sdk_secrets_masker

    sdk_masker = sdk_secrets_masker()
    sdk_masker.min_length_to_mask = min_length_to_mask
    sdk_masker.sensitive_variables_fields = list(sensitive_fields)
    sdk_masker.secret_mask_adapter = secret_mask_adapter
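# Illustrative effect of the configuration above (exact values depend on airflow.cfg):
#   [core] sensitive_var_conn_names = api_token, client_secret
# extends DEFAULT_SENSITIVE_FIELDS, so variables and connection fields named "api_token" or
# "client_secret" are masked in logs in addition to the built-in names such as "password".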


def configure_action_logging() -> None:
    """Any additional configuration (register callback) for airflow.utils.action_loggers module."""


def prepare_syspath_for_config_and_plugins():
    """Update sys.path for the config and plugins directories."""
    # Add ./config/ for loading custom log parsers etc, or
    # airflow_local_settings etc.
    config_path = os.path.join(AIRFLOW_HOME, "config")
    if config_path not in sys.path:
        sys.path.append(config_path)

    if PLUGINS_FOLDER not in sys.path:
        sys.path.append(PLUGINS_FOLDER)

def __getattr__(name: str):
    """Handle deprecated module attributes."""
    if name == "MASK_SECRETS_IN_LOGS":
        import warnings

        warnings.warn(
            "settings.MASK_SECRETS_IN_LOGS has been removed. This shim returns the default value of False. "
            "Use SecretsMasker.enable_log_masking(), disable_log_masking(), or is_log_masking_enabled() instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        return False
    raise AttributeError(f"module '{__name__}' has no attribute '{name}'")
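# For illustration, with the shim above:
#
#     from airflow import settings
#     settings.MASK_SECRETS_IN_LOGS   # emits a DeprecationWarning and evaluates to False
#     settings.SOME_UNKNOWN_NAME      # raises AttributeError, like any other missing attribute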


def import_local_settings():
    """Import airflow_local_settings.py files to allow overriding any configs in settings.py file."""
    try:
        import airflow_local_settings
    except ModuleNotFoundError as e:
        if e.name == "airflow_local_settings":
            log.debug("No airflow_local_settings to import.", exc_info=True)
        else:
            log.critical(
                "Failed to import airflow_local_settings due to a transitive module not found error.",
                exc_info=True,
            )
            raise
    except ImportError:
        log.critical("Failed to import airflow_local_settings.", exc_info=True)
        raise
    else:
        if hasattr(airflow_local_settings, "__all__"):
            names = set(airflow_local_settings.__all__)
        else:
            names = {n for n in airflow_local_settings.__dict__ if not n.startswith("__")}

        plugin_functions = policies.make_plugin_from_local_settings(
            get_policy_plugin_manager(), airflow_local_settings, names
        )

        # If we have already handled a function by adding it to the plugin,
        # then don't clobber the global function
        for name in names - plugin_functions:
            globals()[name] = getattr(airflow_local_settings, name)

        if get_policy_plugin_manager().hook.task_instance_mutation_hook.get_hookimpls():
            task_instance_mutation_hook.is_noop = False

        log.info("Loaded airflow_local_settings from %s .", airflow_local_settings.__file__)

def initialize():
    """Initialize Airflow with all the settings from this file."""
    configure_vars()
    prepare_syspath_for_config_and_plugins()
    policy_mgr = get_policy_plugin_manager()
    # Load policy plugins _before_ importing airflow_local_settings, as Pluggy uses LIFO and we want anything
    # in airflow_local_settings to take precedence
    load_policy_plugins(policy_mgr)
    import_local_settings()
    configure_logging()

    configure_adapters()
    # The webservers import this file from models.py with the default settings.

    # Configure secrets masker before masking secrets
    _configure_secrets_masker()

    is_worker = os.environ.get("_AIRFLOW__REEXECUTED_PROCESS") == "1"
    if not os.environ.get("PYTHON_OPERATORS_VIRTUAL_ENV_MODE", None) and not is_worker:
        configure_orm()

    # mask the sensitive_config_values
    conf.mask_secrets()
    configure_action_logging()

    # Run any custom runtime checks that need to be executed for providers
    run_providers_custom_runtime_checks()

    # Ensure we close DB connections at scheduler and gunicorn worker terminations
    atexit.register(dispose_orm)


# Const stuff

KILOBYTE = 1024
MEGABYTE = KILOBYTE * KILOBYTE
WEB_COLORS = {"LIGHTBLUE": "#4d9de0", "LIGHTORANGE": "#FF9933"}

# Updating a serialized DAG can not happen faster than a minimum interval, to reduce the database
# write rate.
MIN_SERIALIZED_DAG_UPDATE_INTERVAL = conf.getint("core", "min_serialized_dag_update_interval", fallback=30)

# If set to True, serialized DAGs are compressed before being written to the DB.
COMPRESS_SERIALIZED_DAGS = conf.getboolean("core", "compress_serialized_dags", fallback=False)

# Fetching a serialized DAG can not happen faster than a minimum interval, to reduce the database
# read rate. This config controls when your DAGs are updated in the Webserver.
MIN_SERIALIZED_DAG_FETCH_INTERVAL = conf.getint("core", "min_serialized_dag_fetch_interval", fallback=10)

CAN_FORK = hasattr(os, "fork")

EXECUTE_TASKS_NEW_PYTHON_INTERPRETER = not CAN_FORK or conf.getboolean(
    "core",
    "execute_tasks_new_python_interpreter",
    fallback=False,
)

USE_JOB_SCHEDULE = conf.getboolean("scheduler", "use_job_schedule", fallback=True)

# By default Airflow plugins are lazily-loaded (only loaded when required). Set it to False
# if you want to load plugins whenever 'airflow' is invoked via the CLI or imported as a module.
LAZY_LOAD_PLUGINS: bool = conf.getboolean("core", "lazy_load_plugins", fallback=True)

# By default Airflow providers are lazily-discovered (discovery and imports happen only when required).
# Set it to False if you want to discover providers whenever 'airflow' is invoked via the CLI or
# imported as a module.
LAZY_LOAD_PROVIDERS: bool = conf.getboolean("core", "lazy_discover_providers", fallback=True)

# Executors can set this to true to configure logging correctly for
# containerized executors.
IS_EXECUTOR_CONTAINER = bool(os.environ.get("AIRFLOW_IS_EXECUTOR_CONTAINER", ""))
IS_K8S_EXECUTOR_POD = bool(os.environ.get("AIRFLOW_IS_K8S_EXECUTOR_POD", ""))
"""Will be True if running in kubernetes executor pod."""

HIDE_SENSITIVE_VAR_CONN_FIELDS = conf.getboolean("core", "hide_sensitive_var_conn_fields")

# Prefix used to identify tables holding data moved during migration.
AIRFLOW_MOVED_TABLE_PREFIX = "_airflow_moved"

DAEMON_UMASK: str = conf.get("core", "daemon_umask", fallback="0o077")
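# For illustration, the default umask "0o077" makes daemonized processes create files that are
# readable and writable only by the Airflow user (e.g. new files get mode 0o600, directories 0o700).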