Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/settings.py: 63%

244 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-25 06:11 +0000

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18from __future__ import annotations 

19 

20import atexit 

21import functools 

22import json 

23import logging 

24import os 

25import sys 

26import warnings 

27from typing import TYPE_CHECKING, Callable 

28 

29import pendulum 

30import sqlalchemy 

31from sqlalchemy import create_engine, exc 

32from sqlalchemy.engine import Engine 

33from sqlalchemy.orm import scoped_session, sessionmaker 

34from sqlalchemy.orm.session import Session as SASession 

35from sqlalchemy.pool import NullPool 

36 

37from airflow.configuration import AIRFLOW_HOME, WEBSERVER_CONFIG, conf # NOQA F401 

38from airflow.exceptions import RemovedInAirflow3Warning 

39from airflow.executors import executor_constants 

40from airflow.logging_config import configure_logging 

41from airflow.utils.orm_event_handlers import setup_event_handlers 

42 

43if TYPE_CHECKING: 

44 from airflow.www.utils import UIAlert 

45 

46log = logging.getLogger(__name__) 

47 

48 

49TIMEZONE = pendulum.tz.timezone("UTC") 

50try: 

51 tz = conf.get_mandatory_value("core", "default_timezone") 

52 if tz == "system": 

53 TIMEZONE = pendulum.tz.local_timezone() 

54 else: 

55 TIMEZONE = pendulum.tz.timezone(tz) 

56except Exception: 

57 pass 

58log.info("Configured default timezone %s", TIMEZONE) 

59 

60 

61HEADER = "\n".join( 

62 [ 

63 r" ____________ _____________", 

64 r" ____ |__( )_________ __/__ /________ __", 

65 r"____ /| |_ /__ ___/_ /_ __ /_ __ \_ | /| / /", 

66 r"___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ /", 

67 r" _/_/ |_/_/ /_/ /_/ /_/ \____/____/|__/", 

68 ] 

69) 

70 

71LOGGING_LEVEL = logging.INFO 

72 

73# the prefix to append to gunicorn worker processes after init 

74GUNICORN_WORKER_READY_PREFIX = "[ready] " 

75 

76LOG_FORMAT = conf.get("logging", "log_format") 

77SIMPLE_LOG_FORMAT = conf.get("logging", "simple_log_format") 

78 

79SQL_ALCHEMY_CONN: str | None = None 

80PLUGINS_FOLDER: str | None = None 

81LOGGING_CLASS_PATH: str | None = None 

82DONOT_MODIFY_HANDLERS: bool | None = None 

83DAGS_FOLDER: str = os.path.expanduser(conf.get_mandatory_value("core", "DAGS_FOLDER")) 

84 

85engine: Engine 

86Session: Callable[..., SASession] 

87 

88# The JSON library to use for DAG Serialization and De-Serialization 

89json = json 

90 

91# Dictionary containing State and colors associated to each state to 

92# display on the Webserver 

93STATE_COLORS = { 

94 "deferred": "mediumpurple", 

95 "failed": "red", 

96 "queued": "gray", 

97 "removed": "lightgrey", 

98 "restarting": "violet", 

99 "running": "lime", 

100 "scheduled": "tan", 

101 "shutdown": "blue", 

102 "skipped": "hotpink", 

103 "success": "green", 

104 "up_for_reschedule": "turquoise", 

105 "up_for_retry": "gold", 

106 "upstream_failed": "orange", 

107} 

108 

109 

110@functools.lru_cache(maxsize=None) 

111def _get_rich_console(file): 

112 # Delay imports until we need it 

113 import rich.console 

114 

115 return rich.console.Console(file=file) 

116 

117 

118def custom_show_warning(message, category, filename, lineno, file=None, line=None): 

119 """Custom function to print rich and visible warnings.""" 

120 # Delay imports until we need it 

121 from rich.markup import escape 

122 

123 msg = f"[bold]{line}" if line else f"[bold][yellow]{filename}:{lineno}" 

124 msg += f" {category.__name__}[/bold]: {escape(str(message))}[/yellow]" 

125 write_console = _get_rich_console(file or sys.stderr) 

126 write_console.print(msg, soft_wrap=True) 

127 

128 

129def replace_showwarning(replacement): 

130 """Replace ``warnings.showwarning``, returning the original. 

131 

132 This is useful since we want to "reset" the ``showwarning`` hook on exit to 

133 avoid lazy-loading issues. If a warning is emitted after Python cleaned up 

134 the import system, we would no longer be able to import ``rich``. 

135 """ 

136 original = warnings.showwarning 

137 warnings.showwarning = replacement 

138 return original 

139 

140 

141original_show_warning = replace_showwarning(custom_show_warning) 

142atexit.register(functools.partial(replace_showwarning, original_show_warning)) 

143 

144 

145def task_policy(task) -> None: 

146 """ 

147 This policy setting allows altering tasks after they are loaded in the DagBag. 

148 

149 It allows administrator to rewire some task's parameters. 

150 Alternatively you can raise ``AirflowClusterPolicyViolation`` exception 

151 to stop DAG from being executed. 

152 

153 To define policy, add a ``airflow_local_settings`` module 

154 to your PYTHONPATH that defines this ``task_policy`` function. 

155 

156 Here are a few examples of how this can be useful: 

157 

158 * You could enforce a specific queue (say the ``spark`` queue) 

159 for tasks using the ``SparkOperator`` to make sure that these 

160 tasks get wired to the right workers 

161 * You could enforce a task timeout policy, making sure that no tasks run 

162 for more than 48 hours 

163 

164 :param task: task to be mutated 

165 """ 

166 

167 

168def dag_policy(dag) -> None: 

169 """ 

170 This policy setting allows altering DAGs after they are loaded in the DagBag. 

171 

172 It allows administrator to rewire some DAG's parameters. 

173 Alternatively you can raise ``AirflowClusterPolicyViolation`` exception 

174 to stop DAG from being executed. 

175 

176 To define policy, add a ``airflow_local_settings`` module 

177 to your PYTHONPATH that defines this ``dag_policy`` function. 

178 

179 Here are a few examples of how this can be useful: 

180 

181 * You could enforce default user for DAGs 

182 * Check if every DAG has configured tags 

183 

184 :param dag: dag to be mutated 

185 """ 

186 

187 

188def task_instance_mutation_hook(task_instance): 

189 """ 

190 This setting allows altering task instances before being queued by the Airflow scheduler. 

191 

192 To define task_instance_mutation_hook, add a ``airflow_local_settings`` module 

193 to your PYTHONPATH that defines this ``task_instance_mutation_hook`` function. 

194 

195 This could be used, for instance, to modify the task instance during retries. 

196 

197 :param task_instance: task instance to be mutated 

198 """ 

199 

200 

201task_instance_mutation_hook.is_noop = True # type: ignore 

202 

203 

204def pod_mutation_hook(pod): 

205 """ 

206 Mutate pod before scheduling. 

207 

208 This setting allows altering ``kubernetes.client.models.V1Pod`` object 

209 before they are passed to the Kubernetes client for scheduling. 

210 

211 To define a pod mutation hook, add a ``airflow_local_settings`` module 

212 to your PYTHONPATH that defines this ``pod_mutation_hook`` function. 

213 It receives a ``Pod`` object and can alter it where needed. 

214 

215 This could be used, for instance, to add sidecar or init containers 

216 to every worker pod launched by KubernetesExecutor or KubernetesPodOperator. 

217 """ 

218 

219 

220def get_airflow_context_vars(context): 

221 """ 

222 This setting allows getting the airflow context vars, which are key value pairs. 

223 They are then injected to default airflow context vars, which in the end are 

224 available as environment variables when running tasks 

225 dag_id, task_id, execution_date, dag_run_id, try_number are reserved keys. 

226 To define it, add a ``airflow_local_settings`` module 

227 to your PYTHONPATH that defines this ``get_airflow_context_vars`` function. 

228 

229 :param context: The context for the task_instance of interest. 

230 """ 

231 return {} 

232 

233 

234def get_dagbag_import_timeout(dag_file_path: str) -> int | float: 

235 """ 

236 This setting allows for dynamic control of the DAG file parsing timeout based on the DAG file path. 

237 

238 It is useful when there are a few DAG files requiring longer parsing times, while others do not. 

239 You can control them separately instead of having one value for all DAG files. 

240 

241 If the return value is less than or equal to 0, it means no timeout during the DAG parsing. 

242 """ 

243 return conf.getfloat("core", "DAGBAG_IMPORT_TIMEOUT") 

244 

245 

246def configure_vars(): 

247 """Configure Global Variables from airflow.cfg.""" 

248 global SQL_ALCHEMY_CONN 

249 global DAGS_FOLDER 

250 global PLUGINS_FOLDER 

251 global DONOT_MODIFY_HANDLERS 

252 SQL_ALCHEMY_CONN = conf.get("database", "SQL_ALCHEMY_CONN") 

253 DAGS_FOLDER = os.path.expanduser(conf.get("core", "DAGS_FOLDER")) 

254 

255 PLUGINS_FOLDER = conf.get("core", "plugins_folder", fallback=os.path.join(AIRFLOW_HOME, "plugins")) 

256 

257 # If donot_modify_handlers=True, we do not modify logging handlers in task_run command 

258 # If the flag is set to False, we remove all handlers from the root logger 

259 # and add all handlers from 'airflow.task' logger to the root Logger. This is done 

260 # to get all the logs from the print & log statements in the DAG files before a task is run 

261 # The handlers are restored after the task completes execution. 

262 DONOT_MODIFY_HANDLERS = conf.getboolean("logging", "donot_modify_handlers", fallback=False) 

263 

264 

265def configure_orm(disable_connection_pool=False, pool_class=None): 

266 """Configure ORM using SQLAlchemy.""" 

267 from airflow.utils.log.secrets_masker import mask_secret 

268 

269 log.debug("Setting up DB connection pool (PID %s)", os.getpid()) 

270 global engine 

271 global Session 

272 engine_args = prepare_engine_args(disable_connection_pool, pool_class) 

273 

274 if conf.has_option("database", "sql_alchemy_connect_args"): 

275 connect_args = conf.getimport("database", "sql_alchemy_connect_args") 

276 else: 

277 connect_args = {} 

278 

279 engine = create_engine(SQL_ALCHEMY_CONN, connect_args=connect_args, **engine_args) 

280 

281 mask_secret(engine.url.password) 

282 

283 setup_event_handlers(engine) 

284 

285 Session = scoped_session( 

286 sessionmaker( 

287 autocommit=False, 

288 autoflush=False, 

289 bind=engine, 

290 expire_on_commit=False, 

291 ) 

292 ) 

293 if engine.dialect.name == "mssql": 

294 session = Session() 

295 try: 

296 result = session.execute( 

297 sqlalchemy.text( 

298 "SELECT is_read_committed_snapshot_on FROM sys.databases WHERE name=:database_name" 

299 ), 

300 params={"database_name": engine.url.database}, 

301 ) 

302 data = result.fetchone()[0] 

303 if data != 1: 

304 log.critical("MSSQL database MUST have READ_COMMITTED_SNAPSHOT enabled.") 

305 log.critical("The database %s has it disabled.", engine.url.database) 

306 log.critical("This will cause random deadlocks, Refusing to start.") 

307 log.critical( 

308 "See https://airflow.apache.org/docs/apache-airflow/stable/howto/" 

309 "set-up-database.html#setting-up-a-mssql-database" 

310 ) 

311 raise Exception("MSSQL database MUST have READ_COMMITTED_SNAPSHOT enabled.") 

312 finally: 

313 session.close() 

314 

315 

316DEFAULT_ENGINE_ARGS = { 

317 "postgresql": { 

318 "executemany_mode": "values", 

319 "executemany_values_page_size": 10000, 

320 "executemany_batch_page_size": 2000, 

321 }, 

322} 

323 

324 

325def prepare_engine_args(disable_connection_pool=False, pool_class=None): 

326 """Prepare SQLAlchemy engine args.""" 

327 default_args = {} 

328 for dialect, default in DEFAULT_ENGINE_ARGS.items(): 

329 if SQL_ALCHEMY_CONN.startswith(dialect): 

330 default_args = default.copy() 

331 break 

332 

333 engine_args: dict = conf.getjson( 

334 "database", "sql_alchemy_engine_args", fallback=default_args 

335 ) # type: ignore 

336 

337 if pool_class: 

338 # Don't use separate settings for size etc, only those from sql_alchemy_engine_args 

339 engine_args["poolclass"] = pool_class 

340 elif disable_connection_pool or not conf.getboolean("database", "SQL_ALCHEMY_POOL_ENABLED"): 

341 engine_args["poolclass"] = NullPool 

342 log.debug("settings.prepare_engine_args(): Using NullPool") 

343 elif not SQL_ALCHEMY_CONN.startswith("sqlite"): 

344 # Pool size engine args not supported by sqlite. 

345 # If no config value is defined for the pool size, select a reasonable value. 

346 # 0 means no limit, which could lead to exceeding the Database connection limit. 

347 pool_size = conf.getint("database", "SQL_ALCHEMY_POOL_SIZE", fallback=5) 

348 

349 # The maximum overflow size of the pool. 

350 # When the number of checked-out connections reaches the size set in pool_size, 

351 # additional connections will be returned up to this limit. 

352 # When those additional connections are returned to the pool, they are disconnected and discarded. 

353 # It follows then that the total number of simultaneous connections 

354 # the pool will allow is pool_size + max_overflow, 

355 # and the total number of "sleeping" connections the pool will allow is pool_size. 

356 # max_overflow can be set to -1 to indicate no overflow limit; 

357 # no limit will be placed on the total number 

358 # of concurrent connections. Defaults to 10. 

359 max_overflow = conf.getint("database", "SQL_ALCHEMY_MAX_OVERFLOW", fallback=10) 

360 

361 # The DB server already has a value for wait_timeout (number of seconds after 

362 # which an idle sleeping connection should be killed). Since other DBs may 

363 # co-exist on the same server, SQLAlchemy should set its 

364 # pool_recycle to an equal or smaller value. 

365 pool_recycle = conf.getint("database", "SQL_ALCHEMY_POOL_RECYCLE", fallback=1800) 

366 

367 # Check connection at the start of each connection pool checkout. 

368 # Typically, this is a simple statement like "SELECT 1", but may also make use 

369 # of some DBAPI-specific method to test the connection for liveness. 

370 # More information here: 

371 # https://docs.sqlalchemy.org/en/13/core/pooling.html#disconnect-handling-pessimistic 

372 pool_pre_ping = conf.getboolean("database", "SQL_ALCHEMY_POOL_PRE_PING", fallback=True) 

373 

374 log.debug( 

375 "settings.prepare_engine_args(): Using pool settings. pool_size=%d, max_overflow=%d, " 

376 "pool_recycle=%d, pid=%d", 

377 pool_size, 

378 max_overflow, 

379 pool_recycle, 

380 os.getpid(), 

381 ) 

382 engine_args["pool_size"] = pool_size 

383 engine_args["pool_recycle"] = pool_recycle 

384 engine_args["pool_pre_ping"] = pool_pre_ping 

385 engine_args["max_overflow"] = max_overflow 

386 

387 # The default isolation level for MySQL (REPEATABLE READ) can introduce inconsistencies when 

388 # running multiple schedulers, as repeated queries on the same session may read from stale snapshots. 

389 # 'READ COMMITTED' is the default value for PostgreSQL. 

390 # More information here: 

391 # https://dev.mysql.com/doc/refman/8.0/en/innodb-transaction-isolation-levels.html" 

392 

393 # Similarly MSSQL default isolation level should be set to READ COMMITTED. 

394 # We also make sure that READ_COMMITTED_SNAPSHOT option is on, in order to avoid deadlocks when 

395 # Select queries are running. This is by default enforced during init/upgrade. More information: 

396 # https://docs.microsoft.com/en-us/sql/t-sql/statements/set-transaction-isolation-level-transact-sql 

397 

398 if SQL_ALCHEMY_CONN.startswith(("mysql", "mssql")): 

399 engine_args["isolation_level"] = "READ COMMITTED" 

400 

401 # Allow the user to specify an encoding for their DB otherwise default 

402 # to utf-8 so jobs & users with non-latin1 characters can still use us. 

403 engine_args["encoding"] = conf.get("database", "SQL_ENGINE_ENCODING", fallback="utf-8") 

404 

405 return engine_args 

406 

407 

408def dispose_orm(): 

409 """Properly close pooled database connections.""" 

410 log.debug("Disposing DB connection pool (PID %s)", os.getpid()) 

411 global engine 

412 global Session 

413 

414 if Session: 

415 Session.remove() 

416 Session = None 

417 if engine: 

418 engine.dispose() 

419 engine = None 

420 

421 

422def reconfigure_orm(disable_connection_pool=False, pool_class=None): 

423 """Properly close database connections and re-configure ORM.""" 

424 dispose_orm() 

425 configure_orm(disable_connection_pool=disable_connection_pool, pool_class=pool_class) 

426 

427 

428def configure_adapters(): 

429 """Register Adapters and DB Converters.""" 

430 from pendulum import DateTime as Pendulum 

431 

432 if SQL_ALCHEMY_CONN.startswith("sqlite"): 

433 from sqlite3 import register_adapter 

434 

435 register_adapter(Pendulum, lambda val: val.isoformat(" ")) 

436 

437 if SQL_ALCHEMY_CONN.startswith("mysql"): 

438 try: 

439 import MySQLdb.converters 

440 

441 MySQLdb.converters.conversions[Pendulum] = MySQLdb.converters.DateTime2literal 

442 except ImportError: 

443 pass 

444 try: 

445 import pymysql.converters 

446 

447 pymysql.converters.conversions[Pendulum] = pymysql.converters.escape_datetime 

448 except ImportError: 

449 pass 

450 

451 

452def validate_session(): 

453 """Validate ORM Session.""" 

454 global engine 

455 

456 worker_precheck = conf.getboolean("celery", "worker_precheck", fallback=False) 

457 if not worker_precheck: 

458 return True 

459 else: 

460 check_session = sessionmaker(bind=engine) 

461 session = check_session() 

462 try: 

463 session.execute("select 1") 

464 conn_status = True 

465 except exc.DBAPIError as err: 

466 log.error(err) 

467 conn_status = False 

468 session.close() 

469 return conn_status 

470 

471 

472def configure_action_logging() -> None: 

473 """Any additional configuration (register callback) for airflow.utils.action_loggers module.""" 

474 

475 

476def prepare_syspath(): 

477 """Ensure certain subfolders of AIRFLOW_HOME are on the classpath.""" 

478 if DAGS_FOLDER not in sys.path: 

479 sys.path.append(DAGS_FOLDER) 

480 

481 # Add ./config/ for loading custom log parsers etc, or 

482 # airflow_local_settings etc. 

483 config_path = os.path.join(AIRFLOW_HOME, "config") 

484 if config_path not in sys.path: 

485 sys.path.append(config_path) 

486 

487 if PLUGINS_FOLDER not in sys.path: 

488 sys.path.append(PLUGINS_FOLDER) 

489 

490 

491def get_session_lifetime_config(): 

492 """Gets session timeout configs and handles outdated configs gracefully.""" 

493 session_lifetime_minutes = conf.get("webserver", "session_lifetime_minutes", fallback=None) 

494 session_lifetime_days = conf.get("webserver", "session_lifetime_days", fallback=None) 

495 uses_deprecated_lifetime_configs = session_lifetime_days or conf.get( 

496 "webserver", "force_log_out_after", fallback=None 

497 ) 

498 

499 minutes_per_day = 24 * 60 

500 default_lifetime_minutes = "43200" 

501 if uses_deprecated_lifetime_configs and session_lifetime_minutes == default_lifetime_minutes: 

502 warnings.warn( 

503 "`session_lifetime_days` option from `[webserver]` section has been " 

504 "renamed to `session_lifetime_minutes`. The new option allows to configure " 

505 "session lifetime in minutes. The `force_log_out_after` option has been removed " 

506 "from `[webserver]` section. Please update your configuration.", 

507 category=RemovedInAirflow3Warning, 

508 ) 

509 if session_lifetime_days: 

510 session_lifetime_minutes = minutes_per_day * int(session_lifetime_days) 

511 

512 if not session_lifetime_minutes: 

513 session_lifetime_days = 30 

514 session_lifetime_minutes = minutes_per_day * session_lifetime_days 

515 

516 logging.debug("User session lifetime is set to %s minutes.", session_lifetime_minutes) 

517 

518 return int(session_lifetime_minutes) 

519 

520 

521def import_local_settings(): 

522 """Import airflow_local_settings.py files to allow overriding any configs in settings.py file.""" 

523 try: 

524 import airflow_local_settings 

525 

526 if hasattr(airflow_local_settings, "__all__"): 

527 for i in airflow_local_settings.__all__: 

528 globals()[i] = getattr(airflow_local_settings, i) 

529 else: 

530 for k, v in airflow_local_settings.__dict__.items(): 

531 if not k.startswith("__"): 

532 globals()[k] = v 

533 

534 # TODO: Remove once deprecated 

535 if "policy" in globals() and "task_policy" not in globals(): 

536 warnings.warn( 

537 "Using `policy` in airflow_local_settings.py is deprecated. " 

538 "Please rename your `policy` to `task_policy`.", 

539 DeprecationWarning, 

540 stacklevel=2, 

541 ) 

542 globals()["task_policy"] = globals()["policy"] 

543 del globals()["policy"] 

544 

545 if not hasattr(task_instance_mutation_hook, "is_noop"): 

546 task_instance_mutation_hook.is_noop = False 

547 

548 log.info("Loaded airflow_local_settings from %s .", airflow_local_settings.__file__) 

549 except ModuleNotFoundError as e: 

550 if e.name == "airflow_local_settings": 

551 log.debug("No airflow_local_settings to import.", exc_info=True) 

552 else: 

553 log.critical( 

554 "Failed to import airflow_local_settings due to a transitive module not found error.", 

555 exc_info=True, 

556 ) 

557 raise 

558 except ImportError: 

559 log.critical("Failed to import airflow_local_settings.", exc_info=True) 

560 raise 

561 

562 

563def initialize(): 

564 """Initialize Airflow with all the settings from this file.""" 

565 configure_vars() 

566 prepare_syspath() 

567 import_local_settings() 

568 global LOGGING_CLASS_PATH 

569 LOGGING_CLASS_PATH = configure_logging() 

570 configure_adapters() 

571 # The webservers import this file from models.py with the default settings. 

572 configure_orm() 

573 configure_action_logging() 

574 

575 # Ensure we close DB connections at scheduler and gunicorn worker terminations 

576 atexit.register(dispose_orm) 

577 

578 

579# Const stuff 

580 

581KILOBYTE = 1024 

582MEGABYTE = KILOBYTE * KILOBYTE 

583WEB_COLORS = {"LIGHTBLUE": "#4d9de0", "LIGHTORANGE": "#FF9933"} 

584 

585 

586# Updating serialized DAG can not be faster than a minimum interval to reduce database 

587# write rate. 

588MIN_SERIALIZED_DAG_UPDATE_INTERVAL = conf.getint("core", "min_serialized_dag_update_interval", fallback=30) 

589 

590# If set to True, serialized DAGs is compressed before writing to DB, 

591COMPRESS_SERIALIZED_DAGS = conf.getboolean("core", "compress_serialized_dags", fallback=False) 

592 

593# Fetching serialized DAG can not be faster than a minimum interval to reduce database 

594# read rate. This config controls when your DAGs are updated in the Webserver 

595MIN_SERIALIZED_DAG_FETCH_INTERVAL = conf.getint("core", "min_serialized_dag_fetch_interval", fallback=10) 

596 

597CAN_FORK = hasattr(os, "fork") 

598 

599EXECUTE_TASKS_NEW_PYTHON_INTERPRETER = not CAN_FORK or conf.getboolean( 

600 "core", 

601 "execute_tasks_new_python_interpreter", 

602 fallback=False, 

603) 

604 

605ALLOW_FUTURE_EXEC_DATES = conf.getboolean("scheduler", "allow_trigger_in_future", fallback=False) 

606 

607# Whether or not to check each dagrun against defined SLAs 

608CHECK_SLAS = conf.getboolean("core", "check_slas", fallback=True) 

609 

610USE_JOB_SCHEDULE = conf.getboolean("scheduler", "use_job_schedule", fallback=True) 

611 

612# By default Airflow plugins are lazily-loaded (only loaded when required). Set it to False, 

613# if you want to load plugins whenever 'airflow' is invoked via cli or loaded from module. 

614LAZY_LOAD_PLUGINS = conf.getboolean("core", "lazy_load_plugins", fallback=True) 

615 

616# By default Airflow providers are lazily-discovered (discovery and imports happen only when required). 

617# Set it to False, if you want to discover providers whenever 'airflow' is invoked via cli or 

618# loaded from module. 

619LAZY_LOAD_PROVIDERS = conf.getboolean("core", "lazy_discover_providers", fallback=True) 

620 

621# Determines if the executor utilizes Kubernetes 

622IS_K8S_OR_K8SCELERY_EXECUTOR = conf.get("core", "EXECUTOR") in { 

623 executor_constants.KUBERNETES_EXECUTOR, 

624 executor_constants.CELERY_KUBERNETES_EXECUTOR, 

625 executor_constants.LOCAL_KUBERNETES_EXECUTOR, 

626} 

627 

628HIDE_SENSITIVE_VAR_CONN_FIELDS = conf.getboolean("core", "hide_sensitive_var_conn_fields") 

629 

630# By default this is off, but is automatically configured on when running task 

631# instances 

632MASK_SECRETS_IN_LOGS = False 

633 

634# Display alerts on the dashboard 

635# Useful for warning about setup issues or announcing changes to end users 

636# List of UIAlerts, which allows for specifying the message, category, and roles the 

637# message should be shown to. For example: 

638# from airflow.www.utils import UIAlert 

639# 

640# DASHBOARD_UIALERTS = [ 

641# UIAlert("Welcome to Airflow"), # All users 

642# UIAlert("Airflow update happening next week", roles=["User"]), # Only users with the User role 

643# # A flash message with html: 

644# UIAlert('Visit <a href="http://airflow.apache.org">airflow.apache.org</a>', html=True), 

645# ] 

646# 

647DASHBOARD_UIALERTS: list[UIAlert] = [] 

648 

649# Prefix used to identify tables holding data moved during migration. 

650AIRFLOW_MOVED_TABLE_PREFIX = "_airflow_moved" 

651 

652DAEMON_UMASK: str = conf.get("core", "daemon_umask", fallback="0o077")