Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/airflow/settings.py: 64%

264 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:35 +0000

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18from __future__ import annotations 

19 

20import atexit 

21import functools 

22import json 

23import logging 

24import os 

25import sys 

26import warnings 

27from typing import TYPE_CHECKING, Any, Callable 

28 

29import pendulum 

30import pluggy 

31import sqlalchemy 

32from sqlalchemy import create_engine, exc, text 

33from sqlalchemy.engine import Engine 

34from sqlalchemy.orm import Session as SASession, scoped_session, sessionmaker 

35from sqlalchemy.pool import NullPool 

36 

37from airflow import policies 

38from airflow.configuration import AIRFLOW_HOME, WEBSERVER_CONFIG, conf # NOQA F401 

39from airflow.exceptions import RemovedInAirflow3Warning 

40from airflow.executors import executor_constants 

41from airflow.logging_config import configure_logging 

42from airflow.utils.orm_event_handlers import setup_event_handlers 

43from airflow.utils.state import State 

44 

45if TYPE_CHECKING: 

46 from airflow.www.utils import UIAlert 

47 

log = logging.getLogger(__name__)


# Default timezone. Starts as UTC and is replaced by the ``[core] default_timezone``
# option when that option can be read and resolved; the special value "system"
# selects the local timezone reported by pendulum.
TIMEZONE = pendulum.tz.timezone("UTC")
try:
    tz = conf.get_mandatory_value("core", "default_timezone")
    if tz == "system":
        TIMEZONE = pendulum.tz.local_timezone()
    else:
        TIMEZONE = pendulum.tz.timezone(tz)
except Exception:
    # Still best effort (UTC is kept), but no longer silent: a mistyped timezone
    # name would otherwise go unnoticed.
    log.warning(
        "Could not apply [core] default_timezone; keeping %s",
        TIMEZONE,
        exc_info=True,
    )
log.info("Configured default timezone %s", TIMEZONE)

61 

62 

63HEADER = "\n".join( 

64 [ 

65 r" ____________ _____________", 

66 r" ____ |__( )_________ __/__ /________ __", 

67 r"____ /| |_ /__ ___/_ /_ __ /_ __ \_ | /| / /", 

68 r"___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ /", 

69 r" _/_/ |_/_/ /_/ /_/ /_/ \____/____/|__/", 

70 ] 

71) 

72 

LOGGING_LEVEL = logging.INFO

# Prefix applied to gunicorn worker processes once they have finished init.
GUNICORN_WORKER_READY_PREFIX = "[ready] "

# Log line formats, read from the ``[logging]`` section.
LOG_FORMAT = conf.get("logging", "log_format")
SIMPLE_LOG_FORMAT = conf.get("logging", "simple_log_format")

# Placeholders; the first, second and fourth are assigned by ``configure_vars()``,
# ``LOGGING_CLASS_PATH`` by ``initialize()``.
SQL_ALCHEMY_CONN: str | None = None
PLUGINS_FOLDER: str | None = None
LOGGING_CLASS_PATH: str | None = None
DONOT_MODIFY_HANDLERS: bool | None = None
DAGS_FOLDER: str = os.path.expanduser(conf.get_mandatory_value("core", "DAGS_FOLDER"))

# Declared here, bound by ``configure_orm()``.
engine: Engine
Session: Callable[..., SASession]

# JSON library used for DAG serialization and de-serialization.
json = json

92 

# Mapping of state name -> colour used to display that state on the Webserver.
STATE_COLORS = dict(
    deferred="mediumpurple",
    failed="red",
    queued="gray",
    removed="lightgrey",
    restarting="violet",
    running="lime",
    scheduled="tan",
    shutdown="blue",
    skipped="hotpink",
    success="green",
    up_for_reschedule="turquoise",
    up_for_retry="gold",
    upstream_failed="orange",
)

110 

111 

@functools.lru_cache(maxsize=None)
def _get_rich_console(file):
    """Return a ``rich`` console writing to ``file``; one console is cached per ``file``."""
    # rich is imported lazily, only once a console is actually requested.
    from rich.console import Console

    return Console(file=file)

118 

119 

def custom_show_warning(message, category, filename, lineno, file=None, line=None):
    """Print a warning through ``rich`` so it is clearly visible.

    The parameters follow the ``warnings.showwarning`` hook signature.

    :param message: the warning; ``str(message)`` is printed.
    :param category: warning class; its ``__name__`` is printed.
    :param filename: file the warning was raised from (used when ``line`` is falsy).
    :param lineno: line number the warning was raised from (used when ``line`` is falsy).
    :param file: stream to print to; ``sys.stderr`` is used when falsy.
    :param line: optional source text; when given it replaces the ``filename:lineno`` prefix.
    """
    # Delay imports until we need it
    from rich.markup import escape

    # Escape every externally supplied fragment: a "[" in a path or in source
    # text would otherwise be parsed as console markup.
    location = escape(str(line)) if line else f"{escape(str(filename))}:{lineno}"
    # [yellow] is opened on both paths so the trailing [/yellow] always has a
    # matching opening tag (previously it was missing when ``line`` was given).
    msg = f"[bold][yellow]{location} {category.__name__}[/bold]: {escape(str(message))}[/yellow]"
    write_console = _get_rich_console(file or sys.stderr)
    write_console.print(msg, soft_wrap=True)

129 

130 

def replace_showwarning(replacement):
    """Install ``replacement`` as ``warnings.showwarning`` and hand back the previous hook.

    Returning the previous hook lets the caller restore it on exit. That matters
    because a warning emitted after Python has cleaned up the import system could
    no longer import ``rich``.
    """
    original, warnings.showwarning = warnings.showwarning, replacement
    return original

141 

142 

# Swap in the rich-based hook now, and put the previous hook back at interpreter exit.
original_show_warning = replace_showwarning(custom_show_warning)
atexit.register(replace_showwarning, original_show_warning)

# Bound to a pluggy plugin manager by ``configure_policy_plugin_manager()``.
POLICY_PLUGIN_MANAGER: Any = None  # type: ignore

147 

148 

def task_policy(task):
    """Run the ``task_policy`` hook of ``POLICY_PLUGIN_MANAGER`` for ``task`` and return its result."""
    hook = POLICY_PLUGIN_MANAGER.hook
    return hook.task_policy(task=task)

151 

152 

def dag_policy(dag):
    """Run the ``dag_policy`` hook of ``POLICY_PLUGIN_MANAGER`` for ``dag`` and return its result."""
    hook = POLICY_PLUGIN_MANAGER.hook
    return hook.dag_policy(dag=dag)

155 

156 

def task_instance_mutation_hook(task_instance):
    """Run the ``task_instance_mutation_hook`` hook of ``POLICY_PLUGIN_MANAGER`` and return its result."""
    hook = POLICY_PLUGIN_MANAGER.hook
    return hook.task_instance_mutation_hook(task_instance=task_instance)


# Marker attribute; ``import_local_settings()`` sets it to False once a hook
# implementation is registered.
task_instance_mutation_hook.is_noop = True  # type: ignore

162 

163 

def pod_mutation_hook(pod):
    """Run the ``pod_mutation_hook`` hook of ``POLICY_PLUGIN_MANAGER`` for ``pod`` and return its result."""
    hook = POLICY_PLUGIN_MANAGER.hook
    return hook.pod_mutation_hook(pod=pod)

166 

167 

def get_airflow_context_vars(context):
    """Run the ``get_airflow_context_vars`` hook of ``POLICY_PLUGIN_MANAGER`` and return its result."""
    hook = POLICY_PLUGIN_MANAGER.hook
    return hook.get_airflow_context_vars(context=context)

170 

171 

def get_dagbag_import_timeout(dag_file_path: str):
    """Run the ``get_dagbag_import_timeout`` hook of ``POLICY_PLUGIN_MANAGER`` and return its result."""
    hook = POLICY_PLUGIN_MANAGER.hook
    return hook.get_dagbag_import_timeout(dag_file_path=dag_file_path)

174 

175 

def configure_policy_plugin_manager():
    """Create the global policy plugin manager and register the default policy on it."""
    global POLICY_PLUGIN_MANAGER

    manager = pluggy.PluginManager(policies.local_settings_hookspec.project_name)
    # Publish the manager before configuring it, as the original did.
    POLICY_PLUGIN_MANAGER = manager
    manager.add_hookspecs(policies)
    manager.register(policies.DefaultPolicy)

182 

183 

def load_policy_plugins(pm: pluggy.PluginManager):
    """Load the plugins registered under the ``airflow.policy`` setuptools entrypoint into ``pm``."""
    # Logging is not configured yet at this point, so durations etc. cannot be logged here.
    entrypoint_group = "airflow.policy"
    pm.load_setuptools_entrypoints(entrypoint_group)

187 

188 

def configure_vars():
    """Populate the module-level path and connection globals from the Airflow configuration."""
    global SQL_ALCHEMY_CONN, DAGS_FOLDER, PLUGINS_FOLDER, DONOT_MODIFY_HANDLERS

    SQL_ALCHEMY_CONN = conf.get("database", "SQL_ALCHEMY_CONN")
    DAGS_FOLDER = os.path.expanduser(conf.get("core", "DAGS_FOLDER"))

    default_plugins_folder = os.path.join(AIRFLOW_HOME, "plugins")
    PLUGINS_FOLDER = conf.get("core", "plugins_folder", fallback=default_plugins_folder)

    # donot_modify_handlers=True: logging handlers are left untouched by the task_run command.
    # donot_modify_handlers=False: all handlers are removed from the root logger and the
    # handlers of the 'airflow.task' logger are added to it instead, so that output from
    # print & log statements in DAG files is captured before a task runs. The handlers
    # are restored after the task completes execution.
    DONOT_MODIFY_HANDLERS = conf.getboolean("logging", "donot_modify_handlers", fallback=False)

206 

207 

def configure_orm(disable_connection_pool=False, pool_class=None):
    """Create the global SQLAlchemy ``engine`` and the scoped ``Session`` factory.

    :param disable_connection_pool: forwarded to ``prepare_engine_args``.
    :param pool_class: forwarded to ``prepare_engine_args``.
    :raises Exception: on MSSQL when READ_COMMITTED_SNAPSHOT is not enabled for the database.
    """
    from airflow.utils.log.secrets_masker import mask_secret

    global engine, Session

    log.debug("Setting up DB connection pool (PID %s)", os.getpid())
    engine_args = prepare_engine_args(disable_connection_pool, pool_class)

    connect_args = (
        conf.getimport("database", "sql_alchemy_connect_args")
        if conf.has_option("database", "sql_alchemy_connect_args")
        else {}
    )

    engine = create_engine(SQL_ALCHEMY_CONN, connect_args=connect_args, **engine_args, future=True)

    # Register the DB password with the secrets masker.
    mask_secret(engine.url.password)

    setup_event_handlers(engine)

    session_factory = sessionmaker(
        autocommit=False,
        autoflush=False,
        bind=engine,
        expire_on_commit=False,
    )
    Session = scoped_session(session_factory)

    if engine.dialect.name != "mssql":
        return

    # MSSQL only: refuse to start unless READ_COMMITTED_SNAPSHOT is on.
    session = Session()
    try:
        query = sqlalchemy.text(
            "SELECT is_read_committed_snapshot_on FROM sys.databases WHERE name=:database_name"
        )
        result = session.execute(query, params={"database_name": engine.url.database})
        snapshot_flag = result.fetchone()[0]
        if snapshot_flag != 1:
            log.critical("MSSQL database MUST have READ_COMMITTED_SNAPSHOT enabled.")
            log.critical("The database %s has it disabled.", engine.url.database)
            log.critical("This will cause random deadlocks, Refusing to start.")
            log.critical(
                "See https://airflow.apache.org/docs/apache-airflow/stable/howto/"
                "set-up-database.html#setting-up-a-mssql-database"
            )
            raise Exception("MSSQL database MUST have READ_COMMITTED_SNAPSHOT enabled.")
    finally:
        session.close()

257 

258 

# Engine arguments used per dialect prefix when ``sql_alchemy_engine_args`` is not configured.
DEFAULT_ENGINE_ARGS = {
    "postgresql": dict(
        executemany_mode="values",
        executemany_values_page_size=10000,
        executemany_batch_page_size=2000,
    ),
}

266 

267 

def prepare_engine_args(disable_connection_pool=False, pool_class=None):
    """Build the keyword arguments that ``configure_orm`` passes to ``create_engine``.

    :param disable_connection_pool: when true and no ``pool_class`` is given, use ``NullPool``.
    :param pool_class: explicit pool class; when set, no size-related pool settings are added.
    :return: dict of engine arguments.
    """
    # Defaults of the first dialect prefix matching the connection string, else nothing.
    default_args = next(
        (
            defaults.copy()
            for dialect, defaults in DEFAULT_ENGINE_ARGS.items()
            if SQL_ALCHEMY_CONN.startswith(dialect)
        ),
        {},
    )

    engine_args: dict = conf.getjson(
        "database", "sql_alchemy_engine_args", fallback=default_args
    )  # type: ignore

    if pool_class:
        # Don't use separate settings for size etc, only those from sql_alchemy_engine_args
        engine_args["poolclass"] = pool_class
    elif disable_connection_pool or not conf.getboolean("database", "SQL_ALCHEMY_POOL_ENABLED"):
        engine_args["poolclass"] = NullPool
        log.debug("settings.prepare_engine_args(): Using NullPool")
    elif not SQL_ALCHEMY_CONN.startswith("sqlite"):
        # Pool size engine args are not supported by sqlite.
        # Without a configured pool size a reasonable value is chosen, because
        # 0 means no limit, which could exceed the database connection limit.
        pool_size = conf.getint("database", "SQL_ALCHEMY_POOL_SIZE", fallback=5)

        # Maximum overflow size of the pool. Once pool_size connections are checked out,
        # additional connections are handed out up to this limit; when they are returned
        # to the pool they are disconnected and discarded. The pool therefore allows
        # pool_size + max_overflow simultaneous connections and pool_size "sleeping"
        # connections. -1 means no overflow limit, i.e. no cap on the number of
        # concurrent connections. Defaults to 10.
        max_overflow = conf.getint("database", "SQL_ALCHEMY_MAX_OVERFLOW", fallback=10)

        # The DB server already has a wait_timeout (seconds after which an idle sleeping
        # connection is killed). Since other DBs may co-exist on the same server,
        # SQLAlchemy should set pool_recycle to an equal or smaller value.
        pool_recycle = conf.getint("database", "SQL_ALCHEMY_POOL_RECYCLE", fallback=1800)

        # Check the connection at the start of each pool checkout. Typically a simple
        # statement like "SELECT 1", but a DBAPI-specific liveness test may be used.
        # More information here:
        # https://docs.sqlalchemy.org/en/13/core/pooling.html#disconnect-handling-pessimistic
        pool_pre_ping = conf.getboolean("database", "SQL_ALCHEMY_POOL_PRE_PING", fallback=True)

        log.debug(
            "settings.prepare_engine_args(): Using pool settings. pool_size=%d, max_overflow=%d, "
            "pool_recycle=%d, pid=%d",
            pool_size,
            max_overflow,
            pool_recycle,
            os.getpid(),
        )
        engine_args.update(
            pool_size=pool_size,
            pool_recycle=pool_recycle,
            pool_pre_ping=pool_pre_ping,
            max_overflow=max_overflow,
        )

    # MySQL's default isolation level (REPEATABLE READ) can introduce inconsistencies when
    # running multiple schedulers, as repeated queries on the same session may read from
    # stale snapshots. 'READ COMMITTED' is the default value for PostgreSQL.
    # More information here:
    # https://dev.mysql.com/doc/refman/8.0/en/innodb-transaction-isolation-levels.html
    #
    # Similarly the MSSQL default isolation level should be READ COMMITTED. The
    # READ_COMMITTED_SNAPSHOT option must also be on to avoid deadlocks while select
    # queries are running; this is by default enforced during init/upgrade. More information:
    # https://docs.microsoft.com/en-us/sql/t-sql/statements/set-transaction-isolation-level-transact-sql
    needs_read_committed = SQL_ALCHEMY_CONN.startswith(("mysql", "mssql"))
    if needs_read_committed:
        engine_args["isolation_level"] = "READ COMMITTED"

    # Allow the user to specify an encoding for their DB, otherwise default
    # to utf-8 so jobs & users with non-latin1 characters can still use us.
    engine_args["encoding"] = conf.get("database", "SQL_ENGINE_ENCODING", fallback="utf-8")

    return engine_args

349 

350 

def dispose_orm():
    """Remove the scoped session and dispose of the engine, resetting both globals to ``None``."""
    global engine, Session

    log.debug("Disposing DB connection pool (PID %s)", os.getpid())

    # Session first, then the engine it is bound to.
    if Session is not None:  # type: ignore[truthy-function]
        Session.remove()
        Session = None

    if engine:
        engine.dispose()
        engine = None

363 

364 

def reconfigure_orm(disable_connection_pool=False, pool_class=None):
    """Dispose of the current ORM objects, then configure the ORM again with the given pool options."""
    dispose_orm()
    pool_options = {"disable_connection_pool": disable_connection_pool, "pool_class": pool_class}
    configure_orm(**pool_options)

369 

370 

def configure_adapters():
    """Register Pendulum ``DateTime`` adapters / converters for the configured DB driver."""
    from pendulum import DateTime as Pendulum

    if SQL_ALCHEMY_CONN.startswith("sqlite"):
        from sqlite3 import register_adapter

        def _as_iso_with_space(val):
            return val.isoformat(" ")

        register_adapter(Pendulum, _as_iso_with_space)

    if not SQL_ALCHEMY_CONN.startswith("mysql"):
        return

    # Either MySQL driver may be missing; patch whichever one can be imported.
    try:
        import MySQLdb.converters

        MySQLdb.converters.conversions[Pendulum] = MySQLdb.converters.DateTime2literal
    except ImportError:
        pass

    try:
        import pymysql.converters

        pymysql.converters.conversions[Pendulum] = pymysql.converters.escape_datetime
    except ImportError:
        pass

393 

394 

def validate_session():
    """Check that the database is reachable when ``[celery] worker_precheck`` is enabled.

    :return: ``True`` when the precheck is disabled or ``select 1`` succeeds;
        ``False`` when the query raises ``DBAPIError`` (the error is logged).
    """
    worker_precheck = conf.getboolean("celery", "worker_precheck", fallback=False)
    if not worker_precheck:
        return True

    check_session = sessionmaker(bind=engine)
    session = check_session()
    try:
        session.execute(text("select 1"))
        conn_status = True
    except exc.DBAPIError as err:
        log.error(err)
        conn_status = False
    finally:
        # Close on every path, including exceptions other than DBAPIError,
        # which still propagate to the caller.
        session.close()
    return conn_status

413 

414 

def configure_action_logging() -> None:
    """Hook for extra configuration (callback registration) of ``airflow.utils.action_loggers``.

    Intentionally empty here.
    """
    return None

417 

418 

def prepare_syspath():
    """Append the DAGs folder, ``AIRFLOW_HOME/config`` and the plugins folder to ``sys.path`` if absent."""
    # ./config/ is added for loading custom log parsers etc, or airflow_local_settings etc.
    config_path = os.path.join(AIRFLOW_HOME, "config")

    for candidate in (DAGS_FOLDER, config_path, PLUGINS_FOLDER):
        if candidate not in sys.path:
            sys.path.append(candidate)

432 

433 

def get_session_lifetime_config():
    """Return the webserver session lifetime in minutes, handling outdated options gracefully.

    Reads ``session_lifetime_minutes`` from ``[webserver]``. When the deprecated
    ``session_lifetime_days`` or ``force_log_out_after`` options are set while
    ``session_lifetime_minutes`` still equals "43200", a ``RemovedInAirflow3Warning``
    is emitted and ``session_lifetime_days`` (if set) is converted to minutes.
    With nothing configured the result is 30 days expressed in minutes.

    :return: session lifetime in minutes, as an ``int``.
    """
    session_lifetime_minutes = conf.get("webserver", "session_lifetime_minutes", fallback=None)
    session_lifetime_days = conf.get("webserver", "session_lifetime_days", fallback=None)
    uses_deprecated_lifetime_configs = session_lifetime_days or conf.get(
        "webserver", "force_log_out_after", fallback=None
    )

    minutes_per_day = 24 * 60
    # Compared as a string because ``conf.get`` returns the raw option text.
    default_lifetime_minutes = "43200"
    if uses_deprecated_lifetime_configs and session_lifetime_minutes == default_lifetime_minutes:
        warnings.warn(
            "`session_lifetime_days` option from `[webserver]` section has been "
            "renamed to `session_lifetime_minutes`. The new option allows to configure "
            "session lifetime in minutes. The `force_log_out_after` option has been removed "
            "from `[webserver]` section. Please update your configuration.",
            category=RemovedInAirflow3Warning,
        )
        if session_lifetime_days:
            session_lifetime_minutes = minutes_per_day * int(session_lifetime_days)

    if not session_lifetime_minutes:
        session_lifetime_days = 30
        session_lifetime_minutes = minutes_per_day * session_lifetime_days

    # Use the module logger: the root-level ``logging.debug`` helper implicitly calls
    # ``basicConfig()`` when the root logger has no handlers yet.
    log.debug("User session lifetime is set to %s minutes.", session_lifetime_minutes)

    return int(session_lifetime_minutes)

462 

463 

def import_local_settings():
    """Import ``airflow_local_settings`` and let it override names defined in this module.

    Names that become policy plugin functions are registered on ``POLICY_PLUGIN_MANAGER``;
    every other exported name is copied into this module's globals. A missing
    ``airflow_local_settings`` module is not an error; any other import failure is
    logged and re-raised.
    """
    try:
        import airflow_local_settings

        if hasattr(airflow_local_settings, "__all__"):
            names = list(airflow_local_settings.__all__)
        else:
            names = [n for n in airflow_local_settings.__dict__ if not n.startswith("__")]

        # Deprecated spelling: expose ``policy`` under its new name ``task_policy``.
        if "policy" in names and "task_policy" not in names:
            warnings.warn(
                "Using `policy` in airflow_local_settings.py is deprecated. "
                "Please rename your `policy` to `task_policy`.",
                RemovedInAirflow3Warning,
                stacklevel=2,
            )
            airflow_local_settings.task_policy = airflow_local_settings.policy
            names.remove("policy")

        plugin_functions = policies.make_plugin_from_local_settings(
            POLICY_PLUGIN_MANAGER, airflow_local_settings, names
        )

        module_globals = globals()
        for name in names:
            # Functions already handled by adding them to the plugin must not clobber
            # the global function of the same name.
            if name not in plugin_functions:
                module_globals[name] = getattr(airflow_local_settings, name)

        if POLICY_PLUGIN_MANAGER.hook.task_instance_mutation_hook.get_hookimpls():
            task_instance_mutation_hook.is_noop = False

        log.info("Loaded airflow_local_settings from %s .", airflow_local_settings.__file__)
    except ModuleNotFoundError as e:
        if e.name != "airflow_local_settings":
            log.critical(
                "Failed to import airflow_local_settings due to a transitive module not found error.",
                exc_info=True,
            )
            raise
        log.debug("No airflow_local_settings to import.", exc_info=True)
    except ImportError:
        log.critical("Failed to import airflow_local_settings.", exc_info=True)
        raise

512 

513 

def initialize():
    """Run every configuration step of this module, in order, to initialize Airflow."""
    global LOGGING_CLASS_PATH

    configure_vars()
    prepare_syspath()
    configure_policy_plugin_manager()

    # Policy plugins are loaded _before_ airflow_local_settings is imported: Pluggy uses
    # LIFO, and anything in airflow_local_settings should take precedence.
    load_policy_plugins(POLICY_PLUGIN_MANAGER)
    import_local_settings()

    LOGGING_CLASS_PATH = configure_logging()
    State.state_color.update(STATE_COLORS)

    configure_adapters()
    # The webservers import this file from models.py with the default settings.
    configure_orm()
    configure_action_logging()

    # Make sure DB connections are closed when the scheduler and gunicorn workers terminate.
    atexit.register(dispose_orm)

534 

535 

# Constants

KILOBYTE = 1024
MEGABYTE = KILOBYTE**2
WEB_COLORS = dict(LIGHTBLUE="#4d9de0", LIGHTORANGE="#FF9933")

541 

542 

# Minimum interval between updates of a serialized DAG, to reduce the database write rate.
MIN_SERIALIZED_DAG_UPDATE_INTERVAL = conf.getint("core", "min_serialized_dag_update_interval", fallback=30)

# When True, serialized DAGs are compressed before being written to the DB.
COMPRESS_SERIALIZED_DAGS = conf.getboolean("core", "compress_serialized_dags", fallback=False)

# Minimum interval between fetches of a serialized DAG, to reduce the database read rate.
# This config controls when your DAGs are updated in the Webserver.
MIN_SERIALIZED_DAG_FETCH_INTERVAL = conf.getint("core", "min_serialized_dag_fetch_interval", fallback=10)

CAN_FORK = hasattr(os, "fork")

# Always True when forking is unavailable; otherwise taken from the config.
EXECUTE_TASKS_NEW_PYTHON_INTERPRETER = not CAN_FORK or conf.getboolean(
    "core",
    "execute_tasks_new_python_interpreter",
    fallback=False,
)

ALLOW_FUTURE_EXEC_DATES = conf.getboolean("scheduler", "allow_trigger_in_future", fallback=False)

# Whether or not to check each dagrun against defined SLAs.
CHECK_SLAS = conf.getboolean("core", "check_slas", fallback=True)

USE_JOB_SCHEDULE = conf.getboolean("scheduler", "use_job_schedule", fallback=True)

# Airflow plugins are lazily loaded by default (only loaded when required). Set this to
# False to load plugins whenever 'airflow' is invoked via cli or loaded from module.
LAZY_LOAD_PLUGINS = conf.getboolean("core", "lazy_load_plugins", fallback=True)

# Airflow providers are lazily discovered by default (discovery and imports happen only
# when required). Set this to False to discover providers whenever 'airflow' is invoked
# via cli or loaded from module.
LAZY_LOAD_PROVIDERS = conf.getboolean("core", "lazy_discover_providers", fallback=True)

# Whether the configured executor is one of the Kubernetes-based executors.
IS_K8S_OR_K8SCELERY_EXECUTOR = conf.get("core", "EXECUTOR") in {
    executor_constants.KUBERNETES_EXECUTOR,
    executor_constants.CELERY_KUBERNETES_EXECUTOR,
    executor_constants.LOCAL_KUBERNETES_EXECUTOR,
}
IS_K8S_EXECUTOR_POD = bool(os.environ.get("AIRFLOW_IS_K8S_EXECUTOR_POD", ""))
"""Will be True if running in kubernetes executor pod."""

HIDE_SENSITIVE_VAR_CONN_FIELDS = conf.getboolean("core", "hide_sensitive_var_conn_fields")

# Off by default; automatically configured on when running task instances.
MASK_SECRETS_IN_LOGS = False

592 

# Alerts displayed on the dashboard.
# Useful for warning about setup issues or announcing changes to end users.
# This is a list of UIAlerts, each of which can specify the message, the category and
# the roles the message should be shown to. For example:
#   from airflow.www.utils import UIAlert
#
#   DASHBOARD_UIALERTS = [
#       UIAlert("Welcome to Airflow"),  # All users
#       UIAlert("Airflow update happening next week", roles=["User"]),  # Only users with the User role
#       # A flash message with html:
#       UIAlert('Visit <a href="http://airflow.apache.org">airflow.apache.org</a>', html=True),
#   ]
#
DASHBOARD_UIALERTS: list[UIAlert] = []

# Prefix identifying tables that hold data moved during migration.
AIRFLOW_MOVED_TABLE_PREFIX = "_airflow_moved"

DAEMON_UMASK: str = conf.get("core", "daemon_umask", fallback="0o077")


# AIP-44: internal_api (experimental)
# The feature is not complete yet, so it is disabled unless the environment variable opts in.
_ENABLE_AIP_44 = os.environ.get("AIRFLOW_ENABLE_AIP_44", "false").lower() in {"true", "t", "yes", "y", "1"}