Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/settings.py: 65%
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import atexit
import json as json_lib
import logging
import os
import sys
import warnings
from collections.abc import Callable
from functools import cache, partial
from importlib import metadata
from typing import TYPE_CHECKING, Any, Literal

import pluggy
from packaging.version import Version
from sqlalchemy import create_engine
from sqlalchemy.ext.asyncio import (
    AsyncEngine,
    AsyncSession as SAAsyncSession,
    create_async_engine,
)
from sqlalchemy.orm import scoped_session, sessionmaker

try:
    from sqlalchemy.ext.asyncio import async_sessionmaker
except ImportError:
    async_sessionmaker = sessionmaker  # type: ignore[assignment,misc]

from sqlalchemy.pool import NullPool

from airflow import __version__ as airflow_version, policies
from airflow._shared.timezones.timezone import (
    initialize as initialize_timezone,
    local_timezone,
    parse_timezone,
    utc,
)
from airflow.configuration import AIRFLOW_HOME, conf
from airflow.exceptions import AirflowInternalRuntimeError
from airflow.logging_config import configure_logging
from airflow.utils.orm_event_handlers import setup_event_handlers
from airflow.utils.sqlalchemy import is_sqlalchemy_v1

_USE_PSYCOPG3: bool
try:
    from importlib.util import find_spec

    is_psycopg3 = find_spec("psycopg") is not None

    _USE_PSYCOPG3 = is_psycopg3 and not is_sqlalchemy_v1()
except (ImportError, ModuleNotFoundError):
    _USE_PSYCOPG3 = False

if TYPE_CHECKING:
    from sqlalchemy.engine import Engine

    from airflow.api_fastapi.common.types import UIAlert

log = logging.getLogger(__name__)

try:
    tz_str = conf.get_mandatory_value("core", "default_timezone")
    initialize_timezone(tz_str)
    if tz_str != "system":
        TIMEZONE = parse_timezone(tz_str)
    else:
        TIMEZONE = local_timezone()
except Exception:
    TIMEZONE = utc
    initialize_timezone("UTC")

log.info("Configured default timezone %s", TIMEZONE)

if conf.has_option("database", "sql_alchemy_session_maker"):
    log.info(
        '[Warning] Found config "sql_alchemy_session_maker", make sure you know what you are doing.\n'
        "[Warning] Improper configuration of sql_alchemy_session_maker can lead to serious issues, "
        "including data corruption, unrecoverable application crashes.\n"
        "[Warning] Please review the SQLAlchemy documentation for detailed guidance on "
        "proper configuration and best practices."
    )

HEADER = "\n".join(
    [
        r"  ____________       _____________",
        r" ____    |__( )_________  __/__  /________      __",
        r"____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /",
        r"___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /",
        r" _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/",
    ]
)

SIMPLE_LOG_FORMAT = conf.get("logging", "simple_log_format")

SQL_ALCHEMY_CONN: str | None = None
SQL_ALCHEMY_CONN_ASYNC: str | None = None
PLUGINS_FOLDER: str | None = None
DAGS_FOLDER: str = os.path.expanduser(conf.get_mandatory_value("core", "DAGS_FOLDER"))

engine: Engine | None = None
Session: scoped_session | None = None
# NonScopedSession creates global sessions and is not safe to use in a multi-threaded environment without
# additional precautions. Its only use case is when the session lifecycle needs
# custom handling. Most of the time we only want one unique thread-local session object;
# this is achieved by the Session factory above.
NonScopedSession: sessionmaker | None = None
async_engine: AsyncEngine | None = None
AsyncSession: Callable[..., SAAsyncSession] | None = None


def get_engine():
    """Get the configured engine, raising an error if not configured."""
    if engine is None:
        raise RuntimeError("Engine not configured. Call configure_orm() first.")
    return engine


def get_session():
    """Get the configured Session, raising an error if not configured."""
    if Session is None:
        raise RuntimeError("Session not configured. Call configure_orm() first.")
    return Session


# The JSON library to use for DAG Serialization and De-Serialization
json = json_lib

# Display alerts on the dashboard
# Useful for warning about setup issues or announcing changes to end users
# List of UIAlerts, which allows for specifying the content and category
# message to be shown. For example:
#   from airflow.api_fastapi.common.types import UIAlert
#
#   DASHBOARD_UIALERTS = [
#       UIAlert(text="Welcome to Airflow", category="info"),
#       UIAlert(text="Upgrade tomorrow [help](https://www.example.com)", category="warning"),  # With markdown support
#   ]
#
DASHBOARD_UIALERTS: list[UIAlert] = []


@cache
def _get_rich_console(file):
    # Delay imports until we need it
    import rich.console

    return rich.console.Console(file=file)


def custom_show_warning(message, category, filename, lineno, file=None, line=None):
    """Print rich and visible warnings."""
    # Delay imports until we need it
    import re

    from rich.markup import escape

    re_escape_regex = re.compile(r"(\\*)(\[[a-z#/@][^[]*?])").sub
    msg = f"[bold]{line}" if line else f"[bold][yellow]{filename}:{lineno}"
    msg += f" {category.__name__}[/bold]: {escape(str(message), _escape=re_escape_regex)}[/yellow]"
    write_console = _get_rich_console(file or sys.stderr)
    write_console.print(msg, soft_wrap=True)


def replace_showwarning(replacement):
    """
    Replace ``warnings.showwarning``, returning the original.

    This is useful since we want to "reset" the ``showwarning`` hook on exit to
    avoid lazy-loading issues. If a warning is emitted after Python cleaned up
    the import system, we would no longer be able to import ``rich``.
    """
    original = warnings.showwarning
    warnings.showwarning = replacement
    return original


original_show_warning = replace_showwarning(custom_show_warning)
atexit.register(partial(replace_showwarning, original_show_warning))


def task_policy(task):
    return get_policy_plugin_manager().hook.task_policy(task=task)


def dag_policy(dag):
    return get_policy_plugin_manager().hook.dag_policy(dag=dag)


def task_instance_mutation_hook(task_instance):
    return get_policy_plugin_manager().hook.task_instance_mutation_hook(task_instance=task_instance)


task_instance_mutation_hook.is_noop = True  # type: ignore


def pod_mutation_hook(pod):
    return get_policy_plugin_manager().hook.pod_mutation_hook(pod=pod)


def get_airflow_context_vars(context):
    return get_policy_plugin_manager().hook.get_airflow_context_vars(context=context)


def get_dagbag_import_timeout(dag_file_path: str):
    return get_policy_plugin_manager().hook.get_dagbag_import_timeout(dag_file_path=dag_file_path)


@cache
def get_policy_plugin_manager() -> pluggy.PluginManager:
    plugin_mgr = pluggy.PluginManager(policies.local_settings_hookspec.project_name)
    plugin_mgr.add_hookspecs(policies)
    plugin_mgr.register(policies.DefaultPolicy)
    return plugin_mgr


def load_policy_plugins(pm: pluggy.PluginManager):
    # We can't log duration etc. here, as logging hasn't yet been configured!
    pm.load_setuptools_entrypoints("airflow.policy")
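
# Illustrative sketch only: the policy hooks above are typically overridden by defining
# same-named functions in an airflow_local_settings.py module on sys.path (or via the
# "airflow.policy" entrypoint loaded above); import_local_settings() below turns them into
# a pluggy plugin. The task_id prefix and owner below are made-up example values.
#
#   def task_policy(task):
#       if task.task_id.startswith("prod_"):
#           task.owner = "platform-team"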


def _get_async_conn_uri_from_sync(sync_uri):
    # Mapping of sync scheme to async scheme.
    AIO_LIBS_MAPPING = {"sqlite": "aiosqlite", "postgresql": "asyncpg", "mysql": "aiomysql"}

    scheme, rest = sync_uri.split(":", maxsplit=1)
    scheme = scheme.split("+", maxsplit=1)[0]
    aiolib = AIO_LIBS_MAPPING.get(scheme)
    if aiolib:
        return f"{scheme}+{aiolib}:{rest}"
    return sync_uri
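
# Illustrative behaviour of _get_async_conn_uri_from_sync() (example URIs only):
#   "postgresql+psycopg2://user@host/db"   -> "postgresql+asyncpg://user@host/db"
#   "sqlite:////tmp/airflow.db"            -> "sqlite+aiosqlite:////tmp/airflow.db"
#   "mssql+pyodbc://..." (unmapped scheme) -> returned unchanged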


def configure_vars():
    """Configure Global Variables from airflow.cfg."""
    global SQL_ALCHEMY_CONN
    global SQL_ALCHEMY_CONN_ASYNC
    global DAGS_FOLDER
    global PLUGINS_FOLDER

    SQL_ALCHEMY_CONN = conf.get("database", "sql_alchemy_conn")
    if conf.has_option("database", "sql_alchemy_conn_async"):
        SQL_ALCHEMY_CONN_ASYNC = conf.get("database", "sql_alchemy_conn_async")
    else:
        SQL_ALCHEMY_CONN_ASYNC = _get_async_conn_uri_from_sync(sync_uri=SQL_ALCHEMY_CONN)

    DAGS_FOLDER = os.path.expanduser(conf.get("core", "DAGS_FOLDER"))

    PLUGINS_FOLDER = conf.get("core", "plugins_folder", fallback=os.path.join(AIRFLOW_HOME, "plugins"))


def _run_openlineage_runtime_check():
    """
    Ensure compatibility of the OpenLineage provider package and the Airflow version.

    Airflow 2.10.0 introduced some core changes (#39336) that made versions <= 1.8.0 of the OpenLineage
    provider incompatible with Airflow versions >= 2.10.0.
    """
    ol_package = "apache-airflow-providers-openlineage"
    try:
        ol_version = metadata.version(ol_package)
    except metadata.PackageNotFoundError:
        return

    if ol_version and Version(ol_version) < Version("1.8.0.dev0"):
        raise RuntimeError(
            f"You have installed `{ol_package}` == `{ol_version}` that is not compatible with "
            f"`apache-airflow` == `{airflow_version}`. "
            f"For `apache-airflow` >= `2.10.0` you must use `{ol_package}` >= `1.8.0`."
        )


def run_providers_custom_runtime_checks():
    _run_openlineage_runtime_check()


class SkipDBTestsSession:
    """
    This fake session is used to skip DB tests when `_AIRFLOW_SKIP_DB_TESTS` is set.

    :meta private:
    """

    def __init__(self):
        raise AirflowInternalRuntimeError(
            "Your test accessed the DB but `_AIRFLOW_SKIP_DB_TESTS` is set.\n"
            "Either make sure your test does not use database or mark the test with `@pytest.mark.db_test`\n"
            "See https://github.com/apache/airflow/blob/main/contributing-docs/testing/unit_tests.rst#"
            "best-practices-for-db-tests on how "
            "to deal with it and consult examples."
        )

    def remove(*args, **kwargs):
        pass

    def get_bind(
        self,
        mapper=None,
        clause=None,
        bind=None,
        _sa_skip_events=None,
        _sa_skip_for_implicit_returning=False,
    ):
        pass


def _is_sqlite_db_path_relative(sqla_conn_str: str) -> bool:
    """Determine whether the database connection URI specifies a relative path."""
    # Check for non-empty connection string:
    if not sqla_conn_str:
        return False
    # Check for the right URI scheme:
    if not sqla_conn_str.startswith("sqlite"):
        return False
    # In-memory is not useful for production, but useful for writing tests against Airflow for extensions
    if sqla_conn_str == "sqlite://":
        return False
    # Check for absolute path:
    if sqla_conn_str.startswith(abs_prefix := "sqlite:///") and os.path.isabs(
        sqla_conn_str[len(abs_prefix) :]
    ):
        return False
    return True
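
# Illustrative results of _is_sqlite_db_path_relative() (example URIs only):
#   "sqlite:////tmp/airflow.db"  -> False (absolute path)
#   "sqlite:///airflow.db"       -> True  (relative path, rejected by configure_orm() below)
#   "sqlite://"                  -> False (in-memory database)
#   "postgresql://user@host/db"  -> False (not sqlite)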


def _get_connect_args(mode: Literal["sync", "async"]) -> Any:
    key = {
        "sync": "sql_alchemy_connect_args",
        "async": "sql_alchemy_connect_args_async",
    }[mode]
    if conf.has_option("database", key):
        return conf.getimport("database", key)
    return {}
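
# Illustrative configuration read by _get_connect_args(); the option must name an importable
# object (the module and attribute below are hypothetical), e.g. in airflow.cfg:
#   [database]
#   sql_alchemy_connect_args = my_company_settings.CONNECT_ARGS
# where my_company_settings.py defines e.g. CONNECT_ARGS = {"connect_timeout": 30}.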


def _configure_async_session() -> None:
    """
    Configure the async SQLAlchemy session.

    This exists so tests can reconfigure the session. The way SQLAlchemy configures
    this does not work well with pytest, and you can end up with issues when the
    session runs in a different event loop from the test itself.
    """
    global AsyncSession, async_engine

    if not SQL_ALCHEMY_CONN_ASYNC:
        async_engine = None
        AsyncSession = None
        return

    async_engine = create_async_engine(
        SQL_ALCHEMY_CONN_ASYNC,
        connect_args=_get_connect_args("async"),
        future=True,
    )
    AsyncSession = async_sessionmaker(
        bind=async_engine,
        class_=SAAsyncSession,
        autoflush=False,
        expire_on_commit=False,
    )
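
# Illustrative usage sketch of the AsyncSession factory configured above (assumes
# configure_orm()/initialize() has run and an async driver is installed):
#
#   from sqlalchemy import text
#
#   async def example():
#       async with AsyncSession() as session:
#           await session.execute(text("SELECT 1"))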


def configure_orm(disable_connection_pool=False, pool_class=None):
    """Configure ORM using SQLAlchemy."""
    from airflow._shared.secrets_masker import mask_secret

    if _is_sqlite_db_path_relative(SQL_ALCHEMY_CONN):
        from airflow.exceptions import AirflowConfigException

        raise AirflowConfigException(
            f"Cannot use relative path: `{SQL_ALCHEMY_CONN}` to connect to sqlite. "
            "Please use absolute path such as `sqlite:////tmp/airflow.db`."
        )

    global NonScopedSession
    global Session
    global engine

    if os.environ.get("_AIRFLOW_SKIP_DB_TESTS") == "true":
        # Skip DB initialization in unit tests, if DB tests are skipped
        Session = SkipDBTestsSession
        engine = None
        return
    log.debug("Setting up DB connection pool (PID %s)", os.getpid())
    engine_args = prepare_engine_args(disable_connection_pool, pool_class)

    connect_args = _get_connect_args("sync")
    if SQL_ALCHEMY_CONN.startswith("sqlite"):
        # FastAPI runs sync endpoints in a separate thread. By default, SQLite does not allow
        # objects created in one thread to be used in another. Allow that here so that, in tests,
        # the test thread and the tested endpoints can use common objects.
        connect_args["check_same_thread"] = False

    engine = create_engine(
        SQL_ALCHEMY_CONN,
        connect_args=connect_args,
        **engine_args,
        future=True,
    )
    _configure_async_session()
    mask_secret(engine.url.password)
    setup_event_handlers(engine)

    if conf.has_option("database", "sql_alchemy_session_maker"):
        _session_maker = conf.getimport("database", "sql_alchemy_session_maker")
    else:
        _session_maker = partial(
            sessionmaker,
            autocommit=False,
            autoflush=False,
            expire_on_commit=False,
        )
    if engine is None:
        raise RuntimeError("Engine must be initialized before creating a session")
    NonScopedSession = _session_maker(engine)
    Session = scoped_session(NonScopedSession)

    if register_at_fork := getattr(os, "register_at_fork", None):
        # https://docs.sqlalchemy.org/en/20/core/pooling.html#using-connection-pools-with-multiprocessing-or-os-fork
        def clean_in_fork():
            _globals = globals()
            if engine := _globals.get("engine"):
                engine.dispose(close=False)
            if async_engine := _globals.get("async_engine"):
                async_engine.sync_engine.dispose(close=False)

        # Won't work on Windows
        register_at_fork(after_in_child=clean_in_fork)


def prepare_engine_args(disable_connection_pool=False, pool_class=None):
    """Prepare SQLAlchemy engine args."""
    DEFAULT_ENGINE_ARGS: dict[str, dict[str, Any]] = {
        "postgresql": (
            {
                "executemany_values_page_size" if is_sqlalchemy_v1() else "insertmanyvalues_page_size": 10000,
            }
            | (
                {}
                if _USE_PSYCOPG3
                else {"executemany_mode": "values_plus_batch", "executemany_batch_page_size": 2000}
            )
        )
    }

    default_args = {}
    for dialect, default in DEFAULT_ENGINE_ARGS.items():
        if SQL_ALCHEMY_CONN.startswith(dialect):
            default_args = default.copy()
            break

    engine_args: dict = conf.getjson("database", "sql_alchemy_engine_args", fallback=default_args)

    if pool_class:
        # Don't use separate settings for size etc., only those from sql_alchemy_engine_args
        engine_args["poolclass"] = pool_class
    elif disable_connection_pool or not conf.getboolean("database", "SQL_ALCHEMY_POOL_ENABLED"):
        engine_args["poolclass"] = NullPool
        log.debug("settings.prepare_engine_args(): Using NullPool")
    elif not SQL_ALCHEMY_CONN.startswith("sqlite"):
        # Pool size engine args are not supported by sqlite.
        # If no config value is defined for the pool size, select a reasonable value.
        # 0 means no limit, which could lead to exceeding the database connection limit.
        pool_size = conf.getint("database", "SQL_ALCHEMY_POOL_SIZE", fallback=5)

        # The maximum overflow size of the pool.
        # When the number of checked-out connections reaches the size set in pool_size,
        # additional connections will be returned up to this limit.
        # When those additional connections are returned to the pool, they are disconnected and discarded.
        # It follows then that the total number of simultaneous connections
        # the pool will allow is pool_size + max_overflow,
        # and the total number of "sleeping" connections the pool will allow is pool_size.
        # max_overflow can be set to -1 to indicate no overflow limit;
        # no limit will be placed on the total number
        # of concurrent connections. Defaults to 10.
        max_overflow = conf.getint("database", "SQL_ALCHEMY_MAX_OVERFLOW", fallback=10)

        # The DB server already has a value for wait_timeout (number of seconds after
        # which an idle sleeping connection should be killed). Since other DBs may
        # co-exist on the same server, SQLAlchemy should set its
        # pool_recycle to an equal or smaller value.
        pool_recycle = conf.getint("database", "SQL_ALCHEMY_POOL_RECYCLE", fallback=1800)

        # Check connection at the start of each connection pool checkout.
        # Typically, this is a simple statement like "SELECT 1", but may also make use
        # of some DBAPI-specific method to test the connection for liveness.
        # More information here:
        # https://docs.sqlalchemy.org/en/14/core/pooling.html#disconnect-handling-pessimistic
        pool_pre_ping = conf.getboolean("database", "SQL_ALCHEMY_POOL_PRE_PING", fallback=True)

        log.debug(
            "settings.prepare_engine_args(): Using pool settings. pool_size=%d, max_overflow=%d, "
            "pool_recycle=%d, pid=%d",
            pool_size,
            max_overflow,
            pool_recycle,
            os.getpid(),
        )
        engine_args["pool_size"] = pool_size
        engine_args["pool_recycle"] = pool_recycle
        engine_args["pool_pre_ping"] = pool_pre_ping
        engine_args["max_overflow"] = max_overflow

    # The default isolation level for MySQL (REPEATABLE READ) can introduce inconsistencies when
    # running multiple schedulers, as repeated queries on the same session may read from stale snapshots.
    # 'READ COMMITTED' is the default value for PostgreSQL.
    # More information here:
    # https://dev.mysql.com/doc/refman/8.0/en/innodb-transaction-isolation-levels.html
    if SQL_ALCHEMY_CONN.startswith("mysql"):
        engine_args["isolation_level"] = "READ COMMITTED"

    if is_sqlalchemy_v1():
        # Allow the user to specify an encoding for their DB, otherwise default
        # to utf-8 so jobs & users with non-latin1 characters can still use us.
        # This parameter was removed in SQLAlchemy 2.x.
        engine_args["encoding"] = conf.get("database", "SQL_ENGINE_ENCODING", fallback="utf-8")

    return engine_args
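
# Illustrative airflow.cfg override read by the conf.getjson() call above (values are
# examples, not recommendations):
#   [database]
#   sql_alchemy_engine_args = {"pool_size": 10, "max_overflow": 20, "pool_recycle": 1800}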


def dispose_orm(do_log: bool = True):
    """Properly close pooled database connections."""
    global Session, engine, NonScopedSession

    _globals = globals()
    if _globals.get("engine") is None and _globals.get("Session") is None:
        return

    if do_log:
        log.debug("Disposing DB connection pool (PID %s)", os.getpid())

    if "Session" in _globals and Session is not None:
        from sqlalchemy.orm.session import close_all_sessions

        Session.remove()
        Session = None
        NonScopedSession = None
        close_all_sessions()

    if "engine" in _globals and engine is not None:
        engine.dispose()
        engine = None


def reconfigure_orm(disable_connection_pool=False, pool_class=None):
    """Properly close database connections and re-configure ORM."""
    dispose_orm()
    configure_orm(disable_connection_pool=disable_connection_pool, pool_class=pool_class)


def configure_adapters():
    """Register Adapters and DB Converters."""
    from pendulum import DateTime as Pendulum

    if SQL_ALCHEMY_CONN.startswith("sqlite"):
        from sqlite3 import register_adapter

        register_adapter(Pendulum, lambda val: val.isoformat(" "))

    if SQL_ALCHEMY_CONN.startswith("mysql"):
        try:
            try:
                import MySQLdb.converters
            except ImportError:
                raise RuntimeError(
                    "You do not have the `mysqlclient` package installed. "
                    "Please install it with `pip install mysqlclient` and make sure you have system "
                    "mysql libraries installed, as well as the `pkg-config` system package, "
                    "in case you see a compilation error during installation."
                )

            MySQLdb.converters.conversions[Pendulum] = MySQLdb.converters.DateTime2literal
        except ImportError:
            pass
        try:
            import pymysql.converters

            pymysql.converters.conversions[Pendulum] = pymysql.converters.escape_datetime
        except ImportError:
            pass


def _configure_secrets_masker():
    """Configure the secrets masker with values from config."""
    from airflow._shared.secrets_masker import (
        DEFAULT_SENSITIVE_FIELDS,
        _secrets_masker as secrets_masker_core,
    )
    from airflow.configuration import conf

    min_length_to_mask = conf.getint("logging", "min_length_masked_secret", fallback=5)
    secret_mask_adapter = conf.getimport("logging", "secret_mask_adapter", fallback=None)
    sensitive_fields = DEFAULT_SENSITIVE_FIELDS.copy()
    sensitive_variable_fields = conf.get("core", "sensitive_var_conn_names")
    if sensitive_variable_fields:
        sensitive_fields |= frozenset({field.strip() for field in sensitive_variable_fields.split(",")})

    core_masker = secrets_masker_core()
    core_masker.min_length_to_mask = min_length_to_mask
    core_masker.sensitive_variables_fields = list(sensitive_fields)
    core_masker.secret_mask_adapter = secret_mask_adapter

    from airflow.sdk._shared.secrets_masker import _secrets_masker as sdk_secrets_masker

    sdk_masker = sdk_secrets_masker()
    sdk_masker.min_length_to_mask = min_length_to_mask
    sdk_masker.sensitive_variables_fields = list(sensitive_fields)
    sdk_masker.secret_mask_adapter = secret_mask_adapter
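
# Illustrative configuration consumed above (field names are examples only): the
# comma-separated [core] sensitive_var_conn_names list extends DEFAULT_SENSITIVE_FIELDS:
#   [core]
#   sensitive_var_conn_names = api_token,client_secret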


def configure_action_logging() -> None:
    """Any additional configuration (register callback) for airflow.utils.action_loggers module."""


def prepare_syspath_for_config_and_plugins():
    """Update sys.path for the config and plugins directories."""
    # Add ./config/ for loading custom log parsers etc., or
    # airflow_local_settings etc.
    config_path = os.path.join(AIRFLOW_HOME, "config")
    if config_path not in sys.path:
        sys.path.append(config_path)

    if PLUGINS_FOLDER not in sys.path:
        sys.path.append(PLUGINS_FOLDER)


def __getattr__(name: str):
    """Handle deprecated module attributes."""
    import warnings

    from airflow.exceptions import RemovedInAirflow4Warning

    if name == "MASK_SECRETS_IN_LOGS":
        warnings.warn(
            "settings.MASK_SECRETS_IN_LOGS has been removed. This shim returns default value of False. "
            "Use SecretsMasker.enable_log_masking(), disable_log_masking(), or is_log_masking_enabled() instead.",
            RemovedInAirflow4Warning,
            stacklevel=2,
        )
        return False
    if name == "WEB_COLORS":
        warnings.warn(
            "settings.WEB_COLORS has been removed. This shim returns default value. "
            "Please upgrade your provider or integration.",
            RemovedInAirflow4Warning,
            stacklevel=2,
        )
        return {"LIGHTBLUE": "#4d9de0", "LIGHTORANGE": "#FF9933"}
    if name == "EXECUTE_TASKS_NEW_PYTHON_INTERPRETER":
        warnings.warn(
            "settings.EXECUTE_TASKS_NEW_PYTHON_INTERPRETER has been removed. This shim returns default value. "
            "Please upgrade your provider or integration.",
            RemovedInAirflow4Warning,
            stacklevel=2,
        )
        return not hasattr(os, "fork") or conf.getboolean(
            "core",
            "execute_tasks_new_python_interpreter",
            fallback=False,
        )

    raise AttributeError(f"module '{__name__}' has no attribute '{name}'")


def import_local_settings():
    """Import airflow_local_settings.py files to allow overriding any configs in settings.py file."""
    try:
        import airflow_local_settings
    except ModuleNotFoundError as e:
        if e.name == "airflow_local_settings":
            log.debug("No airflow_local_settings to import.", exc_info=True)
        else:
            log.critical(
                "Failed to import airflow_local_settings due to a transitive module not found error.",
                exc_info=True,
            )
            raise
    except ImportError:
        log.critical("Failed to import airflow_local_settings.", exc_info=True)
        raise
    else:
        if hasattr(airflow_local_settings, "__all__"):
            names = set(airflow_local_settings.__all__)
        else:
            names = {n for n in airflow_local_settings.__dict__ if not n.startswith("__")}

        plugin_functions = policies.make_plugin_from_local_settings(
            get_policy_plugin_manager(), airflow_local_settings, names
        )

        # If we have already handled a function by adding it to the plugin,
        # then don't clobber the global function
        for name in names - plugin_functions:
            globals()[name] = getattr(airflow_local_settings, name)

        if get_policy_plugin_manager().hook.task_instance_mutation_hook.get_hookimpls():
            task_instance_mutation_hook.is_noop = False

        log.info("Loaded airflow_local_settings from %s .", airflow_local_settings.__file__)


def initialize():
    """Initialize Airflow with all the settings from this file."""
    configure_vars()
    prepare_syspath_for_config_and_plugins()
    policy_mgr = get_policy_plugin_manager()
    # Load policy plugins _before_ importing airflow_local_settings, as pluggy uses LIFO and we want anything
    # in airflow_local_settings to take precedence
    load_policy_plugins(policy_mgr)
    import_local_settings()
    configure_logging()

    configure_adapters()
    # The webservers import this file from models.py with the default settings.

    # Configure the secrets masker before masking secrets
    _configure_secrets_masker()

    is_worker = os.environ.get("_AIRFLOW__REEXECUTED_PROCESS") == "1"
    if not os.environ.get("PYTHON_OPERATORS_VIRTUAL_ENV_MODE", None) and not is_worker:
        configure_orm()

    # mask the sensitive_config_values
    conf.mask_secrets()
    configure_action_logging()

    # Run any custom runtime checks that need to be executed for providers
    run_providers_custom_runtime_checks()

    # Ensure we close DB connections at scheduler and gunicorn worker terminations
    atexit.register(dispose_orm)


# By default Airflow plugins are lazily-loaded (only loaded when required). Set it to False
# if you want to load plugins whenever 'airflow' is invoked via cli or loaded from a module.
LAZY_LOAD_PLUGINS: bool = conf.getboolean("core", "lazy_load_plugins", fallback=True)

# By default Airflow providers are lazily-discovered (discovery and imports happen only when required).
# Set it to False if you want to discover providers whenever 'airflow' is invoked via cli or
# loaded from a module.
LAZY_LOAD_PROVIDERS: bool = conf.getboolean("core", "lazy_discover_providers", fallback=True)

DAEMON_UMASK: str = conf.get("core", "daemon_umask", fallback="0o077")