Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/settings.py: 65%
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import atexit
import json as json_lib
import logging
import os
import sys
import warnings
from collections.abc import Callable
from functools import cache, partial
from importlib import metadata
from typing import TYPE_CHECKING, Any, Literal

import pluggy
from packaging.version import Version
from sqlalchemy import create_engine
from sqlalchemy.ext.asyncio import (
    AsyncEngine,
    AsyncSession as SAAsyncSession,
    create_async_engine,
)
from sqlalchemy.orm import scoped_session, sessionmaker

try:
    from sqlalchemy.ext.asyncio import async_sessionmaker
except ImportError:
    async_sessionmaker = sessionmaker  # type: ignore[assignment,misc]

from sqlalchemy.pool import NullPool

from airflow import __version__ as airflow_version, policies
from airflow._shared.timezones.timezone import (
    initialize as initialize_timezone,
    local_timezone,
    parse_timezone,
    utc,
)
from airflow.configuration import AIRFLOW_HOME, conf
from airflow.exceptions import AirflowInternalRuntimeError
from airflow.logging_config import configure_logging
from airflow.utils.orm_event_handlers import setup_event_handlers

_USE_PSYCOPG3: bool
try:
    from importlib.util import find_spec

    _USE_PSYCOPG3 = find_spec("psycopg") is not None

except (ImportError, ModuleNotFoundError):
    _USE_PSYCOPG3 = False

if TYPE_CHECKING:
    from sqlalchemy.engine import Engine

    from airflow.api_fastapi.common.types import UIAlert

log = logging.getLogger(__name__)

try:
    tz_str = conf.get_mandatory_value("core", "default_timezone")
    initialize_timezone(tz_str)
    if tz_str != "system":
        TIMEZONE = parse_timezone(tz_str)
    else:
        TIMEZONE = local_timezone()
except Exception:
    TIMEZONE = utc
    initialize_timezone("UTC")

log.info("Configured default timezone %s", TIMEZONE)

if conf.has_option("database", "sql_alchemy_session_maker"):
    log.info(
        '[Warning] Found config "sql_alchemy_session_maker", make sure you know what you are doing.\n'
        "[Warning] Improper configuration of sql_alchemy_session_maker can lead to serious issues, "
        "including data corruption, unrecoverable application crashes.\n"
        "[Warning] Please review the SQLAlchemy documentation for detailed guidance on "
        "proper configuration and best practices."
    )

HEADER = "\n".join(
    [
        r" ____________       _____________",
        r" ____    |__( )_________  __/__  /________      __",
        r"____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /",
        r"___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /",
        r" _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/",
    ]
)

SIMPLE_LOG_FORMAT = conf.get("logging", "simple_log_format")

SQL_ALCHEMY_CONN: str | None = None
SQL_ALCHEMY_CONN_ASYNC: str | None = None
PLUGINS_FOLDER: str | None = None
DAGS_FOLDER: str = os.path.expanduser(conf.get_mandatory_value("core", "DAGS_FOLDER"))

engine: Engine | None = None
Session: scoped_session | None = None
# NonScopedSession creates global sessions and is not safe to use in a multi-threaded environment
# without additional precautions. The only use case is when the session lifecycle needs
# custom handling. Most of the time we only want one unique thread-local session object;
# this is achieved by the Session factory above.
NonScopedSession: sessionmaker | None = None
async_engine: AsyncEngine | None = None
AsyncSession: Callable[..., SAAsyncSession] | None = None


def get_engine():
    """Get the configured engine, raising an error if not configured."""
    if engine is None:
        raise RuntimeError("Engine not configured. Call configure_orm() first.")
    return engine


def get_session():
    """Get the configured Session, raising an error if not configured."""
    if Session is None:
        raise RuntimeError("Session not configured. Call configure_orm() first.")
    return Session


# The JSON library to use for DAG Serialization and De-Serialization
json = json_lib

# Display alerts on the dashboard
# Useful for warning about setup issues or announcing changes to end users
# List of UIAlerts, which allows for specifying the content and category
# message to be shown. For example:
#   from airflow.api_fastapi.common.types import UIAlert
#
#   DASHBOARD_UIALERTS = [
#       UIAlert(text="Welcome to Airflow", category="info"),
#       UIAlert(text="Upgrade tomorrow [help](https://www.example.com)", category="warning"),  # With markdown support
#   ]
#
DASHBOARD_UIALERTS: list[UIAlert] = []


@cache
def _get_rich_console(file):
    # Delay imports until we need it
    import rich.console

    return rich.console.Console(file=file)


def custom_show_warning(message, category, filename, lineno, file=None, line=None):
    """Print rich and visible warnings."""
    # Delay imports until we need it
    import re

    from rich.markup import escape

    re_escape_regex = re.compile(r"(\\*)(\[[a-z#/@][^[]*?])").sub
    msg = f"[bold]{line}" if line else f"[bold][yellow]{filename}:{lineno}"
    msg += f" {category.__name__}[/bold]: {escape(str(message), _escape=re_escape_regex)}[/yellow]"
    write_console = _get_rich_console(file or sys.stderr)
    write_console.print(msg, soft_wrap=True)


def replace_showwarning(replacement):
    """
    Replace ``warnings.showwarning``, returning the original.

    This is useful since we want to "reset" the ``showwarning`` hook on exit to
    avoid lazy-loading issues. If a warning is emitted after Python cleaned up
    the import system, we would no longer be able to import ``rich``.
    """
    original = warnings.showwarning
    warnings.showwarning = replacement
    return original


original_show_warning = replace_showwarning(custom_show_warning)
atexit.register(partial(replace_showwarning, original_show_warning))


def task_policy(task):
    return get_policy_plugin_manager().hook.task_policy(task=task)


def dag_policy(dag):
    return get_policy_plugin_manager().hook.dag_policy(dag=dag)


def task_instance_mutation_hook(task_instance):
    return get_policy_plugin_manager().hook.task_instance_mutation_hook(task_instance=task_instance)


task_instance_mutation_hook.is_noop = True  # type: ignore


def pod_mutation_hook(pod):
    return get_policy_plugin_manager().hook.pod_mutation_hook(pod=pod)


def get_airflow_context_vars(context):
    return get_policy_plugin_manager().hook.get_airflow_context_vars(context=context)


def get_dagbag_import_timeout(dag_file_path: str):
    return get_policy_plugin_manager().hook.get_dagbag_import_timeout(dag_file_path=dag_file_path)


@cache
def get_policy_plugin_manager() -> pluggy.PluginManager:
    plugin_mgr = pluggy.PluginManager(policies.local_settings_hookspec.project_name)
    plugin_mgr.add_hookspecs(policies)
    plugin_mgr.register(policies.DefaultPolicy)
    return plugin_mgr


def load_policy_plugins(pm: pluggy.PluginManager):
    # We can't log duration etc here, as logging hasn't yet been configured!
    pm.load_setuptools_entrypoints("airflow.policy")
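
# The thin wrappers above dispatch to whatever policy implementations are
# registered with the pluggy plugin manager: the DefaultPolicy, entrypoint
# plugins loaded by load_policy_plugins(), and functions picked up from
# airflow_local_settings by import_local_settings(). An illustrative (not
# authoritative) cluster policy that could live in airflow_local_settings.py:
#
#   def task_policy(task):
#       # e.g. enforce a default queue on every task
#       task.queue = task.queue or "default"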


def _get_async_conn_uri_from_sync(sync_uri):
    AIO_LIBS_MAPPING = {"sqlite": "aiosqlite", "postgresql": "asyncpg", "mysql": "aiomysql"}
    """Mapping of sync scheme to async scheme."""

    scheme, rest = sync_uri.split(":", maxsplit=1)
    scheme = scheme.split("+", maxsplit=1)[0]
    aiolib = AIO_LIBS_MAPPING.get(scheme)
    if aiolib:
        return f"{scheme}+{aiolib}:{rest}"
    return sync_uri
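
# Illustrative behaviour of the scheme rewrite above (derived from AIO_LIBS_MAPPING):
#   "postgresql+psycopg2://user:pass@host/db" -> "postgresql+asyncpg://user:pass@host/db"
#   "mysql://user:pass@host/db"               -> "mysql+aiomysql://user:pass@host/db"
#   a scheme without a known async driver     -> returned unchanged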


def configure_vars():
    """Configure Global Variables from airflow.cfg."""
    global SQL_ALCHEMY_CONN
    global SQL_ALCHEMY_CONN_ASYNC
    global DAGS_FOLDER
    global PLUGINS_FOLDER

    SQL_ALCHEMY_CONN = conf.get("database", "sql_alchemy_conn")
    if conf.has_option("database", "sql_alchemy_conn_async"):
        SQL_ALCHEMY_CONN_ASYNC = conf.get("database", "sql_alchemy_conn_async")
    else:
        SQL_ALCHEMY_CONN_ASYNC = _get_async_conn_uri_from_sync(sync_uri=SQL_ALCHEMY_CONN)

    DAGS_FOLDER = os.path.expanduser(conf.get("core", "DAGS_FOLDER"))

    PLUGINS_FOLDER = conf.get("core", "plugins_folder", fallback=os.path.join(AIRFLOW_HOME, "plugins"))


def _run_openlineage_runtime_check():
    """
    Ensure compatibility of OpenLineage provider package and Airflow version.

    Airflow 2.10.0 introduced some core changes (#39336) that made versions <= 1.8.0 of OpenLineage
    provider incompatible with future Airflow versions (>= 2.10.0).
    """
    ol_package = "apache-airflow-providers-openlineage"
    try:
        ol_version = metadata.version(ol_package)
    except metadata.PackageNotFoundError:
        return

    if ol_version and Version(ol_version) < Version("1.8.0.dev0"):
        raise RuntimeError(
            f"You have installed `{ol_package}` == `{ol_version}` that is not compatible with "
            f"`apache-airflow` == `{airflow_version}`. "
            f"For `apache-airflow` >= `2.10.0` you must use `{ol_package}` >= `1.8.0`."
        )


def run_providers_custom_runtime_checks():
    _run_openlineage_runtime_check()


class SkipDBTestsSession:
    """
    This fake session is used to skip DB tests when `_AIRFLOW_SKIP_DB_TESTS` is set.

    :meta private:
    """

    def __init__(self):
        raise AirflowInternalRuntimeError(
            "Your test accessed the DB but `_AIRFLOW_SKIP_DB_TESTS` is set.\n"
            "Either make sure your test does not use database or mark the test with `@pytest.mark.db_test`\n"
            "See https://github.com/apache/airflow/blob/main/contributing-docs/testing/unit_tests.rst#"
            "best-practices-for-db-tests on how "
            "to deal with it and consult examples."
        )

    def remove(*args, **kwargs):
        pass

    def get_bind(
        self,
        mapper=None,
        clause=None,
        bind=None,
        _sa_skip_events=None,
        _sa_skip_for_implicit_returning=False,
    ):
        pass


def _is_sqlite_db_path_relative(sqla_conn_str: str) -> bool:
    """Determine whether the database connection URI specifies a relative path."""
    # Check for non-empty connection string:
    if not sqla_conn_str:
        return False
    # Check for the right URI scheme:
    if not sqla_conn_str.startswith("sqlite"):
        return False
    # In-memory is not useful for production, but useful for writing tests against Airflow for extensions
    if sqla_conn_str == "sqlite://":
        return False
    # Check for absolute path:
    if sqla_conn_str.startswith(abs_prefix := "sqlite:///") and os.path.isabs(
        sqla_conn_str[len(abs_prefix) :]
    ):
        return False
    return True
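
# Illustrative results of the check above:
#   "sqlite:///relative/airflow.db"  -> True   (relative path, rejected by configure_orm)
#   "sqlite:////tmp/airflow.db"      -> False  (absolute path)
#   "sqlite://"                      -> False  (in-memory database)
#   "postgresql://host/db"           -> False  (not sqlite)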


def _get_connect_args(mode: Literal["sync", "async"]) -> Any:
    key = {
        "sync": "sql_alchemy_connect_args",
        "async": "sql_alchemy_connect_args_async",
    }[mode]
    if conf.has_option("database", key):
        return conf.getimport("database", key)
    return {}


def _configure_async_session() -> None:
    """
    Configure async SQLAlchemy session.

    This exists so tests can reconfigure the session. How SQLAlchemy configures
    this does not work well with Pytest, and you can end up with issues when the
    session runs in a different event loop from the test itself.
    """
    global AsyncSession, async_engine

    if not SQL_ALCHEMY_CONN_ASYNC:
        async_engine = None
        AsyncSession = None
        return

    async_engine = create_async_engine(
        SQL_ALCHEMY_CONN_ASYNC,
        connect_args=_get_connect_args("async"),
        future=True,
    )
    AsyncSession = async_sessionmaker(
        bind=async_engine,
        class_=SAAsyncSession,
        autoflush=False,
        expire_on_commit=False,
    )
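
# Once configured, AsyncSession is a session factory bound to async_engine; a
# minimal usage sketch (assuming standard SQLAlchemy asyncio semantics):
#
#   async with AsyncSession() as session:
#       result = await session.execute(statement)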


def configure_orm(disable_connection_pool=False, pool_class=None):
    """Configure ORM using SQLAlchemy."""
    from airflow._shared.secrets_masker import mask_secret

    if _is_sqlite_db_path_relative(SQL_ALCHEMY_CONN):
        from airflow.exceptions import AirflowConfigException

        raise AirflowConfigException(
            f"Cannot use relative path: `{SQL_ALCHEMY_CONN}` to connect to sqlite. "
            "Please use absolute path such as `sqlite:////tmp/airflow.db`."
        )

    global NonScopedSession
    global Session
    global engine

    if os.environ.get("_AIRFLOW_SKIP_DB_TESTS") == "true":
        # Skip DB initialization in unit tests, if DB tests are skipped
        Session = SkipDBTestsSession
        engine = None
        return
    log.debug("Setting up DB connection pool (PID %s)", os.getpid())
    engine_args = prepare_engine_args(disable_connection_pool, pool_class)

    connect_args = _get_connect_args("sync")
    if SQL_ALCHEMY_CONN.startswith("sqlite"):
        # FastAPI runs sync endpoints in a separate thread. By default, SQLite does not
        # allow objects created in one thread to be used in another. Allow that here so
        # the `test` thread and the tested endpoints can use common objects in tests.
        connect_args["check_same_thread"] = False

    engine = create_engine(
        SQL_ALCHEMY_CONN,
        connect_args=connect_args,
        **engine_args,
        future=True,
    )
    _configure_async_session()
    mask_secret(engine.url.password)
    setup_event_handlers(engine)

    if conf.has_option("database", "sql_alchemy_session_maker"):
        _session_maker = conf.getimport("database", "sql_alchemy_session_maker")
    else:
        _session_maker = partial(
            sessionmaker,
            autocommit=False,
            autoflush=False,
            expire_on_commit=False,
        )
    if engine is None:
        raise RuntimeError("Engine must be initialized before creating a session")
    NonScopedSession = _session_maker(engine)
    Session = scoped_session(NonScopedSession)

    if register_at_fork := getattr(os, "register_at_fork", None):
        # https://docs.sqlalchemy.org/en/20/core/pooling.html#using-connection-pools-with-multiprocessing-or-os-fork
        def clean_in_fork():
            _globals = globals()
            if engine := _globals.get("engine"):
                engine.dispose(close=False)
            if async_engine := _globals.get("async_engine"):
                async_engine.sync_engine.dispose(close=False)

        # Won't work on Windows
        register_at_fork(after_in_child=clean_in_fork)


def prepare_engine_args(disable_connection_pool=False, pool_class=None):
    """Prepare SQLAlchemy engine args."""
    DEFAULT_ENGINE_ARGS: dict[str, dict[str, Any]] = {
        "postgresql": (
            {
                "insertmanyvalues_page_size": 10000,
            }
            | (
                {}
                if _USE_PSYCOPG3
                else {"executemany_mode": "values_plus_batch", "executemany_batch_page_size": 2000}
            )
        )
    }

    default_args = {}
    for dialect, default in DEFAULT_ENGINE_ARGS.items():
        if SQL_ALCHEMY_CONN.startswith(dialect):
            default_args = default.copy()
            break

    engine_args: dict = conf.getjson("database", "sql_alchemy_engine_args", fallback=default_args)

    if pool_class:
        # Don't use separate settings for size etc, only those from sql_alchemy_engine_args
        engine_args["poolclass"] = pool_class
    elif disable_connection_pool or not conf.getboolean("database", "SQL_ALCHEMY_POOL_ENABLED"):
        engine_args["poolclass"] = NullPool
        log.debug("settings.prepare_engine_args(): Using NullPool")
    elif not SQL_ALCHEMY_CONN.startswith("sqlite"):
        # Pool size engine args not supported by sqlite.
        # If no config value is defined for the pool size, select a reasonable value.
        # 0 means no limit, which could lead to exceeding the Database connection limit.
        pool_size = conf.getint("database", "SQL_ALCHEMY_POOL_SIZE", fallback=5)

        # The maximum overflow size of the pool.
        # When the number of checked-out connections reaches the size set in pool_size,
        # additional connections will be returned up to this limit.
        # When those additional connections are returned to the pool, they are disconnected and discarded.
        # It follows then that the total number of simultaneous connections
        # the pool will allow is pool_size + max_overflow,
        # and the total number of "sleeping" connections the pool will allow is pool_size.
        # max_overflow can be set to -1 to indicate no overflow limit;
        # no limit will be placed on the total number
        # of concurrent connections. Defaults to 10.
        max_overflow = conf.getint("database", "SQL_ALCHEMY_MAX_OVERFLOW", fallback=10)

        # The DB server already has a value for wait_timeout (number of seconds after
        # which an idle sleeping connection should be killed). Since other DBs may
        # co-exist on the same server, SQLAlchemy should set its
        # pool_recycle to an equal or smaller value.
        pool_recycle = conf.getint("database", "SQL_ALCHEMY_POOL_RECYCLE", fallback=1800)

        # Check connection at the start of each connection pool checkout.
        # Typically, this is a simple statement like "SELECT 1", but may also make use
        # of some DBAPI-specific method to test the connection for liveness.
        # More information here:
        # https://docs.sqlalchemy.org/en/14/core/pooling.html#disconnect-handling-pessimistic
        pool_pre_ping = conf.getboolean("database", "SQL_ALCHEMY_POOL_PRE_PING", fallback=True)

        log.debug(
            "settings.prepare_engine_args(): Using pool settings. pool_size=%d, max_overflow=%d, "
            "pool_recycle=%d, pid=%d",
            pool_size,
            max_overflow,
            pool_recycle,
            os.getpid(),
        )
        engine_args["pool_size"] = pool_size
        engine_args["pool_recycle"] = pool_recycle
        engine_args["pool_pre_ping"] = pool_pre_ping
        engine_args["max_overflow"] = max_overflow

    # The default isolation level for MySQL (REPEATABLE READ) can introduce inconsistencies when
    # running multiple schedulers, as repeated queries on the same session may read from stale snapshots.
    # 'READ COMMITTED' is the default value for PostgreSQL.
    # More information here:
    # https://dev.mysql.com/doc/refman/8.0/en/innodb-transaction-isolation-levels.html
    if SQL_ALCHEMY_CONN.startswith("mysql"):
        engine_args["isolation_level"] = "READ COMMITTED"

    return engine_args
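
# engine_args come from [database] sql_alchemy_engine_args (parsed as JSON by
# conf.getjson) when set, otherwise from the dialect defaults above; the pool
# options above are then applied on top. An illustrative (hypothetical) value:
#
#   [database]
#   sql_alchemy_engine_args = {"isolation_level": "READ COMMITTED", "echo": true}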


def dispose_orm(do_log: bool = True):
    """Properly close pooled database connections."""
    global Session, engine, NonScopedSession

    _globals = globals()
    if _globals.get("engine") is None and _globals.get("Session") is None:
        return

    if do_log:
        log.debug("Disposing DB connection pool (PID %s)", os.getpid())

    if "Session" in _globals and Session is not None:
        from sqlalchemy.orm.session import close_all_sessions

        Session.remove()
        Session = None
        NonScopedSession = None
        close_all_sessions()

    if "engine" in _globals and engine is not None:
        engine.dispose()
        engine = None


def reconfigure_orm(disable_connection_pool=False, pool_class=None):
    """Properly close database connections and re-configure ORM."""
    dispose_orm()
    configure_orm(disable_connection_pool=disable_connection_pool, pool_class=pool_class)


def configure_adapters():
    """Register Adapters and DB Converters."""
    from pendulum import DateTime as Pendulum

    if SQL_ALCHEMY_CONN.startswith("sqlite"):
        from sqlite3 import register_adapter

        register_adapter(Pendulum, lambda val: val.isoformat(" "))

    if SQL_ALCHEMY_CONN.startswith("mysql"):
        try:
            try:
                import MySQLdb.converters
            except ImportError:
                raise RuntimeError(
                    "You do not have `mysqlclient` package installed. "
                    "Please install it with `pip install mysqlclient` and make sure you have system "
                    "mysql libraries installed, as well as `pkg-config` system package "
                    "installed in case you see a compilation error during installation."
                )

            MySQLdb.converters.conversions[Pendulum] = MySQLdb.converters.DateTime2literal
        except ImportError:
            pass
        try:
            import pymysql.converters

            pymysql.converters.conversions[Pendulum] = pymysql.converters.escape_datetime
        except ImportError:
            pass


def _configure_secrets_masker():
    """Configure the secrets masker with values from config."""
    from airflow._shared.secrets_masker import (
        DEFAULT_SENSITIVE_FIELDS,
        _secrets_masker as secrets_masker_core,
    )
    from airflow.configuration import conf

    min_length_to_mask = conf.getint("logging", "min_length_masked_secret", fallback=5)
    secret_mask_adapter = conf.getimport("logging", "secret_mask_adapter", fallback=None)
    sensitive_fields = DEFAULT_SENSITIVE_FIELDS.copy()
    sensitive_variable_fields = conf.get("core", "sensitive_var_conn_names")
    if sensitive_variable_fields:
        sensitive_fields |= frozenset({field.strip() for field in sensitive_variable_fields.split(",")})

    hide_sensitive_var_conn_fields = conf.getboolean("core", "hide_sensitive_var_conn_fields")

    core_masker = secrets_masker_core()
    core_masker.min_length_to_mask = min_length_to_mask
    core_masker.sensitive_variables_fields = list(sensitive_fields)
    core_masker.secret_mask_adapter = secret_mask_adapter
    core_masker.hide_sensitive_var_conn_fields = hide_sensitive_var_conn_fields

    from airflow.sdk._shared.secrets_masker import _secrets_masker as sdk_secrets_masker

    sdk_masker = sdk_secrets_masker()
    sdk_masker.min_length_to_mask = min_length_to_mask
    sdk_masker.sensitive_variables_fields = list(sensitive_fields)
    sdk_masker.secret_mask_adapter = secret_mask_adapter
    sdk_masker.hide_sensitive_var_conn_fields = hide_sensitive_var_conn_fields


def configure_action_logging() -> None:
    """Any additional configuration (register callback) for airflow.utils.action_loggers module."""


def prepare_syspath_for_config_and_plugins():
    """Update sys.path for the config and plugins directories."""
    # Add ./config/ for loading custom log parsers etc, or
    # airflow_local_settings etc.
    config_path = os.path.join(AIRFLOW_HOME, "config")
    if config_path not in sys.path:
        sys.path.append(config_path)

    if PLUGINS_FOLDER not in sys.path:
        sys.path.append(PLUGINS_FOLDER)


def __getattr__(name: str):
    """Handle deprecated module attributes."""
    import warnings

    from airflow.exceptions import RemovedInAirflow4Warning

    if name == "MASK_SECRETS_IN_LOGS":
        warnings.warn(
            "settings.MASK_SECRETS_IN_LOGS has been removed. This shim returns default value of False. "
            "Use SecretsMasker.enable_log_masking(), disable_log_masking(), or is_log_masking_enabled() instead.",
            RemovedInAirflow4Warning,
            stacklevel=2,
        )
        return False
    if name == "WEB_COLORS":
        warnings.warn(
            "settings.WEB_COLORS has been removed. This shim returns default value. "
            "Please upgrade your provider or integration.",
            RemovedInAirflow4Warning,
            stacklevel=2,
        )
        return {"LIGHTBLUE": "#4d9de0", "LIGHTORANGE": "#FF9933"}
    if name == "EXECUTE_TASKS_NEW_PYTHON_INTERPRETER":
        warnings.warn(
            "settings.EXECUTE_TASKS_NEW_PYTHON_INTERPRETER has been removed. This shim returns default value. "
            "Please upgrade your provider or integration.",
            RemovedInAirflow4Warning,
            stacklevel=2,
        )
        return not hasattr(os, "fork") or conf.getboolean(
            "core",
            "execute_tasks_new_python_interpreter",
            fallback=False,
        )

    raise AttributeError(f"module '{__name__}' has no attribute '{name}'")
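
# Note: the module-level __getattr__ above (PEP 562) is only consulted for names
# not found in the module namespace, so the removed settings listed there warn
# and return a backwards-compatible default instead of raising AttributeError.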


def import_local_settings():
    """Import airflow_local_settings.py files to allow overriding any configs in settings.py file."""
    try:
        import airflow_local_settings
    except ModuleNotFoundError as e:
        if e.name == "airflow_local_settings":
            log.debug("No airflow_local_settings to import.", exc_info=True)
        else:
            log.critical(
                "Failed to import airflow_local_settings due to a transitive module not found error.",
                exc_info=True,
            )
            raise
    except ImportError:
        log.critical("Failed to import airflow_local_settings.", exc_info=True)
        raise
    else:
        if hasattr(airflow_local_settings, "__all__"):
            names = set(airflow_local_settings.__all__)
        else:
            names = {n for n in airflow_local_settings.__dict__ if not n.startswith("__")}

        plugin_functions = policies.make_plugin_from_local_settings(
            get_policy_plugin_manager(), airflow_local_settings, names
        )

        # If we have already handled a function by adding it to the plugin,
        # then don't clobber the global function
        for name in names - plugin_functions:
            globals()[name] = getattr(airflow_local_settings, name)

        if get_policy_plugin_manager().hook.task_instance_mutation_hook.get_hookimpls():
            task_instance_mutation_hook.is_noop = False

        log.info("Loaded airflow_local_settings from %s .", airflow_local_settings.__file__)


def initialize():
    """Initialize Airflow with all the settings from this file."""
    configure_vars()
    prepare_syspath_for_config_and_plugins()
    policy_mgr = get_policy_plugin_manager()
    # Load policy plugins _before_ importing airflow_local_settings, as Pluggy uses LIFO and we want anything
    # in airflow_local_settings to take precedence
    load_policy_plugins(policy_mgr)
    import_local_settings()
    configure_logging()

    configure_adapters()
    # The webservers import this file from models.py with the default settings.

    # Configure secrets masker before masking secrets
    _configure_secrets_masker()

    is_worker = os.environ.get("_AIRFLOW__REEXECUTED_PROCESS") == "1"
    if not os.environ.get("PYTHON_OPERATORS_VIRTUAL_ENV_MODE", None) and not is_worker:
        configure_orm()

    # mask the sensitive_config_values
    conf.mask_secrets()
    configure_action_logging()

    # Run any custom runtime checks that need to be executed for providers
    run_providers_custom_runtime_checks()

    # Ensure we close DB connections at scheduler and gunicorn worker terminations
    atexit.register(dispose_orm)


# By default Airflow plugins are lazily-loaded (only loaded when required). Set it to False,
# if you want to load plugins whenever 'airflow' is invoked via cli or loaded from module.
LAZY_LOAD_PLUGINS: bool = conf.getboolean("core", "lazy_load_plugins", fallback=True)

# By default Airflow providers are lazily-discovered (discovery and imports happen only when required).
# Set it to False, if you want to discover providers whenever 'airflow' is invoked via cli or
# loaded from module.
LAZY_LOAD_PROVIDERS: bool = conf.getboolean("core", "lazy_discover_providers", fallback=True)

DAEMON_UMASK: str = conf.get("core", "daemon_umask", fallback="0o077")

# Prefix used by gunicorn workers to indicate they are ready to serve requests
# Used by GunicornMonitor to track worker readiness via process titles
GUNICORN_WORKER_READY_PREFIX: str = "[ready] "