Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/settings.py: 65%


291 statements  

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import atexit
import functools
import json
import logging
import os
import sys
import traceback
import warnings
from importlib import metadata
from typing import TYPE_CHECKING, Any, Callable

import pluggy
from packaging.version import Version
from sqlalchemy import create_engine, exc, text
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.pool import NullPool

from airflow import __version__ as airflow_version, policies
from airflow.configuration import AIRFLOW_HOME, WEBSERVER_CONFIG, conf  # noqa: F401
from airflow.exceptions import AirflowInternalRuntimeError, RemovedInAirflow3Warning
from airflow.executors import executor_constants
from airflow.logging_config import configure_logging
from airflow.utils.orm_event_handlers import setup_event_handlers
from airflow.utils.sqlalchemy import is_sqlalchemy_v1
from airflow.utils.state import State
from airflow.utils.timezone import local_timezone, parse_timezone, utc

if TYPE_CHECKING:
    from sqlalchemy.engine import Engine
    from sqlalchemy.orm import Session as SASession

    from airflow.www.utils import UIAlert

log = logging.getLogger(__name__)

try:
    if (tz := conf.get_mandatory_value("core", "default_timezone")) != "system":
        TIMEZONE = parse_timezone(tz)
    else:
        TIMEZONE = local_timezone()
except Exception:
    TIMEZONE = utc

log.info("Configured default timezone %s", TIMEZONE)
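# The default timezone above comes from the [core] section of airflow.cfg: any value
# other than "system" is parsed as an IANA timezone name, "system" uses the host's
# local timezone, and parsing failures fall back to UTC. A rough, illustrative
# configuration entry (the timezone name is just an example) would be:
#
#   [core]
#   default_timezone = Europe/Amsterdam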

HEADER = "\n".join(
    [
        r"  ____________       _____________",
        r" ____    |__( )_________  __/__  /________      __",
        r"____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /",
        r"___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /",
        r" _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/",
    ]
)

LOGGING_LEVEL = logging.INFO

# the prefix to append to gunicorn worker processes after init
GUNICORN_WORKER_READY_PREFIX = "[ready] "

LOG_FORMAT = conf.get("logging", "log_format")
SIMPLE_LOG_FORMAT = conf.get("logging", "simple_log_format")

SQL_ALCHEMY_CONN: str | None = None
PLUGINS_FOLDER: str | None = None
LOGGING_CLASS_PATH: str | None = None
DONOT_MODIFY_HANDLERS: bool | None = None
DAGS_FOLDER: str = os.path.expanduser(conf.get_mandatory_value("core", "DAGS_FOLDER"))

engine: Engine
Session: Callable[..., SASession]

# The JSON library to use for DAG Serialization and De-Serialization
json = json

# Dictionary containing State and colors associated to each state to
# display on the Webserver
STATE_COLORS = {
    "deferred": "mediumpurple",
    "failed": "red",
    "queued": "gray",
    "removed": "lightgrey",
    "restarting": "violet",
    "running": "lime",
    "scheduled": "tan",
    "skipped": "hotpink",
    "success": "green",
    "up_for_reschedule": "turquoise",
    "up_for_retry": "gold",
    "upstream_failed": "orange",
    "shutdown": "blue",
}
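# The mapping above can be overridden by defining STATE_COLORS in
# airflow_local_settings.py; import_local_settings() (further below) copies it into
# this module and initialize() merges it into State.state_color. A rough,
# illustrative override (color choices are examples only):
#
#   # airflow_local_settings.py
#   STATE_COLORS = {
#       "queued": "darkgray",
#       "running": "#01FF70",
#       "success": "#2ECC40",
#       "failed": "firebrick",
#   }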

@functools.lru_cache(maxsize=None)
def _get_rich_console(file):
    # Delay imports until we need it
    import rich.console

    return rich.console.Console(file=file)


def custom_show_warning(message, category, filename, lineno, file=None, line=None):
    """Print rich and visible warnings."""
    # Delay imports until we need it
    from rich.markup import escape

    msg = f"[bold]{line}" if line else f"[bold][yellow]{filename}:{lineno}"
    msg += f" {category.__name__}[/bold]: {escape(str(message))}[/yellow]"
    write_console = _get_rich_console(file or sys.stderr)
    write_console.print(msg, soft_wrap=True)


def replace_showwarning(replacement):
    """Replace ``warnings.showwarning``, returning the original.

    This is useful since we want to "reset" the ``showwarning`` hook on exit to
    avoid lazy-loading issues. If a warning is emitted after Python cleaned up
    the import system, we would no longer be able to import ``rich``.
    """
    original = warnings.showwarning
    warnings.showwarning = replacement
    return original


original_show_warning = replace_showwarning(custom_show_warning)
atexit.register(functools.partial(replace_showwarning, original_show_warning))

POLICY_PLUGIN_MANAGER: Any = None  # type: ignore


def task_policy(task):
    return POLICY_PLUGIN_MANAGER.hook.task_policy(task=task)


def dag_policy(dag):
    return POLICY_PLUGIN_MANAGER.hook.dag_policy(dag=dag)


def task_instance_mutation_hook(task_instance):
    return POLICY_PLUGIN_MANAGER.hook.task_instance_mutation_hook(task_instance=task_instance)


task_instance_mutation_hook.is_noop = True  # type: ignore


def pod_mutation_hook(pod):
    return POLICY_PLUGIN_MANAGER.hook.pod_mutation_hook(pod=pod)


def get_airflow_context_vars(context):
    return POLICY_PLUGIN_MANAGER.hook.get_airflow_context_vars(context=context)


def get_dagbag_import_timeout(dag_file_path: str):
    return POLICY_PLUGIN_MANAGER.hook.get_dagbag_import_timeout(dag_file_path=dag_file_path)


def configure_policy_plugin_manager():
    global POLICY_PLUGIN_MANAGER

    POLICY_PLUGIN_MANAGER = pluggy.PluginManager(policies.local_settings_hookspec.project_name)
    POLICY_PLUGIN_MANAGER.add_hookspecs(policies)
    POLICY_PLUGIN_MANAGER.register(policies.DefaultPolicy)


def load_policy_plugins(pm: pluggy.PluginManager):
    # We can't log duration etc here, as logging hasn't yet been configured!
    pm.load_setuptools_entrypoints("airflow.policy")
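# Cluster policies can be provided either as pluggy plugins registered under the
# "airflow.policy" entrypoint (loaded above) or as plain functions defined in
# airflow_local_settings.py, which import_local_settings() (further below) wraps into
# a plugin. A rough, illustrative local-settings policy (the rule itself is an
# example, not a built-in):
#
#   # airflow_local_settings.py
#   from airflow.exceptions import AirflowClusterPolicyViolation
#
#   def task_policy(task):
#       if task.execution_timeout is None:
#           raise AirflowClusterPolicyViolation(
#               f"Task {task.task_id} must set an execution_timeout"
#           )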

def configure_vars():
    """Configure Global Variables from airflow.cfg."""
    global SQL_ALCHEMY_CONN
    global DAGS_FOLDER
    global PLUGINS_FOLDER
    global DONOT_MODIFY_HANDLERS
    SQL_ALCHEMY_CONN = conf.get("database", "SQL_ALCHEMY_CONN")

    DAGS_FOLDER = os.path.expanduser(conf.get("core", "DAGS_FOLDER"))

    PLUGINS_FOLDER = conf.get("core", "plugins_folder", fallback=os.path.join(AIRFLOW_HOME, "plugins"))

    # If donot_modify_handlers=True, we do not modify logging handlers in task_run command
    # If the flag is set to False, we remove all handlers from the root logger
    # and add all handlers from 'airflow.task' logger to the root Logger. This is done
    # to get all the logs from the print & log statements in the DAG files before a task is run
    # The handlers are restored after the task completes execution.
    DONOT_MODIFY_HANDLERS = conf.getboolean("logging", "donot_modify_handlers", fallback=False)


def _run_openlineage_runtime_check():
    """
    Ensure compatibility of OpenLineage provider package and Airflow version.

    Airflow 2.10.0 introduced some core changes (#39336) that made versions <= 1.8.0 of OpenLineage
    provider incompatible with future Airflow versions (>= 2.10.0).
    """
    ol_package = "apache-airflow-providers-openlineage"
    try:
        ol_version = metadata.version(ol_package)
    except metadata.PackageNotFoundError:
        return

    if ol_version and Version(ol_version) < Version("1.8.0.dev0"):
        raise RuntimeError(
            f"You have installed `{ol_package}` == `{ol_version}` that is not compatible with "
            f"`apache-airflow` == `{airflow_version}`. "
            f"For `apache-airflow` >= `2.10.0` you must use `{ol_package}` >= `1.8.0`."
        )


def run_providers_custom_runtime_checks():
    _run_openlineage_runtime_check()


class SkipDBTestsSession:
    """
    This fake session is used to skip DB tests when `_AIRFLOW_SKIP_DB_TESTS` is set.

    :meta private:
    """

    def __init__(self):
        raise AirflowInternalRuntimeError(
            "Your test accessed the DB but `_AIRFLOW_SKIP_DB_TESTS` is set.\n"
            "Either make sure your test does not use database or mark the test with `@pytest.mark.db_test`\n"
            "See https://github.com/apache/airflow/blob/main/contributing-docs/testing/unit_tests.rst#"
            "best-practices-for-db-tests on how "
            "to deal with it and consult examples."
        )

    def remove(*args, **kwargs):
        pass


class TracebackSession:
    """
    Session that throws error when you try to use it.

    Also stores stack at instantiation call site.

    :meta private:
    """

    def __init__(self):
        self.traceback = traceback.extract_stack()

    def __getattr__(self, item):
        raise RuntimeError(
            "TracebackSession object was used but internal API is enabled. "
            "You'll need to ensure you are making only RPC calls with this object. "
            "The stack list below will show where the TracebackSession object was created."
            + "\n".join(traceback.format_list(self.traceback))
        )

    def remove(*args, **kwargs):
        pass


def configure_orm(disable_connection_pool=False, pool_class=None):
    """Configure ORM using SQLAlchemy."""
    from airflow.utils.log.secrets_masker import mask_secret

    if (
        SQL_ALCHEMY_CONN
        and SQL_ALCHEMY_CONN.startswith("sqlite")
        and not SQL_ALCHEMY_CONN.startswith("sqlite:////")
        # In memory is not useful for production, but useful for writing tests against Airflow for extensions
        and SQL_ALCHEMY_CONN != "sqlite://"
    ):
        from airflow.exceptions import AirflowConfigException

        raise AirflowConfigException(
            f"Cannot use relative path: `{SQL_ALCHEMY_CONN}` to connect to sqlite. "
            "Please use absolute path such as `sqlite:////tmp/airflow.db`."
        )

    global Session
    global engine
    from airflow.api_internal.internal_api_call import InternalApiConfig

    if InternalApiConfig.get_use_internal_api():
        Session = TracebackSession
        engine = None
        return
    elif os.environ.get("_AIRFLOW_SKIP_DB_TESTS") == "true":
        # Skip DB initialization in unit tests, if DB tests are skipped
        Session = SkipDBTestsSession
        engine = None
        return
    log.debug("Setting up DB connection pool (PID %s)", os.getpid())
    engine_args = prepare_engine_args(disable_connection_pool, pool_class)

    if conf.has_option("database", "sql_alchemy_connect_args"):
        connect_args = conf.getimport("database", "sql_alchemy_connect_args")
    else:
        connect_args = {}

    engine = create_engine(SQL_ALCHEMY_CONN, connect_args=connect_args, **engine_args, future=True)

    mask_secret(engine.url.password)

    setup_event_handlers(engine)

    Session = scoped_session(
        sessionmaker(
            autocommit=False,
            autoflush=False,
            bind=engine,
            expire_on_commit=False,
        )
    )
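# After configure_orm() runs, the scoped Session above is the factory the rest of the
# code base uses for DB access, usually through airflow.utils.session. A minimal,
# illustrative sketch of that usage (not part of this module):
#
#   from sqlalchemy import text
#   from airflow.utils.session import create_session
#
#   with create_session() as session:
#       session.execute(text("SELECT 1"))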

DEFAULT_ENGINE_ARGS = {
    "postgresql": {
        "executemany_mode": "values_plus_batch",
        "executemany_values_page_size" if is_sqlalchemy_v1() else "insertmanyvalues_page_size": 10000,
        "executemany_batch_page_size": 2000,
    },
}


def prepare_engine_args(disable_connection_pool=False, pool_class=None):
    """Prepare SQLAlchemy engine args."""
    default_args = {}
    for dialect, default in DEFAULT_ENGINE_ARGS.items():
        if SQL_ALCHEMY_CONN.startswith(dialect):
            default_args = default.copy()
            break

    engine_args: dict = conf.getjson("database", "sql_alchemy_engine_args", fallback=default_args)  # type: ignore

    if pool_class:
        # Don't use separate settings for size etc, only those from sql_alchemy_engine_args
        engine_args["poolclass"] = pool_class
    elif disable_connection_pool or not conf.getboolean("database", "SQL_ALCHEMY_POOL_ENABLED"):
        engine_args["poolclass"] = NullPool
        log.debug("settings.prepare_engine_args(): Using NullPool")
    elif not SQL_ALCHEMY_CONN.startswith("sqlite"):
        # Pool size engine args not supported by sqlite.
        # If no config value is defined for the pool size, select a reasonable value.
        # 0 means no limit, which could lead to exceeding the Database connection limit.
        pool_size = conf.getint("database", "SQL_ALCHEMY_POOL_SIZE", fallback=5)

        # The maximum overflow size of the pool.
        # When the number of checked-out connections reaches the size set in pool_size,
        # additional connections will be returned up to this limit.
        # When those additional connections are returned to the pool, they are disconnected and discarded.
        # It follows then that the total number of simultaneous connections
        # the pool will allow is pool_size + max_overflow,
        # and the total number of "sleeping" connections the pool will allow is pool_size.
        # max_overflow can be set to -1 to indicate no overflow limit;
        # no limit will be placed on the total number
        # of concurrent connections. Defaults to 10.
        max_overflow = conf.getint("database", "SQL_ALCHEMY_MAX_OVERFLOW", fallback=10)

        # The DB server already has a value for wait_timeout (number of seconds after
        # which an idle sleeping connection should be killed). Since other DBs may
        # co-exist on the same server, SQLAlchemy should set its
        # pool_recycle to an equal or smaller value.
        pool_recycle = conf.getint("database", "SQL_ALCHEMY_POOL_RECYCLE", fallback=1800)

        # Check connection at the start of each connection pool checkout.
        # Typically, this is a simple statement like "SELECT 1", but may also make use
        # of some DBAPI-specific method to test the connection for liveness.
        # More information here:
        # https://docs.sqlalchemy.org/en/14/core/pooling.html#disconnect-handling-pessimistic
        pool_pre_ping = conf.getboolean("database", "SQL_ALCHEMY_POOL_PRE_PING", fallback=True)

        log.debug(
            "settings.prepare_engine_args(): Using pool settings. pool_size=%d, max_overflow=%d, "
            "pool_recycle=%d, pid=%d",
            pool_size,
            max_overflow,
            pool_recycle,
            os.getpid(),
        )
        engine_args["pool_size"] = pool_size
        engine_args["pool_recycle"] = pool_recycle
        engine_args["pool_pre_ping"] = pool_pre_ping
        engine_args["max_overflow"] = max_overflow

    # The default isolation level for MySQL (REPEATABLE READ) can introduce inconsistencies when
    # running multiple schedulers, as repeated queries on the same session may read from stale snapshots.
    # 'READ COMMITTED' is the default value for PostgreSQL.
    # More information here:
    # https://dev.mysql.com/doc/refman/8.0/en/innodb-transaction-isolation-levels.html

    if SQL_ALCHEMY_CONN.startswith("mysql"):
        engine_args["isolation_level"] = "READ COMMITTED"

    if is_sqlalchemy_v1():
        # Allow the user to specify an encoding for their DB otherwise default
        # to utf-8 so jobs & users with non-latin1 characters can still use us.
        # This parameter was removed in SQLAlchemy 2.x.
        engine_args["encoding"] = conf.get("database", "SQL_ENGINE_ENCODING", fallback="utf-8")

    return engine_args
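# The engine arguments computed above can be replaced via the
# [database] sql_alchemy_engine_args option, which conf.getjson() parses as a JSON
# document (the dedicated pool options above still fill in the pool keys afterwards
# when pooling is enabled). An illustrative airflow.cfg entry, with example values:
#
#   [database]
#   sql_alchemy_engine_args = {"pool_size": 10, "pool_recycle": 1800}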

def dispose_orm():
    """Properly close pooled database connections."""
    log.debug("Disposing DB connection pool (PID %s)", os.getpid())
    global engine
    global Session

    if Session is not None:  # type: ignore[truthy-function]
        Session.remove()
        Session = None
    if engine:
        engine.dispose()
        engine = None


def reconfigure_orm(disable_connection_pool=False, pool_class=None):
    """Properly close database connections and re-configure ORM."""
    dispose_orm()
    configure_orm(disable_connection_pool=disable_connection_pool, pool_class=pool_class)


def configure_adapters():
    """Register Adapters and DB Converters."""
    from pendulum import DateTime as Pendulum

    if SQL_ALCHEMY_CONN.startswith("sqlite"):
        from sqlite3 import register_adapter

        register_adapter(Pendulum, lambda val: val.isoformat(" "))

    if SQL_ALCHEMY_CONN.startswith("mysql"):
        try:
            import MySQLdb.converters

            MySQLdb.converters.conversions[Pendulum] = MySQLdb.converters.DateTime2literal
        except ImportError:
            pass
        try:
            import pymysql.converters

            pymysql.converters.conversions[Pendulum] = pymysql.converters.escape_datetime
        except ImportError:
            pass


def validate_session():
    """Validate ORM Session."""
    global engine

    worker_precheck = conf.getboolean("celery", "worker_precheck")
    if not worker_precheck:
        return True
    else:
        check_session = sessionmaker(bind=engine)
        session = check_session()
        try:
            session.execute(text("select 1"))
            conn_status = True
        except exc.DBAPIError as err:
            log.error(err)
            conn_status = False
        session.close()
        return conn_status
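# validate_session() only probes the database when Celery worker prechecks are
# enabled; otherwise it returns True without connecting. An illustrative airflow.cfg
# entry turning the precheck on:
#
#   [celery]
#   worker_precheck = True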

def configure_action_logging() -> None:
    """Any additional configuration (register callback) for airflow.utils.action_loggers module."""


def prepare_syspath():
    """Ensure certain subfolders of AIRFLOW_HOME are on the classpath."""
    if DAGS_FOLDER not in sys.path:
        sys.path.append(DAGS_FOLDER)

    # Add ./config/ for loading custom log parsers etc, or
    # airflow_local_settings etc.
    config_path = os.path.join(AIRFLOW_HOME, "config")
    if config_path not in sys.path:
        sys.path.append(config_path)

    if PLUGINS_FOLDER not in sys.path:
        sys.path.append(PLUGINS_FOLDER)


def get_session_lifetime_config():
    """Get session timeout configs and handle outdated configs gracefully."""
    session_lifetime_minutes = conf.get("webserver", "session_lifetime_minutes", fallback=None)
    session_lifetime_days = conf.get("webserver", "session_lifetime_days", fallback=None)
    uses_deprecated_lifetime_configs = session_lifetime_days or conf.get(
        "webserver", "force_log_out_after", fallback=None
    )

    minutes_per_day = 24 * 60
    default_lifetime_minutes = "43200"
    if uses_deprecated_lifetime_configs and session_lifetime_minutes == default_lifetime_minutes:
        warnings.warn(
            "`session_lifetime_days` option from `[webserver]` section has been "
            "renamed to `session_lifetime_minutes`. The new option allows to configure "
            "session lifetime in minutes. The `force_log_out_after` option has been removed "
            "from `[webserver]` section. Please update your configuration.",
            category=RemovedInAirflow3Warning,
            stacklevel=2,
        )
        if session_lifetime_days:
            session_lifetime_minutes = minutes_per_day * int(session_lifetime_days)

    if not session_lifetime_minutes:
        session_lifetime_days = 30
        session_lifetime_minutes = minutes_per_day * session_lifetime_days

    log.debug("User session lifetime is set to %s minutes.", session_lifetime_minutes)

    return int(session_lifetime_minutes)
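# The effective session lifetime is therefore controlled by
# [webserver] session_lifetime_minutes; the deprecated session_lifetime_days and
# force_log_out_after options only trigger the warning path above. An illustrative
# airflow.cfg entry for a 12-hour web UI session:
#
#   [webserver]
#   session_lifetime_minutes = 720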

def import_local_settings():
    """Import airflow_local_settings.py files to allow overriding any configs in settings.py file."""
    try:
        import airflow_local_settings
    except ModuleNotFoundError as e:
        if e.name == "airflow_local_settings":
            log.debug("No airflow_local_settings to import.", exc_info=True)
        else:
            log.critical(
                "Failed to import airflow_local_settings due to a transitive module not found error.",
                exc_info=True,
            )
            raise
    except ImportError:
        log.critical("Failed to import airflow_local_settings.", exc_info=True)
        raise
    else:
        if hasattr(airflow_local_settings, "__all__"):
            names = set(airflow_local_settings.__all__)
        else:
            names = {n for n in airflow_local_settings.__dict__ if not n.startswith("__")}

        if "policy" in names and "task_policy" not in names:
            warnings.warn(
                "Using `policy` in airflow_local_settings.py is deprecated. "
                "Please rename your `policy` to `task_policy`.",
                RemovedInAirflow3Warning,
                stacklevel=2,
            )
            setattr(airflow_local_settings, "task_policy", airflow_local_settings.policy)
            names.remove("policy")

        plugin_functions = policies.make_plugin_from_local_settings(
            POLICY_PLUGIN_MANAGER, airflow_local_settings, names
        )

        # If we have already handled a function by adding it to the plugin,
        # then don't clobber the global function
        for name in names - plugin_functions:
            globals()[name] = getattr(airflow_local_settings, name)

        if POLICY_PLUGIN_MANAGER.hook.task_instance_mutation_hook.get_hookimpls():
            task_instance_mutation_hook.is_noop = False

        log.info("Loaded airflow_local_settings from %s .", airflow_local_settings.__file__)

def initialize():
    """Initialize Airflow with all the settings from this file."""
    configure_vars()
    prepare_syspath()
    configure_policy_plugin_manager()
    # Load policy plugins _before_ importing airflow_local_settings, as Pluggy uses LIFO and we want anything
    # in airflow_local_settings to take precedence
    load_policy_plugins(POLICY_PLUGIN_MANAGER)
    import_local_settings()
    global LOGGING_CLASS_PATH
    LOGGING_CLASS_PATH = configure_logging()
    State.state_color.update(STATE_COLORS)

    configure_adapters()
    # The webservers import this file from models.py with the default settings.
    configure_orm()
    configure_action_logging()

    # Run any custom runtime checks that need to be executed for providers
    run_providers_custom_runtime_checks()

    # Ensure we close DB connections at scheduler and gunicorn worker terminations
    atexit.register(dispose_orm)

def is_usage_data_collection_enabled() -> bool:
    """Check if data collection is enabled."""
    return conf.getboolean("usage_data_collection", "enabled", fallback=True) and (
        os.getenv("SCARF_ANALYTICS", "").strip().lower() != "false"
    )
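# Usage data collection can be disabled from either knob read above, for example
# (illustrative configuration and environment settings):
#
#   [usage_data_collection]
#   enabled = False
#
# or by exporting SCARF_ANALYTICS=false in the environment before starting Airflow.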

# Const stuff

KILOBYTE = 1024
MEGABYTE = KILOBYTE * KILOBYTE
WEB_COLORS = {"LIGHTBLUE": "#4d9de0", "LIGHTORANGE": "#FF9933"}


# Updating serialized DAG can not be faster than a minimum interval to reduce database
# write rate.
MIN_SERIALIZED_DAG_UPDATE_INTERVAL = conf.getint("core", "min_serialized_dag_update_interval", fallback=30)

# If set to True, serialized DAGs are compressed before writing to the DB.
COMPRESS_SERIALIZED_DAGS = conf.getboolean("core", "compress_serialized_dags", fallback=False)

# Fetching serialized DAG can not be faster than a minimum interval to reduce database
# read rate. This config controls when your DAGs are updated in the Webserver
MIN_SERIALIZED_DAG_FETCH_INTERVAL = conf.getint("core", "min_serialized_dag_fetch_interval", fallback=10)

CAN_FORK = hasattr(os, "fork")

EXECUTE_TASKS_NEW_PYTHON_INTERPRETER = not CAN_FORK or conf.getboolean(
    "core",
    "execute_tasks_new_python_interpreter",
    fallback=False,
)

ALLOW_FUTURE_EXEC_DATES = conf.getboolean("scheduler", "allow_trigger_in_future", fallback=False)

# Whether or not to check each dagrun against defined SLAs
CHECK_SLAS = conf.getboolean("core", "check_slas", fallback=True)

USE_JOB_SCHEDULE = conf.getboolean("scheduler", "use_job_schedule", fallback=True)

# By default Airflow plugins are lazily-loaded (only loaded when required). Set it to False
# if you want to load plugins whenever 'airflow' is invoked via cli or loaded from module.
LAZY_LOAD_PLUGINS: bool = conf.getboolean("core", "lazy_load_plugins", fallback=True)

# By default Airflow providers are lazily-discovered (discovery and imports happen only when required).
# Set it to False, if you want to discover providers whenever 'airflow' is invoked via cli or
# loaded from module.
LAZY_LOAD_PROVIDERS: bool = conf.getboolean("core", "lazy_discover_providers", fallback=True)

# Determines if the executor utilizes Kubernetes
IS_K8S_OR_K8SCELERY_EXECUTOR = conf.get("core", "EXECUTOR") in {
    executor_constants.KUBERNETES_EXECUTOR,
    executor_constants.CELERY_KUBERNETES_EXECUTOR,
    executor_constants.LOCAL_KUBERNETES_EXECUTOR,
}

# Executors can set this to true to configure logging correctly for
# containerized executors.
IS_EXECUTOR_CONTAINER = bool(os.environ.get("AIRFLOW_IS_EXECUTOR_CONTAINER", ""))
IS_K8S_EXECUTOR_POD = bool(os.environ.get("AIRFLOW_IS_K8S_EXECUTOR_POD", ""))
"""Will be True if running in kubernetes executor pod."""

HIDE_SENSITIVE_VAR_CONN_FIELDS = conf.getboolean("core", "hide_sensitive_var_conn_fields")

# By default this is off, but is automatically configured on when running task
# instances
MASK_SECRETS_IN_LOGS = False

# Display alerts on the dashboard
# Useful for warning about setup issues or announcing changes to end users
# List of UIAlerts, which allows for specifying the message, category, and roles the
# message should be shown to. For example:
#   from airflow.www.utils import UIAlert
#
#   DASHBOARD_UIALERTS = [
#       UIAlert("Welcome to Airflow"),  # All users
#       UIAlert("Airflow update happening next week", roles=["User"]),  # Only users with the User role
#       # A flash message with html:
#       UIAlert('Visit <a href="http://airflow.apache.org">airflow.apache.org</a>', html=True),
#   ]
#
DASHBOARD_UIALERTS: list[UIAlert] = []

# Prefix used to identify tables holding data moved during migration.
AIRFLOW_MOVED_TABLE_PREFIX = "_airflow_moved"

DAEMON_UMASK: str = conf.get("core", "daemon_umask", fallback="0o077")

# AIP-44: internal_api (experimental)
# This feature is not complete yet, so we disable it by default.
_ENABLE_AIP_44: bool = os.environ.get("AIRFLOW_ENABLE_AIP_44", "false").lower() in {
    "true",
    "t",
    "yes",
    "y",
    "1",
}