Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/settings.py: 69%
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import atexit
import json as json_lib
import logging
import os
import sys
import warnings
from collections.abc import Callable
from functools import cache, partial
from importlib import metadata
from typing import TYPE_CHECKING, Any, Literal

import pluggy
from packaging.version import Version
from sqlalchemy import create_engine
from sqlalchemy.ext.asyncio import (
    AsyncEngine,
    AsyncSession as SAAsyncSession,
    create_async_engine,
)
from sqlalchemy.orm import scoped_session, sessionmaker

try:
    from sqlalchemy.ext.asyncio import async_sessionmaker
except ImportError:
    async_sessionmaker = sessionmaker  # type: ignore[assignment,misc]

from sqlalchemy.pool import NullPool

from airflow import __version__ as airflow_version, policies
from airflow._shared.timezones.timezone import (
    initialize as initialize_timezone,
    local_timezone,
    parse_timezone,
    utc,
)
from airflow.configuration import AIRFLOW_HOME, conf
from airflow.exceptions import AirflowInternalRuntimeError
from airflow.logging_config import configure_logging
from airflow.utils.orm_event_handlers import setup_event_handlers
from airflow.utils.sqlalchemy import is_sqlalchemy_v1

USE_PSYCOPG3: bool
try:
    from importlib.util import find_spec

    is_psycopg3 = find_spec("psycopg") is not None

    USE_PSYCOPG3 = is_psycopg3 and not is_sqlalchemy_v1()
except (ImportError, ModuleNotFoundError):
    USE_PSYCOPG3 = False

if TYPE_CHECKING:
    from sqlalchemy.engine import Engine

    from airflow.api_fastapi.common.types import UIAlert

log = logging.getLogger(__name__)

try:
    tz_str = conf.get_mandatory_value("core", "default_timezone")
    initialize_timezone(tz_str)
    if tz_str != "system":
        TIMEZONE = parse_timezone(tz_str)
    else:
        TIMEZONE = local_timezone()
except Exception:
    TIMEZONE = utc
    initialize_timezone("UTC")

log.info("Configured default timezone %s", TIMEZONE)

if conf.has_option("database", "sql_alchemy_session_maker"):
    log.info(
        '[Warning] Found config "sql_alchemy_session_maker", make sure you know what you are doing.\n'
        "[Warning] Improper configuration of sql_alchemy_session_maker can lead to serious issues, "
        "including data corruption and unrecoverable application crashes.\n"
        "[Warning] Please review the SQLAlchemy documentation for detailed guidance on "
        "proper configuration and best practices."
    )

HEADER = "\n".join(
    [
        r"  ____________       _____________",
        r" ____    |__( )_________  __/__  /________      __",
        r"____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /",
        r"___  ___ |  / _  / _  __/ _  / / /_/ /_ |/ |/ /",
        r" _/_/  |_/_/  /_/  /_/  /_/  \____/____/|__/",
    ]
)

LOGGING_LEVEL = logging.INFO

# the prefix to append to gunicorn worker processes after init
GUNICORN_WORKER_READY_PREFIX = "[ready] "

LOG_FORMAT = conf.get("logging", "log_format")
SIMPLE_LOG_FORMAT = conf.get("logging", "simple_log_format")

SQL_ALCHEMY_CONN: str | None = None
SQL_ALCHEMY_CONN_ASYNC: str | None = None
PLUGINS_FOLDER: str | None = None
DONOT_MODIFY_HANDLERS: bool | None = None
DAGS_FOLDER: str = os.path.expanduser(conf.get_mandatory_value("core", "DAGS_FOLDER"))

AIO_LIBS_MAPPING = {"sqlite": "aiosqlite", "postgresql": "asyncpg", "mysql": "aiomysql"}
"""
Mapping of sync scheme to async scheme.

:meta private:
"""

engine: Engine | None = None
Session: scoped_session | None = None
# NonScopedSession creates global sessions and is not safe to use in a multi-threaded
# environment without additional precautions. The only use case is when the session
# lifecycle needs custom handling. Most of the time we only want one unique
# thread-local session object; this is achieved by the Session factory above.
NonScopedSession: sessionmaker | None = None
async_engine: AsyncEngine | None = None
AsyncSession: Callable[..., SAAsyncSession] | None = None


def get_engine():
    """Get the configured engine, raising an error if not configured."""
    if engine is None:
        raise RuntimeError("Engine not configured. Call configure_orm() first.")
    return engine


def get_session():
    """Get the configured Session, raising an error if not configured."""
    if Session is None:
        raise RuntimeError("Session not configured. Call configure_orm() first.")
    return Session


# The JSON library to use for DAG serialization and deserialization
json = json_lib

# Display alerts on the dashboard
# Useful for warning about setup issues or announcing changes to end users
# List of UIAlerts, which allows specifying the content and category of the
# message to be shown. For example:
# from airflow.api_fastapi.common.types import UIAlert
#
# DASHBOARD_UIALERTS = [
#     UIAlert(text="Welcome to Airflow", category="info"),
#     UIAlert(text="Upgrade tomorrow [help](https://www.example.com)", category="warning"),  # with markdown support
# ]
#
DASHBOARD_UIALERTS: list[UIAlert] = []


@cache
def _get_rich_console(file):
    # Delay imports until we need them
    import rich.console

    return rich.console.Console(file=file)


def custom_show_warning(message, category, filename, lineno, file=None, line=None):
    """Print rich and visible warnings."""
    # Delay imports until we need them
    import re

    from rich.markup import escape

    re_escape_regex = re.compile(r"(\\*)(\[[a-z#/@][^[]*?])").sub
    msg = f"[bold]{line}" if line else f"[bold][yellow]{filename}:{lineno}"
    msg += f" {category.__name__}[/bold]: {escape(str(message), _escape=re_escape_regex)}[/yellow]"
    write_console = _get_rich_console(file or sys.stderr)
    write_console.print(msg, soft_wrap=True)
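
# Illustrative sketch (not part of the module): for a warning raised at a
# hypothetical /dags/my_dag.py:42, custom_show_warning builds markup roughly like
#
#     [bold][yellow]/dags/my_dag.py:42 UserWarning[/bold]: some message[/yellow]
#
# i.e. a bold yellow "<file>:<line> <category>" prefix followed by the escaped
# warning text.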


def replace_showwarning(replacement):
    """
    Replace ``warnings.showwarning``, returning the original.

    This is useful since we want to "reset" the ``showwarning`` hook on exit to
    avoid lazy-loading issues. If a warning is emitted after Python has cleaned
    up the import system, we would no longer be able to import ``rich``.
    """
    original = warnings.showwarning
    warnings.showwarning = replacement
    return original


original_show_warning = replace_showwarning(custom_show_warning)
atexit.register(partial(replace_showwarning, original_show_warning))


def task_policy(task):
    return get_policy_plugin_manager().hook.task_policy(task=task)


def dag_policy(dag):
    return get_policy_plugin_manager().hook.dag_policy(dag=dag)


def task_instance_mutation_hook(task_instance):
    return get_policy_plugin_manager().hook.task_instance_mutation_hook(task_instance=task_instance)


task_instance_mutation_hook.is_noop = True  # type: ignore


def pod_mutation_hook(pod):
    return get_policy_plugin_manager().hook.pod_mutation_hook(pod=pod)


def get_airflow_context_vars(context):
    return get_policy_plugin_manager().hook.get_airflow_context_vars(context=context)


def get_dagbag_import_timeout(dag_file_path: str):
    return get_policy_plugin_manager().hook.get_dagbag_import_timeout(dag_file_path=dag_file_path)


@cache
def get_policy_plugin_manager() -> pluggy.PluginManager:
    plugin_mgr = pluggy.PluginManager(policies.local_settings_hookspec.project_name)
    plugin_mgr.add_hookspecs(policies)
    plugin_mgr.register(policies.DefaultPolicy)
    return plugin_mgr


def load_policy_plugins(pm: pluggy.PluginManager):
    # We can't log duration etc here, as logging hasn't yet been configured!
    pm.load_setuptools_entrypoints("airflow.policy")
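
# A minimal sketch (hypothetical; it would live in airflow_local_settings.py,
# not here) of a cluster policy that these hooks would pick up once
# load_policy_plugins() and import_local_settings() have run; the attribute
# check below is invented for illustration:
#
#     def task_policy(task):
#         # e.g. require an execution timeout on every task
#         if task.execution_timeout is None:
#             raise ValueError(f"Task {task.task_id} must set execution_timeout")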


def _get_async_conn_uri_from_sync(sync_uri):
    scheme, rest = sync_uri.split(":", maxsplit=1)
    scheme = scheme.split("+", maxsplit=1)[0]
    aiolib = AIO_LIBS_MAPPING.get(scheme)
    if aiolib:
        return f"{scheme}+{aiolib}:{rest}"
    return sync_uri
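
# How the AIO_LIBS_MAPPING rewrite behaves (doctest-style illustration with
# made-up credentials; unmapped schemes pass through unchanged):
#
#     >>> _get_async_conn_uri_from_sync("postgresql+psycopg2://user:pw@host/db")
#     'postgresql+asyncpg://user:pw@host/db'
#     >>> _get_async_conn_uri_from_sync("mssql+pyodbc://user:pw@host/db")
#     'mssql+pyodbc://user:pw@host/db'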


def configure_vars():
    """Configure global variables from airflow.cfg."""
    global SQL_ALCHEMY_CONN
    global SQL_ALCHEMY_CONN_ASYNC
    global DAGS_FOLDER
    global PLUGINS_FOLDER
    global DONOT_MODIFY_HANDLERS

    SQL_ALCHEMY_CONN = conf.get("database", "sql_alchemy_conn")
    if conf.has_option("database", "sql_alchemy_conn_async"):
        SQL_ALCHEMY_CONN_ASYNC = conf.get("database", "sql_alchemy_conn_async")
    else:
        SQL_ALCHEMY_CONN_ASYNC = _get_async_conn_uri_from_sync(sync_uri=SQL_ALCHEMY_CONN)

    DAGS_FOLDER = os.path.expanduser(conf.get("core", "DAGS_FOLDER"))

    PLUGINS_FOLDER = conf.get("core", "plugins_folder", fallback=os.path.join(AIRFLOW_HOME, "plugins"))

    # If donot_modify_handlers=True, we do not modify logging handlers in the task_run command.
    # If the flag is set to False, we remove all handlers from the root logger
    # and add all handlers from the 'airflow.task' logger to the root logger. This is done
    # to get all the logs from the print & log statements in the DAG files before a task is run.
    # The handlers are restored after the task completes execution.
    DONOT_MODIFY_HANDLERS = conf.getboolean("logging", "donot_modify_handlers", fallback=False)


def _run_openlineage_runtime_check():
    """
    Ensure compatibility of the OpenLineage provider package with the Airflow version.

    Airflow 2.10.0 introduced some core changes (#39336) that made versions <= 1.8.0 of the
    OpenLineage provider incompatible with future Airflow versions (>= 2.10.0).
    """
    ol_package = "apache-airflow-providers-openlineage"
    try:
        ol_version = metadata.version(ol_package)
    except metadata.PackageNotFoundError:
        return

    if ol_version and Version(ol_version) < Version("1.8.0.dev0"):
        raise RuntimeError(
            f"You have installed `{ol_package}` == `{ol_version}` that is not compatible with "
            f"`apache-airflow` == `{airflow_version}`. "
            f"For `apache-airflow` >= `2.10.0` you must use `{ol_package}` >= `1.8.0`."
        )
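
# Why "1.8.0.dev0" rather than "1.8.0": per PEP 440, dev releases sort before the
# final release, so 1.8.0 pre-releases still pass the check while anything strictly
# older is rejected. Doctest-style illustration:
#
#     >>> from packaging.version import Version
#     >>> Version("1.7.1") < Version("1.8.0.dev0")
#     True
#     >>> Version("1.8.0rc1") < Version("1.8.0.dev0")
#     False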


def run_providers_custom_runtime_checks():
    _run_openlineage_runtime_check()


class SkipDBTestsSession:
    """
    This fake session is used to skip DB tests when `_AIRFLOW_SKIP_DB_TESTS` is set.

    :meta private:
    """

    def __init__(self):
        raise AirflowInternalRuntimeError(
            "Your test accessed the DB but `_AIRFLOW_SKIP_DB_TESTS` is set.\n"
            "Either make sure your test does not use the database or mark the test with "
            "`@pytest.mark.db_test`.\n"
            "See https://github.com/apache/airflow/blob/main/contributing-docs/testing/unit_tests.rst#"
            "best-practices-for-db-tests for how to deal with it, and consult the examples."
        )

    def remove(*args, **kwargs):
        pass

    def get_bind(
        self,
        mapper=None,
        clause=None,
        bind=None,
        _sa_skip_events=None,
        _sa_skip_for_implicit_returning=False,
    ):
        pass


AIRFLOW_PATH = os.path.dirname(os.path.dirname(__file__))
AIRFLOW_TESTS_PATH = os.path.join(AIRFLOW_PATH, "tests")
AIRFLOW_SETTINGS_PATH = os.path.join(AIRFLOW_PATH, "airflow", "settings.py")
AIRFLOW_UTILS_SESSION_PATH = os.path.join(AIRFLOW_PATH, "airflow", "utils", "session.py")
AIRFLOW_MODELS_BASEOPERATOR_PATH = os.path.join(AIRFLOW_PATH, "airflow", "models", "baseoperator.py")


def _is_sqlite_db_path_relative(sqla_conn_str: str) -> bool:
    """Determine whether the database connection URI specifies a relative path."""
    # Check for non-empty connection string:
    if not sqla_conn_str:
        return False
    # Check for the right URI scheme:
    if not sqla_conn_str.startswith("sqlite"):
        return False
    # In-memory is not useful for production, but useful for writing tests against Airflow for extensions
    if sqla_conn_str == "sqlite://":
        return False
    # Check for an absolute path:
    if sqla_conn_str.startswith(abs_prefix := "sqlite:///") and os.path.isabs(
        sqla_conn_str[len(abs_prefix) :]
    ):
        return False
    return True
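
# Behaviour sketch for the check above (doctest-style; paths invented):
#
#     >>> _is_sqlite_db_path_relative("sqlite:///relative/airflow.db")
#     True
#     >>> _is_sqlite_db_path_relative("sqlite:////tmp/airflow.db")
#     False
#     >>> _is_sqlite_db_path_relative("sqlite://")  # in-memory
#     False
#     >>> _is_sqlite_db_path_relative("postgresql://user@host/db")
#     False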


def _get_connect_args(mode: Literal["sync", "async"]) -> Any:
    key = {
        "sync": "sql_alchemy_connect_args",
        "async": "sql_alchemy_connect_args_async",
    }[mode]
    if conf.has_option("database", key):
        return conf.getimport("database", key)
    return {}
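
# Usage sketch: conf.getimport resolves a dotted path from airflow.cfg, so a
# deployment could (hypothetically) set
#
#     [database]
#     sql_alchemy_connect_args = my_company.airflow_config.CONNECT_ARGS
#
# where CONNECT_ARGS is a plain dict such as {"connect_timeout": 30}. When the
# option is absent, an empty dict is passed through to create_engine().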


def _configure_async_session() -> None:
    """
    Configure the async SQLAlchemy session.

    This exists so tests can reconfigure the session. The way SQLAlchemy configures
    sessions does not play well with pytest, and you can end up with issues where the
    session runs in a different event loop from the test itself.
    """
    global AsyncSession, async_engine

    if not SQL_ALCHEMY_CONN_ASYNC:
        async_engine = None
        AsyncSession = None
        return

    async_engine = create_async_engine(
        SQL_ALCHEMY_CONN_ASYNC,
        connect_args=_get_connect_args("async"),
        future=True,
    )
    AsyncSession = async_sessionmaker(
        bind=async_engine,
        class_=SAAsyncSession,
        autoflush=False,
        expire_on_commit=False,
    )


def configure_orm(disable_connection_pool=False, pool_class=None):
    """Configure the ORM using SQLAlchemy."""
    from airflow._shared.secrets_masker import mask_secret

    if _is_sqlite_db_path_relative(SQL_ALCHEMY_CONN):
        from airflow.exceptions import AirflowConfigException

        raise AirflowConfigException(
            f"Cannot use relative path: `{SQL_ALCHEMY_CONN}` to connect to sqlite. "
            "Please use an absolute path such as `sqlite:////tmp/airflow.db`."
        )

    global NonScopedSession
    global Session
    global engine

    if os.environ.get("_AIRFLOW_SKIP_DB_TESTS") == "true":
        # Skip DB initialization in unit tests, if DB tests are skipped
        Session = SkipDBTestsSession
        engine = None
        return
    log.debug("Setting up DB connection pool (PID %s)", os.getpid())
    engine_args = prepare_engine_args(disable_connection_pool, pool_class)

    connect_args = _get_connect_args("sync")
    if SQL_ALCHEMY_CONN.startswith("sqlite"):
        # FastAPI runs sync endpoints in a separate thread. By default, SQLite does not
        # allow objects created in one thread to be used in another. We allow that in
        # tests so the test thread and the tested endpoints can share common objects.
        connect_args["check_same_thread"] = False

    engine = create_engine(
        SQL_ALCHEMY_CONN,
        connect_args=connect_args,
        **engine_args,
        future=True,
    )
    _configure_async_session()
    mask_secret(engine.url.password)
    setup_event_handlers(engine)

    if conf.has_option("database", "sql_alchemy_session_maker"):
        _session_maker = conf.getimport("database", "sql_alchemy_session_maker")
    else:
        _session_maker = partial(
            sessionmaker,
            autocommit=False,
            autoflush=False,
            expire_on_commit=False,
        )
    if engine is None:
        raise RuntimeError("Engine must be initialized before creating a session")
    NonScopedSession = _session_maker(engine)
    Session = scoped_session(NonScopedSession)

    if register_at_fork := getattr(os, "register_at_fork", None):
        # https://docs.sqlalchemy.org/en/20/core/pooling.html#using-connection-pools-with-multiprocessing-or-os-fork
        def clean_in_fork():
            _globals = globals()
            if engine := _globals.get("engine"):
                engine.dispose(close=False)
            if async_engine := _globals.get("async_engine"):
                async_engine.sync_engine.dispose(close=False)

        # Won't work on Windows
        register_at_fork(after_in_child=clean_in_fork)


DEFAULT_ENGINE_ARGS: dict[str, dict[str, Any]] = {
    "postgresql": (
        {
            "executemany_values_page_size" if is_sqlalchemy_v1() else "insertmanyvalues_page_size": 10000,
        }
        | (
            {}
            if USE_PSYCOPG3
            else {"executemany_mode": "values_plus_batch", "executemany_batch_page_size": 2000}
        )
    )
}
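
# What the expression above evaluates to, assuming SQLAlchemy 2.x with the
# psycopg2 driver (i.e. USE_PSYCOPG3 is False):
#
#     {
#         "insertmanyvalues_page_size": 10000,
#         "executemany_mode": "values_plus_batch",
#         "executemany_batch_page_size": 2000,
#     }
#
# With psycopg3 only the page-size key remains, as the executemany_* options are
# specific to the psycopg2 dialect.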


def prepare_engine_args(disable_connection_pool=False, pool_class=None):
    """Prepare SQLAlchemy engine args."""
    default_args = {}
    for dialect, default in DEFAULT_ENGINE_ARGS.items():
        if SQL_ALCHEMY_CONN.startswith(dialect):
            default_args = default.copy()
            break

    engine_args: dict = conf.getjson("database", "sql_alchemy_engine_args", fallback=default_args)

    if pool_class:
        # Don't use separate settings for size etc., only those from sql_alchemy_engine_args
        engine_args["poolclass"] = pool_class
    elif disable_connection_pool or not conf.getboolean("database", "SQL_ALCHEMY_POOL_ENABLED"):
        engine_args["poolclass"] = NullPool
        log.debug("settings.prepare_engine_args(): Using NullPool")
    elif not SQL_ALCHEMY_CONN.startswith("sqlite"):
        # Pool size engine args are not supported by sqlite.
        # If no config value is defined for the pool size, select a reasonable value.
        # 0 means no limit, which could lead to exceeding the database connection limit.
        pool_size = conf.getint("database", "SQL_ALCHEMY_POOL_SIZE", fallback=5)

        # The maximum overflow size of the pool.
        # When the number of checked-out connections reaches the size set in pool_size,
        # additional connections will be returned up to this limit.
        # When those additional connections are returned to the pool, they are disconnected and discarded.
        # It follows then that the total number of simultaneous connections
        # the pool will allow is pool_size + max_overflow,
        # and the total number of "sleeping" connections the pool will allow is pool_size.
        # max_overflow can be set to -1 to indicate no overflow limit;
        # no limit will be placed on the total number
        # of concurrent connections. Defaults to 10.
        max_overflow = conf.getint("database", "SQL_ALCHEMY_MAX_OVERFLOW", fallback=10)

        # The DB server already has a value for wait_timeout (number of seconds after
        # which an idle sleeping connection should be killed). Since other DBs may
        # co-exist on the same server, SQLAlchemy should set its
        # pool_recycle to an equal or smaller value.
        pool_recycle = conf.getint("database", "SQL_ALCHEMY_POOL_RECYCLE", fallback=1800)

        # Check connection at the start of each connection pool checkout.
        # Typically, this is a simple statement like "SELECT 1", but may also make use
        # of some DBAPI-specific method to test the connection for liveness.
        # More information here:
        # https://docs.sqlalchemy.org/en/14/core/pooling.html#disconnect-handling-pessimistic
        pool_pre_ping = conf.getboolean("database", "SQL_ALCHEMY_POOL_PRE_PING", fallback=True)

        log.debug(
            "settings.prepare_engine_args(): Using pool settings. pool_size=%d, max_overflow=%d, "
            "pool_recycle=%d, pid=%d",
            pool_size,
            max_overflow,
            pool_recycle,
            os.getpid(),
        )
        engine_args["pool_size"] = pool_size
        engine_args["pool_recycle"] = pool_recycle
        engine_args["pool_pre_ping"] = pool_pre_ping
        engine_args["max_overflow"] = max_overflow

    # The default isolation level for MySQL (REPEATABLE READ) can introduce inconsistencies when
    # running multiple schedulers, as repeated queries on the same session may read from stale snapshots.
    # 'READ COMMITTED' is the default value for PostgreSQL.
    # More information here:
    # https://dev.mysql.com/doc/refman/8.0/en/innodb-transaction-isolation-levels.html
    if SQL_ALCHEMY_CONN.startswith("mysql"):
        engine_args["isolation_level"] = "READ COMMITTED"

    if is_sqlalchemy_v1():
        # Allow the user to specify an encoding for their DB, otherwise defaulting
        # to utf-8 so jobs & users with non-latin1 characters can still use us.
        # This parameter was removed in SQLAlchemy 2.x.
        engine_args["encoding"] = conf.get("database", "SQL_ENGINE_ENCODING", fallback="utf-8")

    return engine_args
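
# Illustrative result for a PostgreSQL DSN on SQLAlchemy 2.x with psycopg3 and
# stock config (every number below is just the fallback named above, so a real
# deployment may differ):
#
#     {
#         "insertmanyvalues_page_size": 10000,
#         "pool_size": 5,
#         "max_overflow": 10,
#         "pool_recycle": 1800,
#         "pool_pre_ping": True,
#     }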


def dispose_orm(do_log: bool = True):
    """Properly close pooled database connections."""
    global Session, engine, NonScopedSession

    _globals = globals()
    if _globals.get("engine") is None and _globals.get("Session") is None:
        return

    if do_log:
        log.debug("Disposing DB connection pool (PID %s)", os.getpid())

    if "Session" in _globals and Session is not None:
        from sqlalchemy.orm.session import close_all_sessions

        Session.remove()
        Session = None
        NonScopedSession = None
        close_all_sessions()

    if "engine" in _globals and engine is not None:
        engine.dispose()
        engine = None


def reconfigure_orm(disable_connection_pool=False, pool_class=None):
    """Properly close database connections and re-configure the ORM."""
    dispose_orm()
    configure_orm(disable_connection_pool=disable_connection_pool, pool_class=pool_class)


def configure_adapters():
    """Register adapters and DB converters."""
    from pendulum import DateTime as Pendulum

    if SQL_ALCHEMY_CONN.startswith("sqlite"):
        from sqlite3 import register_adapter

        register_adapter(Pendulum, lambda val: val.isoformat(" "))

    if SQL_ALCHEMY_CONN.startswith("mysql"):
        try:
            try:
                import MySQLdb.converters
            except ImportError:
                raise RuntimeError(
                    "You do not have the `mysqlclient` package installed. "
                    "Please install it with `pip install mysqlclient` and make sure you have the system "
                    "mysql libraries installed, as well as the `pkg-config` system package, "
                    "in case you see a compilation error during installation."
                )

            MySQLdb.converters.conversions[Pendulum] = MySQLdb.converters.DateTime2literal
        except ImportError:
            pass
        try:
            import pymysql.converters

            pymysql.converters.conversions[Pendulum] = pymysql.converters.escape_datetime
        except ImportError:
            pass
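
# Effect of the sqlite adapter above (illustrative): a Pendulum DateTime is stored
# as its ISO-8601 string with a space separator, e.g.
#
#     >>> import pendulum
#     >>> pendulum.datetime(2024, 1, 1, tz="UTC").isoformat(" ")
#     '2024-01-01 00:00:00+00:00'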


def _configure_secrets_masker():
    """Configure the secrets masker with values from config."""
    from airflow._shared.secrets_masker import (
        DEFAULT_SENSITIVE_FIELDS,
        _secrets_masker as secrets_masker_core,
    )
    from airflow.configuration import conf

    min_length_to_mask = conf.getint("logging", "min_length_masked_secret", fallback=5)
    secret_mask_adapter = conf.getimport("logging", "secret_mask_adapter", fallback=None)
    sensitive_fields = DEFAULT_SENSITIVE_FIELDS.copy()
    sensitive_variable_fields = conf.get("core", "sensitive_var_conn_names")
    if sensitive_variable_fields:
        sensitive_fields |= frozenset({field.strip() for field in sensitive_variable_fields.split(",")})

    core_masker = secrets_masker_core()
    core_masker.min_length_to_mask = min_length_to_mask
    core_masker.sensitive_variables_fields = list(sensitive_fields)
    core_masker.secret_mask_adapter = secret_mask_adapter

    from airflow.sdk._shared.secrets_masker import _secrets_masker as sdk_secrets_masker

    sdk_masker = sdk_secrets_masker()
    sdk_masker.min_length_to_mask = min_length_to_mask
    sdk_masker.sensitive_variables_fields = list(sensitive_fields)
    sdk_masker.secret_mask_adapter = secret_mask_adapter
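
# Usage sketch: extra sensitive names can be appended via airflow.cfg (the names
# here are invented):
#
#     [core]
#     sensitive_var_conn_names = api_key, private_token
#
# Each comma-separated name is stripped and unioned into the default sensitive
# field set on both the core and SDK maskers.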


def configure_action_logging() -> None:
    """Any additional configuration (register callback) for the airflow.utils.action_loggers module."""


def prepare_syspath_for_config_and_plugins():
    """Update sys.path for the config and plugins directories."""
    # Add ./config/ for loading custom log parsers, airflow_local_settings, etc.
    config_path = os.path.join(AIRFLOW_HOME, "config")
    if config_path not in sys.path:
        sys.path.append(config_path)

    if PLUGINS_FOLDER not in sys.path:
        sys.path.append(PLUGINS_FOLDER)


def __getattr__(name: str):
    """Handle deprecated module attributes."""
    if name == "MASK_SECRETS_IN_LOGS":
        import warnings

        warnings.warn(
            "settings.MASK_SECRETS_IN_LOGS has been removed. This shim returns the default value of False. "
            "Use SecretsMasker.enable_log_masking(), disable_log_masking(), or is_log_masking_enabled() instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        return False
    raise AttributeError(f"module '{__name__}' has no attribute '{name}'")
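
# Note: this module-level __getattr__ (PEP 562) only fires for names not defined
# above, so the removed constant still resolves, with a warning:
#
#     >>> from airflow import settings
#     >>> settings.MASK_SECRETS_IN_LOGS  # emits DeprecationWarning
#     False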


def import_local_settings():
    """Import airflow_local_settings.py to allow overriding any settings in this file."""
    try:
        import airflow_local_settings
    except ModuleNotFoundError as e:
        if e.name == "airflow_local_settings":
            log.debug("No airflow_local_settings to import.", exc_info=True)
        else:
            log.critical(
                "Failed to import airflow_local_settings due to a transitive module not found error.",
                exc_info=True,
            )
            raise
    except ImportError:
        log.critical("Failed to import airflow_local_settings.", exc_info=True)
        raise
    else:
        if hasattr(airflow_local_settings, "__all__"):
            names = set(airflow_local_settings.__all__)
        else:
            names = {n for n in airflow_local_settings.__dict__ if not n.startswith("__")}

        plugin_functions = policies.make_plugin_from_local_settings(
            get_policy_plugin_manager(), airflow_local_settings, names
        )

        # If we have already handled a function by adding it to the plugin,
        # then don't clobber the global function
        for name in names - plugin_functions:
            globals()[name] = getattr(airflow_local_settings, name)

        if get_policy_plugin_manager().hook.task_instance_mutation_hook.get_hookimpls():
            task_instance_mutation_hook.is_noop = False

        log.info("Loaded airflow_local_settings from %s.", airflow_local_settings.__file__)


def initialize():
    """Initialize Airflow with all the settings from this file."""
    configure_vars()
    prepare_syspath_for_config_and_plugins()
    policy_mgr = get_policy_plugin_manager()
    # Load policy plugins _before_ importing airflow_local_settings, as Pluggy uses LIFO and we want
    # anything in airflow_local_settings to take precedence
    load_policy_plugins(policy_mgr)
    import_local_settings()
    configure_logging()

    configure_adapters()
    # The webservers import this file from models.py with the default settings.

    # Configure the secrets masker before masking secrets
    _configure_secrets_masker()

    is_worker = os.environ.get("_AIRFLOW__REEXECUTED_PROCESS") == "1"
    if not os.environ.get("PYTHON_OPERATORS_VIRTUAL_ENV_MODE", None) and not is_worker:
        configure_orm()

    # Mask the sensitive_config_values
    conf.mask_secrets()
    configure_action_logging()

    # Run any custom runtime checks that need to be executed for providers
    run_providers_custom_runtime_checks()

    # Ensure we close DB connections at scheduler and gunicorn worker terminations
    atexit.register(dispose_orm)


# Const stuff

KILOBYTE = 1024
MEGABYTE = KILOBYTE * KILOBYTE
WEB_COLORS = {"LIGHTBLUE": "#4d9de0", "LIGHTORANGE": "#FF9933"}

# Updating a serialized DAG can happen no more often than this minimum interval,
# to reduce the database write rate.
MIN_SERIALIZED_DAG_UPDATE_INTERVAL = conf.getint("core", "min_serialized_dag_update_interval", fallback=30)

# If set to True, serialized DAGs are compressed before writing to the DB.
COMPRESS_SERIALIZED_DAGS = conf.getboolean("core", "compress_serialized_dags", fallback=False)

# Fetching a serialized DAG can happen no more often than this minimum interval, to reduce
# the database read rate. This config controls when your DAGs are updated in the Webserver.
MIN_SERIALIZED_DAG_FETCH_INTERVAL = conf.getint("core", "min_serialized_dag_fetch_interval", fallback=10)

CAN_FORK = hasattr(os, "fork")

EXECUTE_TASKS_NEW_PYTHON_INTERPRETER = not CAN_FORK or conf.getboolean(
    "core",
    "execute_tasks_new_python_interpreter",
    fallback=False,
)

USE_JOB_SCHEDULE = conf.getboolean("scheduler", "use_job_schedule", fallback=True)

# By default Airflow plugins are lazily loaded (only loaded when required). Set this to False
# if you want to load plugins whenever 'airflow' is invoked via the CLI or loaded as a module.
LAZY_LOAD_PLUGINS: bool = conf.getboolean("core", "lazy_load_plugins", fallback=True)

# By default Airflow providers are lazily discovered (discovery and imports happen only when required).
# Set this to False if you want to discover providers whenever 'airflow' is invoked via the CLI or
# loaded as a module.
LAZY_LOAD_PROVIDERS: bool = conf.getboolean("core", "lazy_discover_providers", fallback=True)

# Executors can set this to true to configure logging correctly for
# containerized executors.
IS_EXECUTOR_CONTAINER = bool(os.environ.get("AIRFLOW_IS_EXECUTOR_CONTAINER", ""))
IS_K8S_EXECUTOR_POD = bool(os.environ.get("AIRFLOW_IS_K8S_EXECUTOR_POD", ""))
"""Will be True if running in a kubernetes executor pod."""

HIDE_SENSITIVE_VAR_CONN_FIELDS = conf.getboolean("core", "hide_sensitive_var_conn_fields")

# Prefix used to identify tables holding data moved during migration.
AIRFLOW_MOVED_TABLE_PREFIX = "_airflow_moved"

DAEMON_UMASK: str = conf.get("core", "daemon_umask", fallback="0o077")
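
# Interpretation note: "0o077" is an octal umask string; applied to daemonized
# processes it strips group/other permission bits, so new files default to 0o600
# and new directories to 0o700.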