Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/build/lib/airflow/settings.py: 64%
264 statements
coverage.py v7.2.7, created at 2023-06-07 06:35 +0000

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import atexit
import functools
import json
import logging
import os
import sys
import warnings
from typing import TYPE_CHECKING, Any, Callable

import pendulum
import pluggy
import sqlalchemy
from sqlalchemy import create_engine, exc, text
from sqlalchemy.engine import Engine
from sqlalchemy.orm import Session as SASession, scoped_session, sessionmaker
from sqlalchemy.pool import NullPool

from airflow import policies
from airflow.configuration import AIRFLOW_HOME, WEBSERVER_CONFIG, conf  # NOQA F401
from airflow.exceptions import RemovedInAirflow3Warning
from airflow.executors import executor_constants
from airflow.logging_config import configure_logging
from airflow.utils.orm_event_handlers import setup_event_handlers
from airflow.utils.state import State

if TYPE_CHECKING:
    from airflow.www.utils import UIAlert

log = logging.getLogger(__name__)


TIMEZONE = pendulum.tz.timezone("UTC")
try:
    tz = conf.get_mandatory_value("core", "default_timezone")
    if tz == "system":
        TIMEZONE = pendulum.tz.local_timezone()
    else:
        TIMEZONE = pendulum.tz.timezone(tz)
except Exception:
    pass
log.info("Configured default timezone %s", TIMEZONE)
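
# Illustrative configuration (an example of typical usage, not part of this module): the timezone
# above is read from airflow.cfg, e.g.
#   [core]
#   default_timezone = system             # use the host's local timezone
#   default_timezone = Europe/Amsterdam   # or any IANA timezone name; the shipped default is utc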

HEADER = "\n".join(
    [
        r"  ____________       _____________",
        r" ____    |__( )_________  __/__  /________      __",
        r"____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /",
        r"___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /",
        r" _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/",
    ]
)

LOGGING_LEVEL = logging.INFO

# The prefix added to gunicorn worker process names once they have finished init
GUNICORN_WORKER_READY_PREFIX = "[ready] "

LOG_FORMAT = conf.get("logging", "log_format")
SIMPLE_LOG_FORMAT = conf.get("logging", "simple_log_format")

SQL_ALCHEMY_CONN: str | None = None
PLUGINS_FOLDER: str | None = None
LOGGING_CLASS_PATH: str | None = None
DONOT_MODIFY_HANDLERS: bool | None = None
DAGS_FOLDER: str = os.path.expanduser(conf.get_mandatory_value("core", "DAGS_FOLDER"))

engine: Engine
Session: Callable[..., SASession]

# The JSON library to use for DAG Serialization and De-Serialization
json = json

# Dictionary mapping each State to the color used to display it on the Webserver
STATE_COLORS = {
    "deferred": "mediumpurple",
    "failed": "red",
    "queued": "gray",
    "removed": "lightgrey",
    "restarting": "violet",
    "running": "lime",
    "scheduled": "tan",
    "shutdown": "blue",
    "skipped": "hotpink",
    "success": "green",
    "up_for_reschedule": "turquoise",
    "up_for_retry": "gold",
    "upstream_failed": "orange",
}
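
# These colors can be overridden by defining STATE_COLORS in airflow_local_settings.py;
# import_local_settings() below copies any top-level names from that module into this one.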


@functools.lru_cache(maxsize=None)
def _get_rich_console(file):
    # Delay imports until we need it
    import rich.console

    return rich.console.Console(file=file)


def custom_show_warning(message, category, filename, lineno, file=None, line=None):
    """Custom function to print rich and visible warnings."""
    # Delay imports until we need it
    from rich.markup import escape

    msg = f"[bold]{line}" if line else f"[bold][yellow]{filename}:{lineno}"
    msg += f" {category.__name__}[/bold]: {escape(str(message))}[/yellow]"
    write_console = _get_rich_console(file or sys.stderr)
    write_console.print(msg, soft_wrap=True)


def replace_showwarning(replacement):
    """Replace ``warnings.showwarning``, returning the original.

    This is useful since we want to "reset" the ``showwarning`` hook on exit to
    avoid lazy-loading issues. If a warning is emitted after Python cleaned up
    the import system, we would no longer be able to import ``rich``.
    """
    original = warnings.showwarning
    warnings.showwarning = replacement
    return original


original_show_warning = replace_showwarning(custom_show_warning)
atexit.register(functools.partial(replace_showwarning, original_show_warning))

POLICY_PLUGIN_MANAGER: Any = None  # type: ignore
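
# The functions below are thin wrappers around the policy plugin manager: each one forwards its
# argument to the corresponding pluggy hook, so cluster policies defined in airflow_local_settings
# or in policy plugins are applied through POLICY_PLUGIN_MANAGER.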


def task_policy(task):
    return POLICY_PLUGIN_MANAGER.hook.task_policy(task=task)


def dag_policy(dag):
    return POLICY_PLUGIN_MANAGER.hook.dag_policy(dag=dag)


def task_instance_mutation_hook(task_instance):
    return POLICY_PLUGIN_MANAGER.hook.task_instance_mutation_hook(task_instance=task_instance)


task_instance_mutation_hook.is_noop = True  # type: ignore


def pod_mutation_hook(pod):
    return POLICY_PLUGIN_MANAGER.hook.pod_mutation_hook(pod=pod)


def get_airflow_context_vars(context):
    return POLICY_PLUGIN_MANAGER.hook.get_airflow_context_vars(context=context)


def get_dagbag_import_timeout(dag_file_path: str):
    return POLICY_PLUGIN_MANAGER.hook.get_dagbag_import_timeout(dag_file_path=dag_file_path)


def configure_policy_plugin_manager():
    global POLICY_PLUGIN_MANAGER

    POLICY_PLUGIN_MANAGER = pluggy.PluginManager(policies.local_settings_hookspec.project_name)
    POLICY_PLUGIN_MANAGER.add_hookspecs(policies)
    POLICY_PLUGIN_MANAGER.register(policies.DefaultPolicy)


def load_policy_plugins(pm: pluggy.PluginManager):
    # We can't log duration etc. here, as logging hasn't yet been configured!
    pm.load_setuptools_entrypoints("airflow.policy")
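
# Illustrative only: a third-party package could expose a policy plugin through the
# "airflow.policy" setuptools entry-point group that load_policy_plugins() loads above, e.g.
#
#   [project.entry-points."airflow.policy"]
#   my_company_policies = "my_company.airflow_policies"
#
# (the quoted module path is a hypothetical example, not something defined by Airflow).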


def configure_vars():
    """Configure Global Variables from airflow.cfg."""
    global SQL_ALCHEMY_CONN
    global DAGS_FOLDER
    global PLUGINS_FOLDER
    global DONOT_MODIFY_HANDLERS
    SQL_ALCHEMY_CONN = conf.get("database", "SQL_ALCHEMY_CONN")
    DAGS_FOLDER = os.path.expanduser(conf.get("core", "DAGS_FOLDER"))

    PLUGINS_FOLDER = conf.get("core", "plugins_folder", fallback=os.path.join(AIRFLOW_HOME, "plugins"))

    # If donot_modify_handlers=True, we do not modify logging handlers in task_run command.
    # If the flag is set to False, we remove all handlers from the root logger
    # and add all handlers from 'airflow.task' logger to the root logger. This is done
    # to get all the logs from the print & log statements in the DAG files before a task is run.
    # The handlers are restored after the task completes execution.
    DONOT_MODIFY_HANDLERS = conf.getboolean("logging", "donot_modify_handlers", fallback=False)


def configure_orm(disable_connection_pool=False, pool_class=None):
    """Configure ORM using SQLAlchemy."""
    from airflow.utils.log.secrets_masker import mask_secret

    log.debug("Setting up DB connection pool (PID %s)", os.getpid())
    global engine
    global Session
    engine_args = prepare_engine_args(disable_connection_pool, pool_class)

    if conf.has_option("database", "sql_alchemy_connect_args"):
        connect_args = conf.getimport("database", "sql_alchemy_connect_args")
    else:
        connect_args = {}

    engine = create_engine(SQL_ALCHEMY_CONN, connect_args=connect_args, **engine_args, future=True)

    mask_secret(engine.url.password)

    setup_event_handlers(engine)

    Session = scoped_session(
        sessionmaker(
            autocommit=False,
            autoflush=False,
            bind=engine,
            expire_on_commit=False,
        )
    )
    if engine.dialect.name == "mssql":
        session = Session()
        try:
            result = session.execute(
                sqlalchemy.text(
                    "SELECT is_read_committed_snapshot_on FROM sys.databases WHERE name=:database_name"
                ),
                params={"database_name": engine.url.database},
            )
            data = result.fetchone()[0]
            if data != 1:
                log.critical("MSSQL database MUST have READ_COMMITTED_SNAPSHOT enabled.")
                log.critical("The database %s has it disabled.", engine.url.database)
                log.critical("This will cause random deadlocks. Refusing to start.")
                log.critical(
                    "See https://airflow.apache.org/docs/apache-airflow/stable/howto/"
                    "set-up-database.html#setting-up-a-mssql-database"
                )
                raise Exception("MSSQL database MUST have READ_COMMITTED_SNAPSHOT enabled.")
        finally:
            session.close()


DEFAULT_ENGINE_ARGS = {
    "postgresql": {
        "executemany_mode": "values",
        "executemany_values_page_size": 10000,
        "executemany_batch_page_size": 2000,
    },
}
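
# The per-dialect defaults above can be replaced wholesale by the [database]
# sql_alchemy_engine_args option, which prepare_engine_args() below reads as JSON.
# Illustrative value (an example, not a recommendation):
#   sql_alchemy_engine_args = {"pool_size": 10, "max_overflow": 20}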


def prepare_engine_args(disable_connection_pool=False, pool_class=None):
    """Prepare SQLAlchemy engine args."""
    default_args = {}
    for dialect, default in DEFAULT_ENGINE_ARGS.items():
        if SQL_ALCHEMY_CONN.startswith(dialect):
            default_args = default.copy()
            break

    engine_args: dict = conf.getjson(
        "database", "sql_alchemy_engine_args", fallback=default_args
    )  # type: ignore

    if pool_class:
        # Don't use separate settings for size etc., only those from sql_alchemy_engine_args
        engine_args["poolclass"] = pool_class
    elif disable_connection_pool or not conf.getboolean("database", "SQL_ALCHEMY_POOL_ENABLED"):
        engine_args["poolclass"] = NullPool
        log.debug("settings.prepare_engine_args(): Using NullPool")
    elif not SQL_ALCHEMY_CONN.startswith("sqlite"):
        # Pool size engine args are not supported by SQLite.
        # If no config value is defined for the pool size, select a reasonable value.
        # 0 means no limit, which could lead to exceeding the database connection limit.
        pool_size = conf.getint("database", "SQL_ALCHEMY_POOL_SIZE", fallback=5)

        # The maximum overflow size of the pool.
        # When the number of checked-out connections reaches the size set in pool_size,
        # additional connections will be returned up to this limit.
        # When those additional connections are returned to the pool, they are disconnected and discarded.
        # It follows then that the total number of simultaneous connections
        # the pool will allow is pool_size + max_overflow,
        # and the total number of "sleeping" connections the pool will allow is pool_size.
        # max_overflow can be set to -1 to indicate no overflow limit;
        # no limit will be placed on the total number
        # of concurrent connections. Defaults to 10.
        max_overflow = conf.getint("database", "SQL_ALCHEMY_MAX_OVERFLOW", fallback=10)

        # The DB server already has a value for wait_timeout (number of seconds after
        # which an idle sleeping connection should be killed). Since other DBs may
        # co-exist on the same server, SQLAlchemy should set its
        # pool_recycle to an equal or smaller value.
        pool_recycle = conf.getint("database", "SQL_ALCHEMY_POOL_RECYCLE", fallback=1800)

        # Check connection at the start of each connection pool checkout.
        # Typically, this is a simple statement like "SELECT 1", but may also make use
        # of some DBAPI-specific method to test the connection for liveness.
        # More information here:
        # https://docs.sqlalchemy.org/en/13/core/pooling.html#disconnect-handling-pessimistic
        pool_pre_ping = conf.getboolean("database", "SQL_ALCHEMY_POOL_PRE_PING", fallback=True)

        log.debug(
            "settings.prepare_engine_args(): Using pool settings. pool_size=%d, max_overflow=%d, "
            "pool_recycle=%d, pid=%d",
            pool_size,
            max_overflow,
            pool_recycle,
            os.getpid(),
        )
        engine_args["pool_size"] = pool_size
        engine_args["pool_recycle"] = pool_recycle
        engine_args["pool_pre_ping"] = pool_pre_ping
        engine_args["max_overflow"] = max_overflow

    # The default isolation level for MySQL (REPEATABLE READ) can introduce inconsistencies when
    # running multiple schedulers, as repeated queries on the same session may read from stale snapshots.
    # 'READ COMMITTED' is the default value for PostgreSQL.
    # More information here:
    # https://dev.mysql.com/doc/refman/8.0/en/innodb-transaction-isolation-levels.html

    # Similarly, the MSSQL default isolation level should be set to READ COMMITTED.
    # We also make sure that the READ_COMMITTED_SNAPSHOT option is on, in order to avoid deadlocks when
    # SELECT queries are running. This is enforced by default during init/upgrade. More information:
    # https://docs.microsoft.com/en-us/sql/t-sql/statements/set-transaction-isolation-level-transact-sql
    if SQL_ALCHEMY_CONN.startswith(("mysql", "mssql")):
        engine_args["isolation_level"] = "READ COMMITTED"

    # Allow the user to specify an encoding for their DB, otherwise default
    # to utf-8 so jobs & users with non-latin1 characters can still use us.
    engine_args["encoding"] = conf.get("database", "SQL_ENGINE_ENCODING", fallback="utf-8")

    return engine_args


def dispose_orm():
    """Properly close pooled database connections."""
    log.debug("Disposing DB connection pool (PID %s)", os.getpid())
    global engine
    global Session

    if Session is not None:  # type: ignore[truthy-function]
        Session.remove()
        Session = None
    if engine:
        engine.dispose()
        engine = None


def reconfigure_orm(disable_connection_pool=False, pool_class=None):
    """Properly close database connections and re-configure ORM."""
    dispose_orm()
    configure_orm(disable_connection_pool=disable_connection_pool, pool_class=pool_class)


def configure_adapters():
    """Register Adapters and DB Converters."""
    from pendulum import DateTime as Pendulum

    if SQL_ALCHEMY_CONN.startswith("sqlite"):
        from sqlite3 import register_adapter

        register_adapter(Pendulum, lambda val: val.isoformat(" "))

    if SQL_ALCHEMY_CONN.startswith("mysql"):
        try:
            import MySQLdb.converters

            MySQLdb.converters.conversions[Pendulum] = MySQLdb.converters.DateTime2literal
        except ImportError:
            pass
        try:
            import pymysql.converters

            pymysql.converters.conversions[Pendulum] = pymysql.converters.escape_datetime
        except ImportError:
            pass


def validate_session():
    """Validate ORM Session."""
    global engine

    worker_precheck = conf.getboolean("celery", "worker_precheck", fallback=False)
    if not worker_precheck:
        return True
    else:
        check_session = sessionmaker(bind=engine)
        session = check_session()
        try:
            session.execute(text("select 1"))
            conn_status = True
        except exc.DBAPIError as err:
            log.error(err)
            conn_status = False
        session.close()
        return conn_status


def configure_action_logging() -> None:
    """Any additional configuration (register callback) for airflow.utils.action_loggers module."""


def prepare_syspath():
    """Ensure certain subfolders of AIRFLOW_HOME are on the python path (sys.path)."""
    if DAGS_FOLDER not in sys.path:
        sys.path.append(DAGS_FOLDER)

    # Add ./config/ for loading custom log parsers, airflow_local_settings, etc.
    config_path = os.path.join(AIRFLOW_HOME, "config")
    if config_path not in sys.path:
        sys.path.append(config_path)

    if PLUGINS_FOLDER not in sys.path:
        sys.path.append(PLUGINS_FOLDER)


def get_session_lifetime_config():
    """Get session timeout configs and handle outdated configs gracefully."""
    session_lifetime_minutes = conf.get("webserver", "session_lifetime_minutes", fallback=None)
    session_lifetime_days = conf.get("webserver", "session_lifetime_days", fallback=None)
    uses_deprecated_lifetime_configs = session_lifetime_days or conf.get(
        "webserver", "force_log_out_after", fallback=None
    )

    minutes_per_day = 24 * 60
    default_lifetime_minutes = "43200"
    if uses_deprecated_lifetime_configs and session_lifetime_minutes == default_lifetime_minutes:
        warnings.warn(
            "`session_lifetime_days` option from `[webserver]` section has been "
            "renamed to `session_lifetime_minutes`. The new option allows configuring "
            "session lifetime in minutes. The `force_log_out_after` option has been removed "
            "from `[webserver]` section. Please update your configuration.",
            category=RemovedInAirflow3Warning,
        )
        if session_lifetime_days:
            session_lifetime_minutes = minutes_per_day * int(session_lifetime_days)

    if not session_lifetime_minutes:
        session_lifetime_days = 30
        session_lifetime_minutes = minutes_per_day * session_lifetime_days

    logging.debug("User session lifetime is set to %s minutes.", session_lifetime_minutes)

    return int(session_lifetime_minutes)


def import_local_settings():
    """Import airflow_local_settings.py files to allow overriding any configs in this settings.py file."""
    try:
        import airflow_local_settings

        if hasattr(airflow_local_settings, "__all__"):
            names = list(airflow_local_settings.__all__)
        else:
            names = list(filter(lambda n: not n.startswith("__"), airflow_local_settings.__dict__.keys()))

        if "policy" in names and "task_policy" not in names:
            warnings.warn(
                "Using `policy` in airflow_local_settings.py is deprecated. "
                "Please rename your `policy` to `task_policy`.",
                RemovedInAirflow3Warning,
                stacklevel=2,
            )
            setattr(airflow_local_settings, "task_policy", airflow_local_settings.policy)
            names.remove("policy")

        plugin_functions = policies.make_plugin_from_local_settings(
            POLICY_PLUGIN_MANAGER, airflow_local_settings, names
        )

        for name in names:
            # If we have already handled a function by adding it to the plugin,
            # then don't clobber the global function
            if name in plugin_functions:
                continue

            globals()[name] = getattr(airflow_local_settings, name)

        if POLICY_PLUGIN_MANAGER.hook.task_instance_mutation_hook.get_hookimpls():
            task_instance_mutation_hook.is_noop = False

        log.info("Loaded airflow_local_settings from %s .", airflow_local_settings.__file__)
    except ModuleNotFoundError as e:
        if e.name == "airflow_local_settings":
            log.debug("No airflow_local_settings to import.", exc_info=True)
        else:
            log.critical(
                "Failed to import airflow_local_settings due to a transitive module not found error.",
                exc_info=True,
            )
            raise
    except ImportError:
        log.critical("Failed to import airflow_local_settings.", exc_info=True)
        raise
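

# Illustrative airflow_local_settings.py (a hypothetical example, not shipped with Airflow).
# A file like this, placed on the path prepared by prepare_syspath() (e.g. $AIRFLOW_HOME/config),
# is picked up by import_local_settings() above:
#
#   def task_policy(task):
#       # e.g. enforce a cluster-wide rule on every task at DAG-parse time
#       if task.retries < 1:
#           task.retries = 1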


def initialize():
    """Initialize Airflow with all the settings from this file."""
    configure_vars()
    prepare_syspath()
    configure_policy_plugin_manager()
    # Load policy plugins _before_ importing airflow_local_settings, as Pluggy uses LIFO and we want
    # anything in airflow_local_settings to take precedence
    load_policy_plugins(POLICY_PLUGIN_MANAGER)
    import_local_settings()
    global LOGGING_CLASS_PATH
    LOGGING_CLASS_PATH = configure_logging()
    State.state_color.update(STATE_COLORS)

    configure_adapters()
    # The webservers import this file from models.py with the default settings.
    configure_orm()
    configure_action_logging()

    # Ensure we close DB connections at scheduler and gunicorn worker terminations
    atexit.register(dispose_orm)


# Const stuff

KILOBYTE = 1024
MEGABYTE = KILOBYTE * KILOBYTE
WEB_COLORS = {"LIGHTBLUE": "#4d9de0", "LIGHTORANGE": "#FF9933"}


# Updating serialized DAGs cannot happen faster than this minimum interval, to reduce the database
# write rate.
MIN_SERIALIZED_DAG_UPDATE_INTERVAL = conf.getint("core", "min_serialized_dag_update_interval", fallback=30)

# If set to True, serialized DAGs are compressed before being written to the DB.
COMPRESS_SERIALIZED_DAGS = conf.getboolean("core", "compress_serialized_dags", fallback=False)

# Fetching serialized DAGs cannot happen faster than this minimum interval, to reduce the database
# read rate. This config controls when your DAGs are updated in the Webserver.
MIN_SERIALIZED_DAG_FETCH_INTERVAL = conf.getint("core", "min_serialized_dag_fetch_interval", fallback=10)

CAN_FORK = hasattr(os, "fork")

EXECUTE_TASKS_NEW_PYTHON_INTERPRETER = not CAN_FORK or conf.getboolean(
    "core",
    "execute_tasks_new_python_interpreter",
    fallback=False,
)

ALLOW_FUTURE_EXEC_DATES = conf.getboolean("scheduler", "allow_trigger_in_future", fallback=False)

# Whether or not to check each dagrun against defined SLAs
CHECK_SLAS = conf.getboolean("core", "check_slas", fallback=True)

USE_JOB_SCHEDULE = conf.getboolean("scheduler", "use_job_schedule", fallback=True)

# By default Airflow plugins are lazily-loaded (only loaded when required). Set it to False
# if you want to load plugins whenever 'airflow' is invoked via cli or loaded from a module.
LAZY_LOAD_PLUGINS = conf.getboolean("core", "lazy_load_plugins", fallback=True)

# By default Airflow providers are lazily-discovered (discovery and imports happen only when required).
# Set it to False if you want to discover providers whenever 'airflow' is invoked via cli or
# loaded from a module.
LAZY_LOAD_PROVIDERS = conf.getboolean("core", "lazy_discover_providers", fallback=True)

# Determines if the executor utilizes Kubernetes
IS_K8S_OR_K8SCELERY_EXECUTOR = conf.get("core", "EXECUTOR") in {
    executor_constants.KUBERNETES_EXECUTOR,
    executor_constants.CELERY_KUBERNETES_EXECUTOR,
    executor_constants.LOCAL_KUBERNETES_EXECUTOR,
}

IS_K8S_EXECUTOR_POD = bool(os.environ.get("AIRFLOW_IS_K8S_EXECUTOR_POD", ""))
"""Will be True if running in a kubernetes executor pod."""

HIDE_SENSITIVE_VAR_CONN_FIELDS = conf.getboolean("core", "hide_sensitive_var_conn_fields")

# By default this is off, but it is automatically turned on when running task instances.
MASK_SECRETS_IN_LOGS = False

# Display alerts on the dashboard.
# Useful for warning about setup issues or announcing changes to end users.
# List of UIAlerts, which allows for specifying the message, category, and roles the
# message should be shown to. For example:
# from airflow.www.utils import UIAlert
#
# DASHBOARD_UIALERTS = [
#   UIAlert("Welcome to Airflow"),  # All users
#   UIAlert("Airflow update happening next week", roles=["User"]),  # Only users with the User role
#   # A flash message with html:
#   UIAlert('Visit <a href="http://airflow.apache.org">airflow.apache.org</a>', html=True),
# ]
#
DASHBOARD_UIALERTS: list[UIAlert] = []

# Prefix used to identify tables holding data moved during migration.
AIRFLOW_MOVED_TABLE_PREFIX = "_airflow_moved"

DAEMON_UMASK: str = conf.get("core", "daemon_umask", fallback="0o077")
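# For reference: the default "0o077" umask means files created by Airflow daemon processes are
# readable and writable only by the owning user (no group/other permissions).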


# AIP-44: internal_api (experimental)
# This feature is not complete yet, so we disable it by default.
_ENABLE_AIP_44 = os.environ.get("AIRFLOW_ENABLE_AIP_44", "false").lower() in {"true", "t", "yes", "y", "1"}