Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/airflow/settings.py: 65%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18from __future__ import annotations
20import atexit
21import functools
22import json
23import logging
24import os
25import sys
26import traceback
27import warnings
28from importlib import metadata
29from typing import TYPE_CHECKING, Any, Callable
31import pluggy
32from packaging.version import Version
33from sqlalchemy import create_engine, exc, text
34from sqlalchemy.orm import scoped_session, sessionmaker
35from sqlalchemy.pool import NullPool
37from airflow import __version__ as airflow_version, policies
38from airflow.configuration import AIRFLOW_HOME, WEBSERVER_CONFIG, conf # noqa: F401
39from airflow.exceptions import AirflowInternalRuntimeError, RemovedInAirflow3Warning
40from airflow.executors import executor_constants
41from airflow.logging_config import configure_logging
42from airflow.utils.orm_event_handlers import setup_event_handlers
43from airflow.utils.sqlalchemy import is_sqlalchemy_v1
44from airflow.utils.state import State
45from airflow.utils.timezone import local_timezone, parse_timezone, utc
47if TYPE_CHECKING:
48 from sqlalchemy.engine import Engine
49 from sqlalchemy.orm import Session as SASession
51 from airflow.www.utils import UIAlert
53log = logging.getLogger(__name__)
55try:
56 if (tz := conf.get_mandatory_value("core", "default_timezone")) != "system":
57 TIMEZONE = parse_timezone(tz)
58 else:
59 TIMEZONE = local_timezone()
60except Exception:
61 TIMEZONE = utc
63log.info("Configured default timezone %s", TIMEZONE)
66HEADER = "\n".join(
67 [
68 r" ____________ _____________",
69 r" ____ |__( )_________ __/__ /________ __",
70 r"____ /| |_ /__ ___/_ /_ __ /_ __ \_ | /| / /",
71 r"___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ /",
72 r" _/_/ |_/_/ /_/ /_/ /_/ \____/____/|__/",
73 ]
74)
76LOGGING_LEVEL = logging.INFO
78# the prefix to append to gunicorn worker processes after init
79GUNICORN_WORKER_READY_PREFIX = "[ready] "
81LOG_FORMAT = conf.get("logging", "log_format")
82SIMPLE_LOG_FORMAT = conf.get("logging", "simple_log_format")
84SQL_ALCHEMY_CONN: str | None = None
85PLUGINS_FOLDER: str | None = None
86LOGGING_CLASS_PATH: str | None = None
87DONOT_MODIFY_HANDLERS: bool | None = None
88DAGS_FOLDER: str = os.path.expanduser(conf.get_mandatory_value("core", "DAGS_FOLDER"))
90engine: Engine
91Session: Callable[..., SASession]
93# The JSON library to use for DAG Serialization and De-Serialization
94json = json
96# Dictionary containing State and colors associated to each state to
97# display on the Webserver
98STATE_COLORS = {
99 "deferred": "mediumpurple",
100 "failed": "red",
101 "queued": "gray",
102 "removed": "lightgrey",
103 "restarting": "violet",
104 "running": "lime",
105 "scheduled": "tan",
106 "skipped": "hotpink",
107 "success": "green",
108 "up_for_reschedule": "turquoise",
109 "up_for_retry": "gold",
110 "upstream_failed": "orange",
111 "shutdown": "blue",
112}
115@functools.lru_cache(maxsize=None)
116def _get_rich_console(file):
117 # Delay imports until we need it
118 import rich.console
120 return rich.console.Console(file=file)
123def custom_show_warning(message, category, filename, lineno, file=None, line=None):
124 """Print rich and visible warnings."""
125 # Delay imports until we need it
126 from rich.markup import escape
128 msg = f"[bold]{line}" if line else f"[bold][yellow]{filename}:{lineno}"
129 msg += f" {category.__name__}[/bold]: {escape(str(message))}[/yellow]"
130 write_console = _get_rich_console(file or sys.stderr)
131 write_console.print(msg, soft_wrap=True)
134def replace_showwarning(replacement):
135 """Replace ``warnings.showwarning``, returning the original.
137 This is useful since we want to "reset" the ``showwarning`` hook on exit to
138 avoid lazy-loading issues. If a warning is emitted after Python cleaned up
139 the import system, we would no longer be able to import ``rich``.
140 """
141 original = warnings.showwarning
142 warnings.showwarning = replacement
143 return original
146original_show_warning = replace_showwarning(custom_show_warning)
147atexit.register(functools.partial(replace_showwarning, original_show_warning))
149POLICY_PLUGIN_MANAGER: Any = None # type: ignore
152def task_policy(task):
153 return POLICY_PLUGIN_MANAGER.hook.task_policy(task=task)
156def dag_policy(dag):
157 return POLICY_PLUGIN_MANAGER.hook.dag_policy(dag=dag)
160def task_instance_mutation_hook(task_instance):
161 return POLICY_PLUGIN_MANAGER.hook.task_instance_mutation_hook(task_instance=task_instance)
164task_instance_mutation_hook.is_noop = True # type: ignore
167def pod_mutation_hook(pod):
168 return POLICY_PLUGIN_MANAGER.hook.pod_mutation_hook(pod=pod)
171def get_airflow_context_vars(context):
172 return POLICY_PLUGIN_MANAGER.hook.get_airflow_context_vars(context=context)
175def get_dagbag_import_timeout(dag_file_path: str):
176 return POLICY_PLUGIN_MANAGER.hook.get_dagbag_import_timeout(dag_file_path=dag_file_path)
179def configure_policy_plugin_manager():
180 global POLICY_PLUGIN_MANAGER
182 POLICY_PLUGIN_MANAGER = pluggy.PluginManager(policies.local_settings_hookspec.project_name)
183 POLICY_PLUGIN_MANAGER.add_hookspecs(policies)
184 POLICY_PLUGIN_MANAGER.register(policies.DefaultPolicy)
187def load_policy_plugins(pm: pluggy.PluginManager):
188 # We can't log duration etc here, as logging hasn't yet been configured!
189 pm.load_setuptools_entrypoints("airflow.policy")
192def configure_vars():
193 """Configure Global Variables from airflow.cfg."""
194 global SQL_ALCHEMY_CONN
195 global DAGS_FOLDER
196 global PLUGINS_FOLDER
197 global DONOT_MODIFY_HANDLERS
198 SQL_ALCHEMY_CONN = conf.get("database", "SQL_ALCHEMY_CONN")
200 DAGS_FOLDER = os.path.expanduser(conf.get("core", "DAGS_FOLDER"))
202 PLUGINS_FOLDER = conf.get("core", "plugins_folder", fallback=os.path.join(AIRFLOW_HOME, "plugins"))
204 # If donot_modify_handlers=True, we do not modify logging handlers in task_run command
205 # If the flag is set to False, we remove all handlers from the root logger
206 # and add all handlers from 'airflow.task' logger to the root Logger. This is done
207 # to get all the logs from the print & log statements in the DAG files before a task is run
208 # The handlers are restored after the task completes execution.
209 DONOT_MODIFY_HANDLERS = conf.getboolean("logging", "donot_modify_handlers", fallback=False)
212def _run_openlineage_runtime_check():
213 """
214 Ensure compatibility of OpenLineage provider package and Airflow version.
216 Airflow 2.10.0 introduced some core changes (#39336) that made versions <= 1.8.0 of OpenLineage
217 provider incompatible with future Airflow versions (>= 2.10.0).
218 """
219 ol_package = "apache-airflow-providers-openlineage"
220 try:
221 ol_version = metadata.version(ol_package)
222 except metadata.PackageNotFoundError:
223 return
225 if ol_version and Version(ol_version) < Version("1.8.0.dev0"):
226 raise RuntimeError(
227 f"You have installed `{ol_package}` == `{ol_version}` that is not compatible with "
228 f"`apache-airflow` == `{airflow_version}`. "
229 f"For `apache-airflow` >= `2.10.0` you must use `{ol_package}` >= `1.8.0`."
230 )
233def run_providers_custom_runtime_checks():
234 _run_openlineage_runtime_check()
237class SkipDBTestsSession:
238 """
239 This fake session is used to skip DB tests when `_AIRFLOW_SKIP_DB_TESTS` is set.
241 :meta private:
242 """
244 def __init__(self):
245 raise AirflowInternalRuntimeError(
246 "Your test accessed the DB but `_AIRFLOW_SKIP_DB_TESTS` is set.\n"
247 "Either make sure your test does not use database or mark the test with `@pytest.mark.db_test`\n"
248 "See https://github.com/apache/airflow/blob/main/contributing-docs/testing/unit_tests.rst#"
249 "best-practices-for-db-tests on how "
250 "to deal with it and consult examples."
251 )
253 def remove(*args, **kwargs):
254 pass
257class TracebackSession:
258 """
259 Session that throws error when you try to use it.
261 Also stores stack at instantiation call site.
263 :meta private:
264 """
266 def __init__(self):
267 self.traceback = traceback.extract_stack()
269 def __getattr__(self, item):
270 raise RuntimeError(
271 "TracebackSession object was used but internal API is enabled. "
272 "You'll need to ensure you are making only RPC calls with this object. "
273 "The stack list below will show where the TracebackSession object was created."
274 + "\n".join(traceback.format_list(self.traceback))
275 )
277 def remove(*args, **kwargs):
278 pass
281def configure_orm(disable_connection_pool=False, pool_class=None):
282 """Configure ORM using SQLAlchemy."""
283 from airflow.utils.log.secrets_masker import mask_secret
285 if (
286 SQL_ALCHEMY_CONN
287 and SQL_ALCHEMY_CONN.startswith("sqlite")
288 and not SQL_ALCHEMY_CONN.startswith("sqlite:////")
289 # In memory is not useful for production, but useful for writing tests against Airflow for extensions
290 and SQL_ALCHEMY_CONN != "sqlite://"
291 ):
292 from airflow.exceptions import AirflowConfigException
294 raise AirflowConfigException(
295 f"Cannot use relative path: `{SQL_ALCHEMY_CONN}` to connect to sqlite. "
296 "Please use absolute path such as `sqlite:////tmp/airflow.db`."
297 )
299 global Session
300 global engine
301 from airflow.api_internal.internal_api_call import InternalApiConfig
303 if InternalApiConfig.get_use_internal_api():
304 Session = TracebackSession
305 engine = None
306 return
307 elif os.environ.get("_AIRFLOW_SKIP_DB_TESTS") == "true":
308 # Skip DB initialization in unit tests, if DB tests are skipped
309 Session = SkipDBTestsSession
310 engine = None
311 return
312 log.debug("Setting up DB connection pool (PID %s)", os.getpid())
313 engine_args = prepare_engine_args(disable_connection_pool, pool_class)
315 if conf.has_option("database", "sql_alchemy_connect_args"):
316 connect_args = conf.getimport("database", "sql_alchemy_connect_args")
317 else:
318 connect_args = {}
320 engine = create_engine(SQL_ALCHEMY_CONN, connect_args=connect_args, **engine_args, future=True)
322 mask_secret(engine.url.password)
324 setup_event_handlers(engine)
326 Session = scoped_session(
327 sessionmaker(
328 autocommit=False,
329 autoflush=False,
330 bind=engine,
331 expire_on_commit=False,
332 )
333 )
336DEFAULT_ENGINE_ARGS = {
337 "postgresql": {
338 "executemany_mode": "values_plus_batch",
339 "executemany_values_page_size" if is_sqlalchemy_v1() else "insertmanyvalues_page_size": 10000,
340 "executemany_batch_page_size": 2000,
341 },
342}
345def prepare_engine_args(disable_connection_pool=False, pool_class=None):
346 """Prepare SQLAlchemy engine args."""
347 default_args = {}
348 for dialect, default in DEFAULT_ENGINE_ARGS.items():
349 if SQL_ALCHEMY_CONN.startswith(dialect):
350 default_args = default.copy()
351 break
353 engine_args: dict = conf.getjson("database", "sql_alchemy_engine_args", fallback=default_args) # type: ignore
355 if pool_class:
356 # Don't use separate settings for size etc, only those from sql_alchemy_engine_args
357 engine_args["poolclass"] = pool_class
358 elif disable_connection_pool or not conf.getboolean("database", "SQL_ALCHEMY_POOL_ENABLED"):
359 engine_args["poolclass"] = NullPool
360 log.debug("settings.prepare_engine_args(): Using NullPool")
361 elif not SQL_ALCHEMY_CONN.startswith("sqlite"):
362 # Pool size engine args not supported by sqlite.
363 # If no config value is defined for the pool size, select a reasonable value.
364 # 0 means no limit, which could lead to exceeding the Database connection limit.
365 pool_size = conf.getint("database", "SQL_ALCHEMY_POOL_SIZE", fallback=5)
367 # The maximum overflow size of the pool.
368 # When the number of checked-out connections reaches the size set in pool_size,
369 # additional connections will be returned up to this limit.
370 # When those additional connections are returned to the pool, they are disconnected and discarded.
371 # It follows then that the total number of simultaneous connections
372 # the pool will allow is pool_size + max_overflow,
373 # and the total number of "sleeping" connections the pool will allow is pool_size.
374 # max_overflow can be set to -1 to indicate no overflow limit;
375 # no limit will be placed on the total number
376 # of concurrent connections. Defaults to 10.
377 max_overflow = conf.getint("database", "SQL_ALCHEMY_MAX_OVERFLOW", fallback=10)
379 # The DB server already has a value for wait_timeout (number of seconds after
380 # which an idle sleeping connection should be killed). Since other DBs may
381 # co-exist on the same server, SQLAlchemy should set its
382 # pool_recycle to an equal or smaller value.
383 pool_recycle = conf.getint("database", "SQL_ALCHEMY_POOL_RECYCLE", fallback=1800)
385 # Check connection at the start of each connection pool checkout.
386 # Typically, this is a simple statement like "SELECT 1", but may also make use
387 # of some DBAPI-specific method to test the connection for liveness.
388 # More information here:
389 # https://docs.sqlalchemy.org/en/14/core/pooling.html#disconnect-handling-pessimistic
390 pool_pre_ping = conf.getboolean("database", "SQL_ALCHEMY_POOL_PRE_PING", fallback=True)
392 log.debug(
393 "settings.prepare_engine_args(): Using pool settings. pool_size=%d, max_overflow=%d, "
394 "pool_recycle=%d, pid=%d",
395 pool_size,
396 max_overflow,
397 pool_recycle,
398 os.getpid(),
399 )
400 engine_args["pool_size"] = pool_size
401 engine_args["pool_recycle"] = pool_recycle
402 engine_args["pool_pre_ping"] = pool_pre_ping
403 engine_args["max_overflow"] = max_overflow
405 # The default isolation level for MySQL (REPEATABLE READ) can introduce inconsistencies when
406 # running multiple schedulers, as repeated queries on the same session may read from stale snapshots.
407 # 'READ COMMITTED' is the default value for PostgreSQL.
408 # More information here:
409 # https://dev.mysql.com/doc/refman/8.0/en/innodb-transaction-isolation-levels.html"
411 if SQL_ALCHEMY_CONN.startswith("mysql"):
412 engine_args["isolation_level"] = "READ COMMITTED"
414 if is_sqlalchemy_v1():
415 # Allow the user to specify an encoding for their DB otherwise default
416 # to utf-8 so jobs & users with non-latin1 characters can still use us.
417 # This parameter was removed in SQLAlchemy 2.x.
418 engine_args["encoding"] = conf.get("database", "SQL_ENGINE_ENCODING", fallback="utf-8")
420 return engine_args
423def dispose_orm():
424 """Properly close pooled database connections."""
425 log.debug("Disposing DB connection pool (PID %s)", os.getpid())
426 global engine
427 global Session
429 if Session is not None: # type: ignore[truthy-function]
430 Session.remove()
431 Session = None
432 if engine:
433 engine.dispose()
434 engine = None
437def reconfigure_orm(disable_connection_pool=False, pool_class=None):
438 """Properly close database connections and re-configure ORM."""
439 dispose_orm()
440 configure_orm(disable_connection_pool=disable_connection_pool, pool_class=pool_class)
443def configure_adapters():
444 """Register Adapters and DB Converters."""
445 from pendulum import DateTime as Pendulum
447 if SQL_ALCHEMY_CONN.startswith("sqlite"):
448 from sqlite3 import register_adapter
450 register_adapter(Pendulum, lambda val: val.isoformat(" "))
452 if SQL_ALCHEMY_CONN.startswith("mysql"):
453 try:
454 import MySQLdb.converters
456 MySQLdb.converters.conversions[Pendulum] = MySQLdb.converters.DateTime2literal
457 except ImportError:
458 pass
459 try:
460 import pymysql.converters
462 pymysql.converters.conversions[Pendulum] = pymysql.converters.escape_datetime
463 except ImportError:
464 pass
467def validate_session():
468 """Validate ORM Session."""
469 global engine
471 worker_precheck = conf.getboolean("celery", "worker_precheck")
472 if not worker_precheck:
473 return True
474 else:
475 check_session = sessionmaker(bind=engine)
476 session = check_session()
477 try:
478 session.execute(text("select 1"))
479 conn_status = True
480 except exc.DBAPIError as err:
481 log.error(err)
482 conn_status = False
483 session.close()
484 return conn_status
487def configure_action_logging() -> None:
488 """Any additional configuration (register callback) for airflow.utils.action_loggers module."""
491def prepare_syspath():
492 """Ensure certain subfolders of AIRFLOW_HOME are on the classpath."""
493 if DAGS_FOLDER not in sys.path:
494 sys.path.append(DAGS_FOLDER)
496 # Add ./config/ for loading custom log parsers etc, or
497 # airflow_local_settings etc.
498 config_path = os.path.join(AIRFLOW_HOME, "config")
499 if config_path not in sys.path:
500 sys.path.append(config_path)
502 if PLUGINS_FOLDER not in sys.path:
503 sys.path.append(PLUGINS_FOLDER)
506def get_session_lifetime_config():
507 """Get session timeout configs and handle outdated configs gracefully."""
508 session_lifetime_minutes = conf.get("webserver", "session_lifetime_minutes", fallback=None)
509 session_lifetime_days = conf.get("webserver", "session_lifetime_days", fallback=None)
510 uses_deprecated_lifetime_configs = session_lifetime_days or conf.get(
511 "webserver", "force_log_out_after", fallback=None
512 )
514 minutes_per_day = 24 * 60
515 default_lifetime_minutes = "43200"
516 if uses_deprecated_lifetime_configs and session_lifetime_minutes == default_lifetime_minutes:
517 warnings.warn(
518 "`session_lifetime_days` option from `[webserver]` section has been "
519 "renamed to `session_lifetime_minutes`. The new option allows to configure "
520 "session lifetime in minutes. The `force_log_out_after` option has been removed "
521 "from `[webserver]` section. Please update your configuration.",
522 category=RemovedInAirflow3Warning,
523 stacklevel=2,
524 )
525 if session_lifetime_days:
526 session_lifetime_minutes = minutes_per_day * int(session_lifetime_days)
528 if not session_lifetime_minutes:
529 session_lifetime_days = 30
530 session_lifetime_minutes = minutes_per_day * session_lifetime_days
532 log.debug("User session lifetime is set to %s minutes.", session_lifetime_minutes)
534 return int(session_lifetime_minutes)
537def import_local_settings():
538 """Import airflow_local_settings.py files to allow overriding any configs in settings.py file."""
539 try:
540 import airflow_local_settings
541 except ModuleNotFoundError as e:
542 if e.name == "airflow_local_settings":
543 log.debug("No airflow_local_settings to import.", exc_info=True)
544 else:
545 log.critical(
546 "Failed to import airflow_local_settings due to a transitive module not found error.",
547 exc_info=True,
548 )
549 raise
550 except ImportError:
551 log.critical("Failed to import airflow_local_settings.", exc_info=True)
552 raise
553 else:
554 if hasattr(airflow_local_settings, "__all__"):
555 names = set(airflow_local_settings.__all__)
556 else:
557 names = {n for n in airflow_local_settings.__dict__ if not n.startswith("__")}
559 if "policy" in names and "task_policy" not in names:
560 warnings.warn(
561 "Using `policy` in airflow_local_settings.py is deprecated. "
562 "Please rename your `policy` to `task_policy`.",
563 RemovedInAirflow3Warning,
564 stacklevel=2,
565 )
566 setattr(airflow_local_settings, "task_policy", airflow_local_settings.policy)
567 names.remove("policy")
569 plugin_functions = policies.make_plugin_from_local_settings(
570 POLICY_PLUGIN_MANAGER, airflow_local_settings, names
571 )
573 # If we have already handled a function by adding it to the plugin,
574 # then don't clobber the global function
575 for name in names - plugin_functions:
576 globals()[name] = getattr(airflow_local_settings, name)
578 if POLICY_PLUGIN_MANAGER.hook.task_instance_mutation_hook.get_hookimpls():
579 task_instance_mutation_hook.is_noop = False
581 log.info("Loaded airflow_local_settings from %s .", airflow_local_settings.__file__)
584def initialize():
585 """Initialize Airflow with all the settings from this file."""
586 configure_vars()
587 prepare_syspath()
588 configure_policy_plugin_manager()
589 # Load policy plugins _before_ importing airflow_local_settings, as Pluggy uses LIFO and we want anything
590 # in airflow_local_settings to take precendec
591 load_policy_plugins(POLICY_PLUGIN_MANAGER)
592 import_local_settings()
593 global LOGGING_CLASS_PATH
594 LOGGING_CLASS_PATH = configure_logging()
595 State.state_color.update(STATE_COLORS)
597 configure_adapters()
598 # The webservers import this file from models.py with the default settings.
599 configure_orm()
600 configure_action_logging()
602 # Run any custom runtime checks that needs to be executed for providers
603 run_providers_custom_runtime_checks()
605 # Ensure we close DB connections at scheduler and gunicorn worker terminations
606 atexit.register(dispose_orm)
609def is_usage_data_collection_enabled() -> bool:
610 """Check if data collection is enabled."""
611 return conf.getboolean("usage_data_collection", "enabled", fallback=True) and (
612 os.getenv("SCARF_ANALYTICS", "").strip().lower() != "false"
613 )
616# Const stuff
618KILOBYTE = 1024
619MEGABYTE = KILOBYTE * KILOBYTE
620WEB_COLORS = {"LIGHTBLUE": "#4d9de0", "LIGHTORANGE": "#FF9933"}
623# Updating serialized DAG can not be faster than a minimum interval to reduce database
624# write rate.
625MIN_SERIALIZED_DAG_UPDATE_INTERVAL = conf.getint("core", "min_serialized_dag_update_interval", fallback=30)
627# If set to True, serialized DAGs is compressed before writing to DB,
628COMPRESS_SERIALIZED_DAGS = conf.getboolean("core", "compress_serialized_dags", fallback=False)
630# Fetching serialized DAG can not be faster than a minimum interval to reduce database
631# read rate. This config controls when your DAGs are updated in the Webserver
632MIN_SERIALIZED_DAG_FETCH_INTERVAL = conf.getint("core", "min_serialized_dag_fetch_interval", fallback=10)
634CAN_FORK = hasattr(os, "fork")
636EXECUTE_TASKS_NEW_PYTHON_INTERPRETER = not CAN_FORK or conf.getboolean(
637 "core",
638 "execute_tasks_new_python_interpreter",
639 fallback=False,
640)
642ALLOW_FUTURE_EXEC_DATES = conf.getboolean("scheduler", "allow_trigger_in_future", fallback=False)
644# Whether or not to check each dagrun against defined SLAs
645CHECK_SLAS = conf.getboolean("core", "check_slas", fallback=True)
647USE_JOB_SCHEDULE = conf.getboolean("scheduler", "use_job_schedule", fallback=True)
649# By default Airflow plugins are lazily-loaded (only loaded when required). Set it to False,
650# if you want to load plugins whenever 'airflow' is invoked via cli or loaded from module.
651LAZY_LOAD_PLUGINS: bool = conf.getboolean("core", "lazy_load_plugins", fallback=True)
653# By default Airflow providers are lazily-discovered (discovery and imports happen only when required).
654# Set it to False, if you want to discover providers whenever 'airflow' is invoked via cli or
655# loaded from module.
656LAZY_LOAD_PROVIDERS: bool = conf.getboolean("core", "lazy_discover_providers", fallback=True)
658# Determines if the executor utilizes Kubernetes
659IS_K8S_OR_K8SCELERY_EXECUTOR = conf.get("core", "EXECUTOR") in {
660 executor_constants.KUBERNETES_EXECUTOR,
661 executor_constants.CELERY_KUBERNETES_EXECUTOR,
662 executor_constants.LOCAL_KUBERNETES_EXECUTOR,
663}
665# Executors can set this to true to configure logging correctly for
666# containerized executors.
667IS_EXECUTOR_CONTAINER = bool(os.environ.get("AIRFLOW_IS_EXECUTOR_CONTAINER", ""))
668IS_K8S_EXECUTOR_POD = bool(os.environ.get("AIRFLOW_IS_K8S_EXECUTOR_POD", ""))
669"""Will be True if running in kubernetes executor pod."""
671HIDE_SENSITIVE_VAR_CONN_FIELDS = conf.getboolean("core", "hide_sensitive_var_conn_fields")
673# By default this is off, but is automatically configured on when running task
674# instances
675MASK_SECRETS_IN_LOGS = False
677# Display alerts on the dashboard
678# Useful for warning about setup issues or announcing changes to end users
679# List of UIAlerts, which allows for specifying the message, category, and roles the
680# message should be shown to. For example:
681# from airflow.www.utils import UIAlert
682#
683# DASHBOARD_UIALERTS = [
684# UIAlert("Welcome to Airflow"), # All users
685# UIAlert("Airflow update happening next week", roles=["User"]), # Only users with the User role
686# # A flash message with html:
687# UIAlert('Visit <a href="http://airflow.apache.org">airflow.apache.org</a>', html=True),
688# ]
689#
690DASHBOARD_UIALERTS: list[UIAlert] = []
692# Prefix used to identify tables holding data moved during migration.
693AIRFLOW_MOVED_TABLE_PREFIX = "_airflow_moved"
695DAEMON_UMASK: str = conf.get("core", "daemon_umask", fallback="0o077")
697# AIP-44: internal_api (experimental)
698# This feature is not complete yet, so we disable it by default.
699_ENABLE_AIP_44: bool = os.environ.get("AIRFLOW_ENABLE_AIP_44", "false").lower() in {
700 "true",
701 "t",
702 "yes",
703 "y",
704 "1",
705}