Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/executors/executor_loader.py: 32%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Licensed to the Apache Software Foundation (ASF) under one
2# or more contributor license agreements. See the NOTICE file
3# distributed with this work for additional information
4# regarding copyright ownership. The ASF licenses this file
5# to you under the Apache License, Version 2.0 (the
6# "License"); you may not use this file except in compliance
7# with the License. You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing,
12# software distributed under the License is distributed on an
13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14# KIND, either express or implied. See the License for the
15# specific language governing permissions and limitations
16# under the License.
17"""All executors."""
19from __future__ import annotations
21import functools
22import logging
23import os
24from contextlib import suppress
25from typing import TYPE_CHECKING
27from airflow.api_internal.internal_api_call import InternalApiConfig
28from airflow.exceptions import AirflowConfigException, AirflowException
29from airflow.executors.executor_constants import (
30 CELERY_EXECUTOR,
31 CELERY_KUBERNETES_EXECUTOR,
32 CORE_EXECUTOR_NAMES,
33 DEBUG_EXECUTOR,
34 KUBERNETES_EXECUTOR,
35 LOCAL_EXECUTOR,
36 LOCAL_KUBERNETES_EXECUTOR,
37 SEQUENTIAL_EXECUTOR,
38 ConnectorSource,
39)
40from airflow.executors.executor_utils import ExecutorName
41from airflow.utils.module_loading import import_string
43log = logging.getLogger(__name__)
45if TYPE_CHECKING:
46 from airflow.executors.base_executor import BaseExecutor
# Module-level caches shared by all ExecutorLoader classmethods.
#
# Used to look up an ExecutorName via a string alias or module path. An
# executor may have both, so we need two lookup dicts.
_alias_to_executors: dict[str, ExecutorName] = {}
_module_to_executors: dict[str, ExecutorName] = {}
# Used to cache the computed ExecutorNames so that we don't need to read/parse
# config more than once (populated lazily by ExecutorLoader._get_executor_names).
_executor_names: list[ExecutorName] = []
# Used to cache constructed executor instances so that we don't build executor
# objects unnecessarily (keyed by ExecutorName, filled by ExecutorLoader.load_executor).
_loaded_executors: dict[ExecutorName, BaseExecutor] = {}
class ExecutorLoader:
    """Keeps constants for all the currently available executors."""

    # Mapping of core executor alias -> import path. Entries for provider-backed
    # executors (Celery/Kubernetes) point into provider packages that may not be
    # installed; they are only imported when actually requested.
    executors = {
        LOCAL_EXECUTOR: "airflow.executors.local_executor.LocalExecutor",
        LOCAL_KUBERNETES_EXECUTOR: "airflow.providers.cncf.kubernetes."
        "executors.local_kubernetes_executor.LocalKubernetesExecutor",
        SEQUENTIAL_EXECUTOR: "airflow.executors.sequential_executor.SequentialExecutor",
        CELERY_EXECUTOR: "airflow.providers.celery.executors.celery_executor.CeleryExecutor",
        CELERY_KUBERNETES_EXECUTOR: "airflow.providers.celery."
        "executors.celery_kubernetes_executor.CeleryKubernetesExecutor",
        KUBERNETES_EXECUTOR: "airflow.providers.cncf.kubernetes."
        "executors.kubernetes_executor.KubernetesExecutor",
        DEBUG_EXECUTOR: "airflow.executors.debug_executor.DebugExecutor",
    }

    @classmethod
    def block_use_of_hybrid_exec(cls, executor_config: list):
        """Raise an exception if the user tries to use multiple executors before the feature is complete.

        This check is built into a method so that it can be easily mocked in unit tests.

        :param executor_config: core.executor configuration value.
        :raises AirflowConfigException: if more than one executor, or any
            ``alias:module`` pair, is configured.
        """
        # Multiple entries or any aliased entry (":" separator) means hybrid
        # executor configuration (AIP-61), which is not yet supported.
        if len(executor_config) > 1 or ":" in "".join(executor_config):
            raise AirflowConfigException(
                "Configuring multiple executors and executor aliases are not yet supported!: "
                f"{executor_config}"
            )

    @classmethod
    def _get_executor_names(cls) -> list[ExecutorName]:
        """Return the executor names from Airflow configuration.

        Parses ``[core] executor``, validates the configured values, populates the
        module-level lookup caches, and memoizes the result in ``_executor_names``
        so the config is only read/parsed once per process.

        :return: List of executor names from Airflow configuration
        :raises AirflowConfigException: on malformed, core-alias-overriding, or
            duplicate executor configuration.
        """
        from airflow.configuration import conf

        # Config has already been parsed; return the cached result.
        if _executor_names:
            return _executor_names

        executor_names_raw = conf.get_mandatory_list_value("core", "EXECUTOR")

        # AIP-61 is WIP. Unblock configuring multiple executors when the feature is ready to launch
        cls.block_use_of_hybrid_exec(executor_names_raw)

        executor_names = []
        for name in executor_names_raw:
            if len(split_name := name.split(":")) == 1:
                name = split_name[0]
                # Check if this is an alias for a core airflow executor, module
                # paths won't be provided by the user in that case.
                if core_executor_module := cls.executors.get(name):
                    executor_names.append(ExecutorName(alias=name, module_path=core_executor_module))
                # Only a module path or plugin name was provided
                else:
                    executor_names.append(ExecutorName(alias=None, module_path=name))
            # An alias was provided with the module path
            elif len(split_name) == 2:
                # Ensure the user is not trying to override the existing aliases of any of the core
                # executors by providing an alias along with the existing core airflow executor alias
                # (e.g. my_local_exec_alias:LocalExecutor). Allowing this makes things unnecessarily
                # complicated. Multiple Executors of the same type will be supported by a future multitenancy
                # AIP.
                # The module component should always be a module or plugin path.
                module_path = split_name[1]
                if not module_path or module_path in CORE_EXECUTOR_NAMES or "." not in module_path:
                    raise AirflowConfigException(
                        "Incorrectly formatted executor configuration. Second portion of an executor "
                        f"configuration must be a module path or plugin but received: {module_path}"
                    )
                else:
                    executor_names.append(ExecutorName(alias=split_name[0], module_path=split_name[1]))
            else:
                raise AirflowConfigException(f"Incorrectly formatted executor configuration: {name}")

        # As of now, we do not allow duplicate executors.
        # Add all module paths/plugin names to a set, since the actual code is what is unique
        unique_modules = {exec_name.module_path for exec_name in executor_names}
        if len(unique_modules) < len(executor_names):
            msg = (
                "At least one executor was configured twice. Duplicate executors are not yet supported. "
                "Please check your configuration again to correct the issue."
            )
            raise AirflowConfigException(msg)

        # Populate some mappings for fast future lookups
        for executor_name in executor_names:
            # Executors will not always have aliases
            if executor_name.alias:
                _alias_to_executors[executor_name.alias] = executor_name
            # All executors will have a module path
            _module_to_executors[executor_name.module_path] = executor_name
            # Cache the executor names, so the logic of this method only runs once
            _executor_names.append(executor_name)

        return executor_names

    @classmethod
    def get_executor_names(cls) -> list[ExecutorName]:
        """Return the executor names from Airflow configuration.

        :return: List of executor names from Airflow configuration
        """
        return cls._get_executor_names()

    @classmethod
    def get_default_executor_name(cls) -> ExecutorName:
        """Return the default executor name from Airflow configuration.

        :return: executor name from Airflow configuration
        """
        # The default executor is the first configured executor in the list
        return cls._get_executor_names()[0]

    @classmethod
    def get_default_executor(cls) -> BaseExecutor:
        """Create a new instance of the configured executor if none exists and returns it."""
        return cls.load_executor(cls.get_default_executor_name())

    @classmethod
    def init_executors(cls) -> list[BaseExecutor]:
        """Create a new instance of all configured executors if not cached already.

        Also registers each loaded executor in ``cls.executors`` (keyed by alias
        when one was configured, otherwise by class name) so later string lookups
        resolve to the configured module path.
        """
        executor_names = cls._get_executor_names()
        loaded_executors = []
        for executor_name in executor_names:
            loaded_executor = cls.load_executor(executor_name.module_path)
            if executor_name.alias:
                cls.executors[executor_name.alias] = executor_name.module_path
            else:
                cls.executors[loaded_executor.__class__.__name__] = executor_name.module_path

            loaded_executors.append(loaded_executor)

        return loaded_executors

    @classmethod
    def lookup_executor_name_by_str(cls, executor_name_str: str) -> ExecutorName:
        """Resolve a configured ExecutorName from an alias or module-path string.

        :param executor_name_str: executor alias or module path as configured.
        :raises AirflowException: if the string matches no configured executor.
        """
        # lookup the executor by alias first, if not check if we're given a module path
        if executor_name := _alias_to_executors.get(executor_name_str):
            return executor_name
        elif executor_name := _module_to_executors.get(executor_name_str):
            return executor_name
        else:
            raise AirflowException(f"Unknown executor being loaded: {executor_name_str}")

    @classmethod
    def load_executor(cls, executor_name: ExecutorName | str | None) -> BaseExecutor:
        """
        Load the executor.

        This supports the following formats:
        * by executor name for core executor
        * by ``{plugin_name}.{class_name}`` for executor from plugins
        * by import path.
        * by ExecutorName object specification

        :param executor_name: executor to load; ``None`` loads the default executor.
        :return: an instance of executor class via executor_name
        :raises AirflowConfigException: if the executor module/attribute cannot be imported.
        """
        if not executor_name:
            _executor_name = cls.get_default_executor_name()
        elif isinstance(executor_name, str):
            _executor_name = cls.lookup_executor_name_by_str(executor_name)
        else:
            _executor_name = executor_name

        # Check if the executor has been previously loaded. Avoid constructing a new object
        if _executor_name in _loaded_executors:
            return _loaded_executors[_executor_name]

        try:
            if _executor_name.alias == CELERY_KUBERNETES_EXECUTOR:
                executor = cls.__load_celery_kubernetes_executor()
            elif _executor_name.alias == LOCAL_KUBERNETES_EXECUTOR:
                executor = cls.__load_local_kubernetes_executor()
            else:
                executor_cls, import_source = cls.import_executor_cls(_executor_name)
                log.debug("Loading executor %s from %s", _executor_name, import_source.value)
                executor = executor_cls()
        except ImportError as e:
            log.error(e)
            # Chain the original ImportError so the root cause is preserved in the traceback.
            raise AirflowConfigException(
                f'The module/attribute could not be loaded. Please check "executor" key in "core" section. '
                f'Current value: "{_executor_name}".'
            ) from e
        log.info("Loaded executor: %s", _executor_name)

        # Store the executor name we've built for this executor in the
        # instance. This makes it easier for the Scheduler, Backfill, etc to
        # know how we refer to this executor.
        executor.name = _executor_name
        # Cache this executor by name here, so we can look it up later if it is
        # requested again, and not have to construct a new object
        _loaded_executors[_executor_name] = executor

        return executor

    @classmethod
    def import_executor_cls(
        cls, executor_name: ExecutorName, validate: bool = True
    ) -> tuple[type[BaseExecutor], ConnectorSource]:
        """
        Import the executor class.

        Supports the same formats as ExecutorLoader.load_executor.

        :param executor_name: Name of core executor or module path to provider provided as a plugin.
        :param validate: Whether or not to validate the executor before returning

        :return: executor class via executor_name and executor import source
        """

        def _import_and_validate(path: str) -> type[BaseExecutor]:
            executor = import_string(path)
            if validate:
                cls.validate_database_executor_compatibility(executor)
            return executor

        if executor_name.connector_source == ConnectorSource.PLUGIN:
            with suppress(ImportError, AttributeError):
                # Load plugins here for executors as at that time the plugins might not have been
                # initialized yet
                from airflow import plugins_manager

                plugins_manager.integrate_executor_plugins()
                return (
                    _import_and_validate(f"airflow.executors.{executor_name.module_path}"),
                    ConnectorSource.PLUGIN,
                )
        return _import_and_validate(executor_name.module_path), executor_name.connector_source

    @classmethod
    def import_default_executor_cls(cls, validate: bool = True) -> tuple[type[BaseExecutor], ConnectorSource]:
        """
        Import the default executor class.

        :param validate: Whether or not to validate the executor before returning

        :return: executor class and executor import source
        """
        executor_name = cls.get_default_executor_name()
        executor, source = cls.import_executor_cls(executor_name, validate=validate)
        return executor, source

    @classmethod
    @functools.lru_cache(maxsize=None)
    def validate_database_executor_compatibility(cls, executor: type[BaseExecutor]) -> None:
        """
        Validate database and executor compatibility.

        Most of the databases work universally, but SQLite can only work with
        single-threaded executors (e.g. Sequential).

        This is NOT done in ``airflow.configuration`` (when configuration is
        initialized) because loading the executor class is heavy work we want to
        avoid unless needed.

        :param executor: executor class to validate against the configured database.
        :raises AirflowConfigException: if SQLite is used with a multi-threaded executor.
        """
        # Single threaded executors can run with any backend.
        if executor.is_single_threaded:
            return

        # This is set in tests when we want to be able to use SQLite.
        if os.environ.get("_AIRFLOW__SKIP_DATABASE_EXECUTOR_COMPATIBILITY_CHECK") == "1":
            return

        if InternalApiConfig.get_use_internal_api():
            return

        from airflow.settings import engine

        # SQLite only works with single threaded executors
        if engine.dialect.name == "sqlite":
            raise AirflowConfigException(f"error: cannot use SQLite with the {executor.__name__}")

    @classmethod
    def __load_celery_kubernetes_executor(cls) -> BaseExecutor:
        # Build the hybrid executor from freshly-constructed Celery and
        # Kubernetes executor instances.
        celery_executor = import_string(cls.executors[CELERY_EXECUTOR])()
        kubernetes_executor = import_string(cls.executors[KUBERNETES_EXECUTOR])()

        celery_kubernetes_executor_cls = import_string(cls.executors[CELERY_KUBERNETES_EXECUTOR])
        return celery_kubernetes_executor_cls(celery_executor, kubernetes_executor)

    @classmethod
    def __load_local_kubernetes_executor(cls) -> BaseExecutor:
        # Build the hybrid executor from freshly-constructed Local and
        # Kubernetes executor instances.
        local_executor = import_string(cls.executors[LOCAL_EXECUTOR])()
        kubernetes_executor = import_string(cls.executors[KUBERNETES_EXECUTOR])()

        local_kubernetes_executor_cls = import_string(cls.executors[LOCAL_KUBERNETES_EXECUTOR])
        return local_kubernetes_executor_cls(local_executor, kubernetes_executor)
# This tuple is deprecated due to AIP-51 and is no longer used in core Airflow.
# It is retained only for backwards compatibility with external code that may
# still import it.
# TODO: Remove in Airflow 3.0
UNPICKLEABLE_EXECUTORS = (
    LOCAL_EXECUTOR,
    SEQUENTIAL_EXECUTOR,
)