Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/executors/executor_loader.py: 44%
88 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
1# Licensed to the Apache Software Foundation (ASF) under one
2# or more contributor license agreements. See the NOTICE file
3# distributed with this work for additional information
4# regarding copyright ownership. The ASF licenses this file
5# to you under the Apache License, Version 2.0 (the
6# "License"); you may not use this file except in compliance
7# with the License. You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing,
12# software distributed under the License is distributed on an
13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14# KIND, either express or implied. See the License for the
15# specific language governing permissions and limitations
16# under the License.
17"""All executors."""
18from __future__ import annotations
20import functools
21import logging
22import os
23from contextlib import suppress
24from enum import Enum, unique
25from typing import TYPE_CHECKING
27from airflow.exceptions import AirflowConfigException
28from airflow.executors.executor_constants import (
29 CELERY_EXECUTOR,
30 CELERY_KUBERNETES_EXECUTOR,
31 DASK_EXECUTOR,
32 DEBUG_EXECUTOR,
33 KUBERNETES_EXECUTOR,
34 LOCAL_EXECUTOR,
35 LOCAL_KUBERNETES_EXECUTOR,
36 SEQUENTIAL_EXECUTOR,
37)
38from airflow.utils.module_loading import import_string
40log = logging.getLogger(__name__)
42if TYPE_CHECKING:
43 from airflow.executors.base_executor import BaseExecutor
46@unique
47class ConnectorSource(Enum):
48 """Enum of supported executor import sources."""
50 CORE = "core"
51 PLUGIN = "plugin"
52 CUSTOM_PATH = "custom path"
55class ExecutorLoader:
56 """Keeps constants for all the currently available executors."""
58 _default_executor: BaseExecutor | None = None
59 executors = {
60 LOCAL_EXECUTOR: "airflow.executors.local_executor.LocalExecutor",
61 LOCAL_KUBERNETES_EXECUTOR: "airflow.executors.local_kubernetes_executor.LocalKubernetesExecutor",
62 SEQUENTIAL_EXECUTOR: "airflow.executors.sequential_executor.SequentialExecutor",
63 CELERY_EXECUTOR: "airflow.executors.celery_executor.CeleryExecutor",
64 CELERY_KUBERNETES_EXECUTOR: "airflow.executors.celery_kubernetes_executor.CeleryKubernetesExecutor",
65 DASK_EXECUTOR: "airflow.executors.dask_executor.DaskExecutor",
66 KUBERNETES_EXECUTOR: "airflow.executors.kubernetes_executor.KubernetesExecutor",
67 DEBUG_EXECUTOR: "airflow.executors.debug_executor.DebugExecutor",
68 }
70 @classmethod
71 def get_default_executor_name(cls) -> str:
72 """Returns the default executor name from Airflow configuration.
74 :return: executor name from Airflow configuration
75 """
76 from airflow.configuration import conf
78 return conf.get_mandatory_value("core", "EXECUTOR")
80 @classmethod
81 def get_default_executor(cls) -> BaseExecutor:
82 """Creates a new instance of the configured executor if none exists and returns it."""
83 if cls._default_executor is not None:
84 return cls._default_executor
86 return cls.load_executor(cls.get_default_executor_name())
88 @classmethod
89 def load_executor(cls, executor_name: str) -> BaseExecutor:
90 """
91 Loads the executor.
93 This supports the following formats:
94 * by executor name for core executor
95 * by ``{plugin_name}.{class_name}`` for executor from plugins
96 * by import path.
98 :return: an instance of executor class via executor_name
99 """
100 if executor_name == CELERY_KUBERNETES_EXECUTOR:
101 return cls.__load_celery_kubernetes_executor()
102 elif executor_name == LOCAL_KUBERNETES_EXECUTOR:
103 return cls.__load_local_kubernetes_executor()
105 try:
106 executor_cls, import_source = cls.import_executor_cls(executor_name)
107 log.debug("Loading executor %s from %s", executor_name, import_source.value)
108 except ImportError as e:
109 log.error(e)
110 raise AirflowConfigException(
111 f'The module/attribute could not be loaded. Please check "executor" key in "core" section. '
112 f'Current value: "{executor_name}".'
113 )
114 log.info("Loaded executor: %s", executor_name)
116 return executor_cls()
118 @classmethod
119 def import_executor_cls(cls, executor_name: str) -> tuple[type[BaseExecutor], ConnectorSource]:
120 """
121 Imports the executor class.
123 Supports the same formats as ExecutorLoader.load_executor.
125 :return: executor class via executor_name and executor import source
126 """
128 def _import_and_validate(path: str) -> type[BaseExecutor]:
129 executor = import_string(path)
130 cls.validate_database_executor_compatibility(executor)
131 return executor
133 if executor_name in cls.executors:
134 return _import_and_validate(cls.executors[executor_name]), ConnectorSource.CORE
135 if executor_name.count(".") == 1:
136 log.debug(
137 "The executor name looks like the plugin path (executor_name=%s). Trying to import a "
138 "executor from a plugin",
139 executor_name,
140 )
141 with suppress(ImportError, AttributeError):
142 # Load plugins here for executors as at that time the plugins might not have been
143 # initialized yet
144 from airflow import plugins_manager
146 plugins_manager.integrate_executor_plugins()
147 return _import_and_validate(f"airflow.executors.{executor_name}"), ConnectorSource.PLUGIN
148 return _import_and_validate(executor_name), ConnectorSource.CUSTOM_PATH
150 @classmethod
151 def import_default_executor_cls(cls) -> tuple[type[BaseExecutor], ConnectorSource]:
152 """
153 Imports the default executor class.
155 :return: executor class and executor import source
156 """
157 executor_name = cls.get_default_executor_name()
158 executor, source = cls.import_executor_cls(executor_name)
159 return executor, source
161 @classmethod
162 @functools.lru_cache(maxsize=None)
163 def validate_database_executor_compatibility(cls, executor: type[BaseExecutor]) -> None:
164 """Validate database and executor compatibility.
166 Most of the databases work universally, but SQLite can only work with
167 single-threaded executors (e.g. Sequential).
169 This is NOT done in ``airflow.configuration`` (when configuration is
170 initialized) because loading the executor class is heavy work we want to
171 avoid unless needed.
172 """
173 # Single threaded executors can run with any backend.
174 if executor.is_single_threaded:
175 return
177 # This is set in tests when we want to be able to use SQLite.
178 if os.environ.get("_AIRFLOW__SKIP_DATABASE_EXECUTOR_COMPATIBILITY_CHECK") == "1":
179 return
181 from airflow.settings import engine
183 # SQLite only works with single threaded executors
184 if engine.dialect.name == "sqlite":
185 raise AirflowConfigException(f"error: cannot use SQLite with the {executor.__name__}")
187 @classmethod
188 def __load_celery_kubernetes_executor(cls) -> BaseExecutor:
189 celery_executor = import_string(cls.executors[CELERY_EXECUTOR])()
190 kubernetes_executor = import_string(cls.executors[KUBERNETES_EXECUTOR])()
192 celery_kubernetes_executor_cls = import_string(cls.executors[CELERY_KUBERNETES_EXECUTOR])
193 return celery_kubernetes_executor_cls(celery_executor, kubernetes_executor)
195 @classmethod
196 def __load_local_kubernetes_executor(cls) -> BaseExecutor:
197 local_executor = import_string(cls.executors[LOCAL_EXECUTOR])()
198 kubernetes_executor = import_string(cls.executors[KUBERNETES_EXECUTOR])()
200 local_kubernetes_executor_cls = import_string(cls.executors[LOCAL_KUBERNETES_EXECUTOR])
201 return local_kubernetes_executor_cls(local_executor, kubernetes_executor)
204# This tuple is deprecated due to AIP-51 and is no longer used in core Airflow.
205# TODO: Remove in Airflow 3.0
206UNPICKLEABLE_EXECUTORS = (
207 LOCAL_EXECUTOR,
208 SEQUENTIAL_EXECUTOR,
209 DASK_EXECUTOR,
210)