Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/build/lib/airflow/settings.py: 63%
244 statements
coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import atexit
import functools
import json
import logging
import os
import sys
import warnings
from typing import TYPE_CHECKING, Callable

import pendulum
import sqlalchemy
from sqlalchemy import create_engine, exc
from sqlalchemy.engine import Engine
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.orm.session import Session as SASession
from sqlalchemy.pool import NullPool

from airflow.configuration import AIRFLOW_HOME, WEBSERVER_CONFIG, conf  # NOQA F401
from airflow.exceptions import RemovedInAirflow3Warning
from airflow.executors import executor_constants
from airflow.logging_config import configure_logging
from airflow.utils.orm_event_handlers import setup_event_handlers

if TYPE_CHECKING:
    from airflow.www.utils import UIAlert

log = logging.getLogger(__name__)


TIMEZONE = pendulum.tz.timezone("UTC")
try:
    tz = conf.get_mandatory_value("core", "default_timezone")
    if tz == "system":
        TIMEZONE = pendulum.tz.local_timezone()
    else:
        TIMEZONE = pendulum.tz.timezone(tz)
except Exception:
    pass
log.info("Configured default timezone %s", TIMEZONE)


HEADER = "\n".join(
    [
        r"  ____________       _____________",
        r" ____    |__( )_________  __/__  /________      __",
        r"____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /",
        r"___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /",
        r" _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/",
    ]
)

LOGGING_LEVEL = logging.INFO

# the prefix to append to gunicorn worker processes after init
GUNICORN_WORKER_READY_PREFIX = "[ready] "

LOG_FORMAT = conf.get("logging", "log_format")
SIMPLE_LOG_FORMAT = conf.get("logging", "simple_log_format")

SQL_ALCHEMY_CONN: str | None = None
PLUGINS_FOLDER: str | None = None
LOGGING_CLASS_PATH: str | None = None
DONOT_MODIFY_HANDLERS: bool | None = None
DAGS_FOLDER: str = os.path.expanduser(conf.get_mandatory_value("core", "DAGS_FOLDER"))

engine: Engine
Session: Callable[..., SASession]

# The JSON library to use for DAG Serialization and De-Serialization
json = json

# Dictionary containing State and colors associated to each state to
# display on the Webserver
STATE_COLORS = {
    "deferred": "mediumpurple",
    "failed": "red",
    "queued": "gray",
    "removed": "lightgrey",
    "restarting": "violet",
    "running": "lime",
    "scheduled": "tan",
    "shutdown": "blue",
    "skipped": "hotpink",
    "success": "green",
    "up_for_reschedule": "turquoise",
    "up_for_retry": "gold",
    "upstream_failed": "orange",
}
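
# STATE_COLORS can be overridden from an ``airflow_local_settings`` module on the
# PYTHONPATH (see ``import_local_settings`` below). A minimal sketch -- the color
# values below are arbitrary examples:
#
#   STATE_COLORS = {
#       "queued": "darkgray",
#       "running": "#01FF70",
#       "success": "#2ECC40",
#       "failed": "firebrick",
#   }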


@functools.lru_cache(maxsize=None)
def _get_rich_console(file):
    # Delay imports until we need it
    import rich.console

    return rich.console.Console(file=file)


def custom_show_warning(message, category, filename, lineno, file=None, line=None):
    """Custom function to print rich and visible warnings."""
    # Delay imports until we need it
    from rich.markup import escape

    msg = f"[bold]{line}" if line else f"[bold][yellow]{filename}:{lineno}"
    msg += f" {category.__name__}[/bold]: {escape(str(message))}[/yellow]"
    write_console = _get_rich_console(file or sys.stderr)
    write_console.print(msg, soft_wrap=True)


def replace_showwarning(replacement):
    """Replace ``warnings.showwarning``, returning the original.

    This is useful since we want to "reset" the ``showwarning`` hook on exit to
    avoid lazy-loading issues. If a warning is emitted after Python cleaned up
    the import system, we would no longer be able to import ``rich``.
    """
    original = warnings.showwarning
    warnings.showwarning = replacement
    return original


original_show_warning = replace_showwarning(custom_show_warning)
atexit.register(functools.partial(replace_showwarning, original_show_warning))


def task_policy(task) -> None:
    """
    This policy setting allows altering tasks after they are loaded in the DagBag.

    It allows administrators to rewire some of a task's parameters. Alternatively,
    you can raise an ``AirflowClusterPolicyViolation`` exception to stop the DAG
    from being executed.

    To define a policy, add an ``airflow_local_settings`` module
    to your PYTHONPATH that defines this ``task_policy`` function.

    Here are a few examples of how this can be useful:

    * You could enforce a specific queue (say the ``spark`` queue)
      for tasks using the ``SparkOperator`` to make sure that these
      tasks get wired to the right workers
    * You could enforce a task timeout policy, making sure that no tasks run
      for more than 48 hours

    :param task: task to be mutated
    """
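
# A minimal sketch of such a policy in ``airflow_local_settings.py``; the operator
# name, queue name, and timeout limit are arbitrary examples:
#
#   from datetime import timedelta
#
#   from airflow.exceptions import AirflowClusterPolicyViolation
#
#   def task_policy(task) -> None:
#       if task.task_type == "SparkSubmitOperator":
#           task.queue = "spark"
#       if task.execution_timeout is None or task.execution_timeout > timedelta(hours=48):
#           raise AirflowClusterPolicyViolation(
#               f"Task {task.task_id} must set execution_timeout <= 48h"
#           )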


def dag_policy(dag) -> None:
    """
    This policy setting allows altering DAGs after they are loaded in the DagBag.

    It allows administrators to rewire some of a DAG's parameters. Alternatively,
    you can raise an ``AirflowClusterPolicyViolation`` exception to stop the DAG
    from being executed.

    To define a policy, add an ``airflow_local_settings`` module
    to your PYTHONPATH that defines this ``dag_policy`` function.

    Here are a few examples of how this can be useful:

    * You could enforce a default user for DAGs
    * You could check that every DAG has configured tags

    :param dag: dag to be mutated
    """
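
# A minimal sketch of such a policy in ``airflow_local_settings.py``; requiring at
# least one tag is an arbitrary example rule:
#
#   from airflow.exceptions import AirflowClusterPolicyViolation
#
#   def dag_policy(dag) -> None:
#       if not dag.tags:
#           raise AirflowClusterPolicyViolation(
#               f"DAG {dag.dag_id} must declare at least one tag"
#           )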


def task_instance_mutation_hook(task_instance):
    """
    This setting allows altering task instances before they are queued by the Airflow scheduler.

    To define ``task_instance_mutation_hook``, add an ``airflow_local_settings`` module
    to your PYTHONPATH that defines this ``task_instance_mutation_hook`` function.

    This could be used, for instance, to modify the task instance during retries.

    :param task_instance: task instance to be mutated
    """


task_instance_mutation_hook.is_noop = True  # type: ignore
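
# A minimal sketch of such a hook in ``airflow_local_settings.py``; the ``retry``
# queue name is an arbitrary example:
#
#   def task_instance_mutation_hook(task_instance):
#       # Route retry attempts to a dedicated queue.
#       if task_instance.try_number > 1:
#           task_instance.queue = "retry"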


def pod_mutation_hook(pod):
    """
    Mutate pod before scheduling.

    This setting allows altering ``kubernetes.client.models.V1Pod`` objects
    before they are passed to the Kubernetes client for scheduling.

    To define a pod mutation hook, add an ``airflow_local_settings`` module
    to your PYTHONPATH that defines this ``pod_mutation_hook`` function.
    It receives a ``Pod`` object and can alter it where needed.

    This could be used, for instance, to add sidecar or init containers
    to every worker pod launched by KubernetesExecutor or KubernetesPodOperator.
    """
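
# A minimal sketch of such a hook in ``airflow_local_settings.py``; the label key and
# value are arbitrary examples, and ``pod`` is a ``kubernetes.client.models.V1Pod``:
#
#   def pod_mutation_hook(pod):
#       labels = pod.metadata.labels or {}
#       labels["team"] = "data-eng"
#       pod.metadata.labels = labels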


def get_airflow_context_vars(context):
    """
    This setting allows getting the airflow context vars, which are key-value pairs.
    They are then injected into the default airflow context vars, which in the end are
    available as environment variables when running tasks.
    ``dag_id``, ``task_id``, ``execution_date``, ``dag_run_id`` and ``try_number`` are reserved keys.
    To define it, add an ``airflow_local_settings`` module
    to your PYTHONPATH that defines this ``get_airflow_context_vars`` function.

    :param context: The context for the task_instance of interest.
    """
    return {}
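
# A minimal sketch of an override in ``airflow_local_settings.py``; the key and value
# are arbitrary examples. Returned pairs are merged into the context vars exposed to
# the running task:
#
#   def get_airflow_context_vars(context):
#       return {"airflow_cluster": "main"}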


def get_dagbag_import_timeout(dag_file_path: str) -> int | float:
    """
    This setting allows for dynamic control of the DAG file parsing timeout based on the DAG file path.

    It is useful when there are a few DAG files requiring longer parsing times, while others do not.
    You can control them separately instead of having one value for all DAG files.

    If the return value is less than or equal to 0, it means no timeout during the DAG parsing.
    """
    return conf.getfloat("core", "DAGBAG_IMPORT_TIMEOUT")
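
# A minimal sketch of an override in ``airflow_local_settings.py``; the path fragment
# and timeout value are arbitrary examples:
#
#   from airflow.configuration import conf
#
#   def get_dagbag_import_timeout(dag_file_path: str) -> int | float:
#       # Give dynamically generated DAG files more time to parse.
#       if "generated" in dag_file_path:
#           return 120.0
#       return conf.getfloat("core", "DAGBAG_IMPORT_TIMEOUT")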


def configure_vars():
    """Configure Global Variables from airflow.cfg."""
    global SQL_ALCHEMY_CONN
    global DAGS_FOLDER
    global PLUGINS_FOLDER
    global DONOT_MODIFY_HANDLERS
    SQL_ALCHEMY_CONN = conf.get("database", "SQL_ALCHEMY_CONN")
    DAGS_FOLDER = os.path.expanduser(conf.get("core", "DAGS_FOLDER"))

    PLUGINS_FOLDER = conf.get("core", "plugins_folder", fallback=os.path.join(AIRFLOW_HOME, "plugins"))

    # If donot_modify_handlers=True, we do not modify logging handlers in task_run command
    # If the flag is set to False, we remove all handlers from the root logger
    # and add all handlers from 'airflow.task' logger to the root Logger. This is done
    # to get all the logs from the print & log statements in the DAG files before a task is run
    # The handlers are restored after the task completes execution.
    DONOT_MODIFY_HANDLERS = conf.getboolean("logging", "donot_modify_handlers", fallback=False)


def configure_orm(disable_connection_pool=False, pool_class=None):
    """Configure ORM using SQLAlchemy."""
    from airflow.utils.log.secrets_masker import mask_secret

    log.debug("Setting up DB connection pool (PID %s)", os.getpid())
    global engine
    global Session
    engine_args = prepare_engine_args(disable_connection_pool, pool_class)

    if conf.has_option("database", "sql_alchemy_connect_args"):
        connect_args = conf.getimport("database", "sql_alchemy_connect_args")
    else:
        connect_args = {}

    engine = create_engine(SQL_ALCHEMY_CONN, connect_args=connect_args, **engine_args)

    mask_secret(engine.url.password)

    setup_event_handlers(engine)

    Session = scoped_session(
        sessionmaker(
            autocommit=False,
            autoflush=False,
            bind=engine,
            expire_on_commit=False,
        )
    )
    if engine.dialect.name == "mssql":
        session = Session()
        try:
            result = session.execute(
                sqlalchemy.text(
                    "SELECT is_read_committed_snapshot_on FROM sys.databases WHERE name=:database_name"
                ),
                params={"database_name": engine.url.database},
            )
            data = result.fetchone()[0]
            if data != 1:
                log.critical("MSSQL database MUST have READ_COMMITTED_SNAPSHOT enabled.")
                log.critical("The database %s has it disabled.", engine.url.database)
                log.critical("This will cause random deadlocks, Refusing to start.")
                log.critical(
                    "See https://airflow.apache.org/docs/apache-airflow/stable/howto/"
                    "set-up-database.html#setting-up-a-mssql-database"
                )
                raise Exception("MSSQL database MUST have READ_COMMITTED_SNAPSHOT enabled.")
        finally:
            session.close()


DEFAULT_ENGINE_ARGS = {
    "postgresql": {
        "executemany_mode": "values",
        "executemany_values_page_size": 10000,
        "executemany_batch_page_size": 2000,
    },
}


def prepare_engine_args(disable_connection_pool=False, pool_class=None):
    """Prepare SQLAlchemy engine args."""
    default_args = {}
    for dialect, default in DEFAULT_ENGINE_ARGS.items():
        if SQL_ALCHEMY_CONN.startswith(dialect):
            default_args = default.copy()
            break

    engine_args: dict = conf.getjson(
        "database", "sql_alchemy_engine_args", fallback=default_args
    )  # type: ignore

    if pool_class:
        # Don't use separate settings for size etc, only those from sql_alchemy_engine_args
        engine_args["poolclass"] = pool_class
    elif disable_connection_pool or not conf.getboolean("database", "SQL_ALCHEMY_POOL_ENABLED"):
        engine_args["poolclass"] = NullPool
        log.debug("settings.prepare_engine_args(): Using NullPool")
    elif not SQL_ALCHEMY_CONN.startswith("sqlite"):
        # Pool size engine args not supported by sqlite.
        # If no config value is defined for the pool size, select a reasonable value.
        # 0 means no limit, which could lead to exceeding the Database connection limit.
        pool_size = conf.getint("database", "SQL_ALCHEMY_POOL_SIZE", fallback=5)

        # The maximum overflow size of the pool.
        # When the number of checked-out connections reaches the size set in pool_size,
        # additional connections will be returned up to this limit.
        # When those additional connections are returned to the pool, they are disconnected and discarded.
        # It follows then that the total number of simultaneous connections
        # the pool will allow is pool_size + max_overflow,
        # and the total number of "sleeping" connections the pool will allow is pool_size.
        # max_overflow can be set to -1 to indicate no overflow limit;
        # no limit will be placed on the total number
        # of concurrent connections. Defaults to 10.
        max_overflow = conf.getint("database", "SQL_ALCHEMY_MAX_OVERFLOW", fallback=10)

        # The DB server already has a value for wait_timeout (number of seconds after
        # which an idle sleeping connection should be killed). Since other DBs may
        # co-exist on the same server, SQLAlchemy should set its
        # pool_recycle to an equal or smaller value.
        pool_recycle = conf.getint("database", "SQL_ALCHEMY_POOL_RECYCLE", fallback=1800)

        # Check connection at the start of each connection pool checkout.
        # Typically, this is a simple statement like "SELECT 1", but may also make use
        # of some DBAPI-specific method to test the connection for liveness.
        # More information here:
        # https://docs.sqlalchemy.org/en/13/core/pooling.html#disconnect-handling-pessimistic
        pool_pre_ping = conf.getboolean("database", "SQL_ALCHEMY_POOL_PRE_PING", fallback=True)

        log.debug(
            "settings.prepare_engine_args(): Using pool settings. pool_size=%d, max_overflow=%d, "
            "pool_recycle=%d, pid=%d",
            pool_size,
            max_overflow,
            pool_recycle,
            os.getpid(),
        )
        engine_args["pool_size"] = pool_size
        engine_args["pool_recycle"] = pool_recycle
        engine_args["pool_pre_ping"] = pool_pre_ping
        engine_args["max_overflow"] = max_overflow

    # The default isolation level for MySQL (REPEATABLE READ) can introduce inconsistencies when
    # running multiple schedulers, as repeated queries on the same session may read from stale snapshots.
    # 'READ COMMITTED' is the default value for PostgreSQL.
    # More information here:
    # https://dev.mysql.com/doc/refman/8.0/en/innodb-transaction-isolation-levels.html

    # Similarly, the MSSQL default isolation level should be set to READ COMMITTED.
    # We also make sure that the READ_COMMITTED_SNAPSHOT option is on, in order to avoid deadlocks when
    # Select queries are running. This is by default enforced during init/upgrade. More information:
    # https://docs.microsoft.com/en-us/sql/t-sql/statements/set-transaction-isolation-level-transact-sql

    if SQL_ALCHEMY_CONN.startswith(("mysql", "mssql")):
        engine_args["isolation_level"] = "READ COMMITTED"

    # Allow the user to specify an encoding for their DB otherwise default
    # to utf-8 so jobs & users with non-latin1 characters can still use us.
    engine_args["encoding"] = conf.get("database", "SQL_ENGINE_ENCODING", fallback="utf-8")

    return engine_args
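
# ``sql_alchemy_engine_args`` is read with ``conf.getjson``, so it is configured as a
# JSON object. A minimal sketch of an ``airflow.cfg`` entry; the values are arbitrary
# examples:
#
#   [database]
#   sql_alchemy_engine_args = {"pool_size": 10, "pool_recycle": 1800, "pool_pre_ping": true}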


def dispose_orm():
    """Properly close pooled database connections."""
    log.debug("Disposing DB connection pool (PID %s)", os.getpid())
    global engine
    global Session

    if Session:
        Session.remove()
        Session = None
    if engine:
        engine.dispose()
        engine = None


def reconfigure_orm(disable_connection_pool=False, pool_class=None):
    """Properly close database connections and re-configure ORM."""
    dispose_orm()
    configure_orm(disable_connection_pool=disable_connection_pool, pool_class=pool_class)


def configure_adapters():
    """Register Adapters and DB Converters."""
    from pendulum import DateTime as Pendulum

    if SQL_ALCHEMY_CONN.startswith("sqlite"):
        from sqlite3 import register_adapter

        register_adapter(Pendulum, lambda val: val.isoformat(" "))

    if SQL_ALCHEMY_CONN.startswith("mysql"):
        try:
            import MySQLdb.converters

            MySQLdb.converters.conversions[Pendulum] = MySQLdb.converters.DateTime2literal
        except ImportError:
            pass
        try:
            import pymysql.converters

            pymysql.converters.conversions[Pendulum] = pymysql.converters.escape_datetime
        except ImportError:
            pass


def validate_session():
    """Validate ORM Session."""
    global engine

    worker_precheck = conf.getboolean("celery", "worker_precheck", fallback=False)
    if not worker_precheck:
        return True
    else:
        check_session = sessionmaker(bind=engine)
        session = check_session()
        try:
            session.execute("select 1")
            conn_status = True
        except exc.DBAPIError as err:
            log.error(err)
            conn_status = False
        session.close()
        return conn_status


def configure_action_logging() -> None:
    """Any additional configuration (register callback) for airflow.utils.action_loggers module."""


def prepare_syspath():
    """Ensure certain subfolders of AIRFLOW_HOME are on the python path (``sys.path``)."""
    if DAGS_FOLDER not in sys.path:
        sys.path.append(DAGS_FOLDER)

    # Add ./config/ for loading custom log parsers etc, or
    # airflow_local_settings etc.
    config_path = os.path.join(AIRFLOW_HOME, "config")
    if config_path not in sys.path:
        sys.path.append(config_path)

    if PLUGINS_FOLDER not in sys.path:
        sys.path.append(PLUGINS_FOLDER)


def get_session_lifetime_config():
    """Gets session timeout configs and handles outdated configs gracefully."""
    session_lifetime_minutes = conf.get("webserver", "session_lifetime_minutes", fallback=None)
    session_lifetime_days = conf.get("webserver", "session_lifetime_days", fallback=None)
    uses_deprecated_lifetime_configs = session_lifetime_days or conf.get(
        "webserver", "force_log_out_after", fallback=None
    )

    minutes_per_day = 24 * 60
    default_lifetime_minutes = "43200"
    if uses_deprecated_lifetime_configs and session_lifetime_minutes == default_lifetime_minutes:
        warnings.warn(
            "`session_lifetime_days` option from `[webserver]` section has been "
            "renamed to `session_lifetime_minutes`. The new option allows to configure "
            "session lifetime in minutes. The `force_log_out_after` option has been removed "
            "from `[webserver]` section. Please update your configuration.",
            category=RemovedInAirflow3Warning,
        )
        if session_lifetime_days:
            session_lifetime_minutes = minutes_per_day * int(session_lifetime_days)

    if not session_lifetime_minutes:
        session_lifetime_days = 30
        session_lifetime_minutes = minutes_per_day * session_lifetime_days

    logging.debug("User session lifetime is set to %s minutes.", session_lifetime_minutes)

    return int(session_lifetime_minutes)


def import_local_settings():
    """Import airflow_local_settings.py files to allow overriding any configs in settings.py file."""
    try:
        import airflow_local_settings

        if hasattr(airflow_local_settings, "__all__"):
            for i in airflow_local_settings.__all__:
                globals()[i] = getattr(airflow_local_settings, i)
        else:
            for k, v in airflow_local_settings.__dict__.items():
                if not k.startswith("__"):
                    globals()[k] = v

        # TODO: Remove once deprecated
        if "policy" in globals() and "task_policy" not in globals():
            warnings.warn(
                "Using `policy` in airflow_local_settings.py is deprecated. "
                "Please rename your `policy` to `task_policy`.",
                DeprecationWarning,
                stacklevel=2,
            )
            globals()["task_policy"] = globals()["policy"]
            del globals()["policy"]

        if not hasattr(task_instance_mutation_hook, "is_noop"):
            task_instance_mutation_hook.is_noop = False

        log.info("Loaded airflow_local_settings from %s .", airflow_local_settings.__file__)
    except ModuleNotFoundError as e:
        if e.name == "airflow_local_settings":
            log.debug("No airflow_local_settings to import.", exc_info=True)
        else:
            log.critical(
                "Failed to import airflow_local_settings due to a transitive module not found error.",
                exc_info=True,
            )
            raise
    except ImportError:
        log.critical("Failed to import airflow_local_settings.", exc_info=True)
        raise


def initialize():
    """Initialize Airflow with all the settings from this file."""
    configure_vars()
    prepare_syspath()
    import_local_settings()
    global LOGGING_CLASS_PATH
    LOGGING_CLASS_PATH = configure_logging()
    configure_adapters()
    # The webservers import this file from models.py with the default settings.
    configure_orm()
    configure_action_logging()

    # Ensure we close DB connections at scheduler and gunicorn worker terminations
    atexit.register(dispose_orm)


# Const stuff

KILOBYTE = 1024
MEGABYTE = KILOBYTE * KILOBYTE
WEB_COLORS = {"LIGHTBLUE": "#4d9de0", "LIGHTORANGE": "#FF9933"}


# Updating serialized DAGs can not happen faster than a minimum interval, to reduce the
# database write rate.
MIN_SERIALIZED_DAG_UPDATE_INTERVAL = conf.getint("core", "min_serialized_dag_update_interval", fallback=30)

# If set to True, serialized DAGs are compressed before writing to the DB.
COMPRESS_SERIALIZED_DAGS = conf.getboolean("core", "compress_serialized_dags", fallback=False)

# Fetching serialized DAGs can not happen faster than a minimum interval, to reduce the
# database read rate. This config controls when your DAGs are updated in the Webserver.
MIN_SERIALIZED_DAG_FETCH_INTERVAL = conf.getint("core", "min_serialized_dag_fetch_interval", fallback=10)

CAN_FORK = hasattr(os, "fork")

EXECUTE_TASKS_NEW_PYTHON_INTERPRETER = not CAN_FORK or conf.getboolean(
    "core",
    "execute_tasks_new_python_interpreter",
    fallback=False,
)

ALLOW_FUTURE_EXEC_DATES = conf.getboolean("scheduler", "allow_trigger_in_future", fallback=False)

# Whether or not to check each dagrun against defined SLAs
CHECK_SLAS = conf.getboolean("core", "check_slas", fallback=True)

USE_JOB_SCHEDULE = conf.getboolean("scheduler", "use_job_schedule", fallback=True)

# By default Airflow plugins are lazily-loaded (only loaded when required). Set it to False
# if you want to load plugins whenever 'airflow' is invoked via cli or loaded from a module.
LAZY_LOAD_PLUGINS = conf.getboolean("core", "lazy_load_plugins", fallback=True)

# By default Airflow providers are lazily-discovered (discovery and imports happen only when required).
# Set it to False if you want to discover providers whenever 'airflow' is invoked via cli or
# loaded from a module.
LAZY_LOAD_PROVIDERS = conf.getboolean("core", "lazy_discover_providers", fallback=True)

# Determines if the executor utilizes Kubernetes
IS_K8S_OR_K8SCELERY_EXECUTOR = conf.get("core", "EXECUTOR") in {
    executor_constants.KUBERNETES_EXECUTOR,
    executor_constants.CELERY_KUBERNETES_EXECUTOR,
    executor_constants.LOCAL_KUBERNETES_EXECUTOR,
}

HIDE_SENSITIVE_VAR_CONN_FIELDS = conf.getboolean("core", "hide_sensitive_var_conn_fields")

# By default this is off, but it is automatically configured on when running task
# instances
MASK_SECRETS_IN_LOGS = False

# Display alerts on the dashboard
# Useful for warning about setup issues or announcing changes to end users
# List of UIAlerts, which allows for specifying the message, category, and roles the
# message should be shown to. For example:
#   from airflow.www.utils import UIAlert
#
#   DASHBOARD_UIALERTS = [
#       UIAlert("Welcome to Airflow"),  # All users
#       UIAlert("Airflow update happening next week", roles=["User"]),  # Only users with the User role
#       # A flash message with html:
#       UIAlert('Visit <a href="http://airflow.apache.org">airflow.apache.org</a>', html=True),
#   ]
#
DASHBOARD_UIALERTS: list[UIAlert] = []

# Prefix used to identify tables holding data moved during migration.
AIRFLOW_MOVED_TABLE_PREFIX = "_airflow_moved"

DAEMON_UMASK: str = conf.get("core", "daemon_umask", fallback="0o077")