1# Licensed to the Apache Software Foundation (ASF) under one
2# or more contributor license agreements. See the NOTICE file
3# distributed with this work for additional information
4# regarding copyright ownership. The ASF licenses this file
5# to you under the Apache License, Version 2.0 (the
6# "License"); you may not use this file except in compliance
7# with the License. You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing,
12# software distributed under the License is distributed on an
13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14# KIND, either express or implied. See the License for the
15# specific language governing permissions and limitations
16# under the License.
17"""All executors."""
18
19from __future__ import annotations
20
21import os
22from collections import defaultdict
23from typing import TYPE_CHECKING
24
25import structlog
26
27from airflow._shared.module_loading import import_string
28from airflow.exceptions import AirflowConfigException, UnknownExecutorException
29from airflow.executors.executor_constants import (
30 CELERY_EXECUTOR,
31 CORE_EXECUTOR_NAMES,
32 KUBERNETES_EXECUTOR,
33 LOCAL_EXECUTOR,
34 ConnectorSource,
35)
36from airflow.executors.executor_utils import ExecutorName
37from airflow.models.team import Team
38
log = structlog.get_logger(__name__)

if TYPE_CHECKING:
    from airflow.executors.base_executor import BaseExecutor


# Module-level lookup tables, populated once by ExecutorLoader._get_executor_names().
#
# An executor can be referred to by its alias, by its full module path or by its bare class name
# (the last dotted component of the module path), so there is one lookup dict for each. The outer
# key of every "per team" dict is the team name; None is used for executors that are not tied to
# a team.
_alias_to_executors_per_team: dict[str | None, dict[str, ExecutorName]] = defaultdict(dict)
_module_to_executors_per_team: dict[str | None, dict[str, ExecutorName]] = defaultdict(dict)
_classname_to_executors_per_team: dict[str | None, dict[str, ExecutorName]] = defaultdict(dict)
# Used to lookup the ExecutorNames of a team via the team name (None for executors without a team).
# The list keeps the configured order; the first entry is treated as that team's default executor.
_team_name_to_executors: dict[str | None, list[ExecutorName]] = defaultdict(list)
# Used to cache the computed ExecutorNames so that we don't need to read/parse config more than once
_executor_names: list[ExecutorName] = []
54
55
class ExecutorLoader:
    """
    Keeps constants for all the currently available executors and loads the configured ones.

    The ``[core] executor`` option is parsed into ``ExecutorName`` objects (optionally grouped per
    team), the parsed result is cached in module-level lookup tables, and executor classes are
    imported and instantiated on request.
    """

    # Maps the alias of each core executor to the import path of its class. ``init_executors``
    # additionally records every executor it loads in this mapping.
    executors = {
        LOCAL_EXECUTOR: "airflow.executors.local_executor.LocalExecutor",
        CELERY_EXECUTOR: "airflow.providers.celery.executors.celery_executor.CeleryExecutor",
        KUBERNETES_EXECUTOR: "airflow.providers.cncf.kubernetes."
        "executors.kubernetes_executor.KubernetesExecutor",
    }

    @classmethod
    def _parse_executor_name(cls, executor_name_str: str, team_name: str | None) -> ExecutorName:
        """
        Convert a single executor entry of the configuration into an ``ExecutorName``.

        Three forms are accepted: a core executor alias (``LocalExecutor``), a module path
        (``path.to.MyExecutor``) and an alias combined with a module path
        (``my_alias:path.to.MyExecutor``).

        :param executor_name_str: One executor entry from the configuration
        :param team_name: Team the entry was configured for, or None for a global executor
        :return: The parsed executor name
        :raises AirflowConfigException: If the entry does not match one of the accepted forms
        """
        split_name = executor_name_str.split(":")

        if len(split_name) == 1:
            name = split_name[0]
            # Check if this is an alias for a core airflow executor, module
            # paths won't be provided by the user in that case.
            if core_executor_module := cls.executors.get(name):
                return ExecutorName(module_path=core_executor_module, alias=name, team_name=team_name)
            # Otherwise a module path was provided
            return ExecutorName(alias=None, module_path=name, team_name=team_name)

        if len(split_name) == 2:
            # An alias was provided with the module path.
            # Ensure the user is not trying to override the existing aliases of any of the core
            # executors by providing an alias along with the existing core airflow executor alias
            # (e.g. my_local_exec_alias:LocalExecutor). Allowing this makes things unnecessarily
            # complicated. Multiple Executors of the same type will be supported by a future
            # multitenancy AIP.
            # The module component should always be a module path.
            alias, module_path = split_name
            if not module_path or module_path in CORE_EXECUTOR_NAMES or "." not in module_path:
                raise AirflowConfigException(
                    "Incorrectly formatted executor configuration. Second portion of an executor "
                    f"configuration must be a module path but received: {module_path}"
                )
            return ExecutorName(alias=alias, module_path=module_path, team_name=team_name)

        # More than one ":" in the entry
        raise AirflowConfigException(f"Incorrectly formatted executor configuration: {executor_name_str}")

    @classmethod
    def _get_executor_names(cls, validate_teams: bool = True) -> list[ExecutorName]:
        """
        Return the executor names from Airflow configuration.

        The configuration is only parsed on the first call. The result is stored in the module-level
        caches and later calls return the cached list directly, in which case ``validate_teams``
        has no effect.

        :param validate_teams: Whether to validate that team names exist in database
        :return: List of executor names from Airflow configuration
        :raises AirflowConfigException: If an executor entry is malformed or the same executor
            module is configured more than once for a team
        """
        if _executor_names:
            return _executor_names

        all_executor_names: list[tuple[str | None, list[str]]] = cls._get_team_executor_configs(
            validate_teams=validate_teams
        )

        executor_names: list[ExecutorName] = []
        for team_name, executor_names_config in all_executor_names:
            executor_names_per_team = [
                cls._parse_executor_name(executor_name_str, team_name)
                for executor_name_str in executor_names_config
            ]

            # As of now, we do not allow duplicate executors (within teams).
            # Add all module paths to a set, since the actual code is what is unique
            unique_modules = {exec_name.module_path for exec_name in executor_names_per_team}
            if len(unique_modules) < len(executor_names_per_team):
                raise AirflowConfigException(
                    "At least one executor was configured twice. Duplicate executors are not yet supported.\n"
                    "Please check your configuration again to correct the issue."
                )

            executor_names.extend(executor_names_per_team)

        # Populate some mappings for fast future lookups
        for executor_name in executor_names:
            # Executors will not always have aliases
            if executor_name.alias:
                _alias_to_executors_per_team[executor_name.team_name][executor_name.alias] = executor_name
            # All executors will have a team name. It _may_ be None, for now that means it is a system
            # level executor
            _team_name_to_executors[executor_name.team_name].append(executor_name)
            # All executors will have a module path
            _module_to_executors_per_team[executor_name.team_name][executor_name.module_path] = executor_name
            # The class name is the last dotted component of the module path
            _classname_to_executors_per_team[executor_name.team_name][
                executor_name.module_path.split(".")[-1]
            ] = executor_name
            # Cache the executor names, so the logic of this method only runs once
            _executor_names.append(executor_name)

        return executor_names

    @classmethod
    def block_use_of_multi_team(cls):
        """
        Raise an exception if the user tries to use multiple team based executors.

        Before the feature is complete we do not want users to accidentally configure this.
        This can be overridden by setting the AIRFLOW__DEV__MULTI_TEAM_MODE environment
        variable to "enabled"
        This check is built into a method so that it can be easily mocked in unit tests.

        :raises AirflowConfigException: If the environment variable is unset or not "enabled"
        """
        # An unset variable yields None, which also compares unequal to "enabled"
        if os.environ.get("AIRFLOW__DEV__MULTI_TEAM_MODE") != "enabled":
            raise AirflowConfigException("Configuring multiple team based executors is not yet supported!")

    @classmethod
    def _validate_teams_exist_in_database(cls, team_names: set[str]) -> None:
        """
        Validate that all specified team names exist in the database.

        :param team_names: Set of team names to validate
        :raises AirflowConfigException: If any team names don't exist in the database
        """
        if not team_names:
            return

        missing_teams = team_names - Team.get_all_team_names()
        if not missing_teams:
            return

        # Sort so that the error message is deterministic
        missing_teams_str = ", ".join(sorted(missing_teams))
        raise AirflowConfigException(
            f"One or more teams specified in executor configuration do not exist in database: {missing_teams_str}. "
            "Please create these teams first or remove them from executor configuration."
        )

    @classmethod
    def _get_team_executor_configs(cls, validate_teams: bool = True) -> list[tuple[str | None, list[str]]]:
        """
        Return a list of executor configs to be loaded.

        Each tuple contains the team name as the first element (None for the global config) and the
        second element is the executor config for that team (a list of executor names/modules/aliases).

        :param validate_teams: Whether to validate that team names exist in database
        :return: One ``(team_name, executor entries)`` tuple per configured group, in config order
        :raises AirflowConfigException: If the config is empty, a team segment is malformed, a team
            is listed twice, no global executor is configured, a global executor follows a team
            executor, or (with ``validate_teams``) a team is missing from the database
        """
        from airflow.configuration import conf

        executor_config = conf.get_mandatory_value("core", "executor")
        if not executor_config:
            raise AirflowConfigException(
                "The 'executor' key in the 'core' section of the configuration is mandatory and cannot be empty"
            )
        configs: list[tuple[str | None, list[str]]] = []
        seen_teams: set[str | None] = set()

        # The executor_config can look like a few things. One is just a single executor name, such as
        # "CeleryExecutor". Or a list of executors, such as "CeleryExecutor,KubernetesExecutor,module.path.to.executor".
        # In these cases these are all executors that are available to all teams, with the first one being the
        # default executor, as usual. The config can also look like a list of executors, per team, with the team name
        # prefixing each list of executors separated by an equals sign and then each team list separated by a
        # semi-colon.
        # "LocalExecutor;team1=CeleryExecutor;team2=KubernetesExecutor,module.path.to.executor".
        for team_executor_config in executor_config.split(";"):
            team_name: str | None
            # The first item in the list may not have a team name (either empty string before the equal
            # sign or no equal sign at all), which means it is a global executor config.
            if "=" not in team_executor_config or team_executor_config.startswith("="):
                team_name = None
                executor_names = team_executor_config.strip("=")
            else:
                cls.block_use_of_multi_team()
                if not conf.getboolean("core", "multi_team", fallback=False):
                    log.warning(
                        "The 'multi_team' config is not enabled, but team executors were configured. "
                        "The following team executor config will be ignored: %s",
                        team_executor_config,
                    )
                    continue

                # Split on the first "=" only, so that a stray second "=" is reported as a
                # configuration error instead of surfacing as a tuple-unpacking ValueError.
                raw_team_name, _, executor_names = team_executor_config.partition("=")
                # Executor names are stripped below; treat the team name the same way so that
                # "A; team1=B" refers to "team1" rather than " team1".
                team_name = raw_team_name.strip()
                if not team_name or "=" in executor_names:
                    raise AirflowConfigException(
                        f"Incorrectly formatted team executor configuration: {team_executor_config}"
                    )

            # Check for duplicate team names
            if team_name in seen_teams:
                raise AirflowConfigException(
                    f"Team '{team_name}' appears more than once in executor configuration. "
                    "Each team can only be specified once in the executor config."
                )
            seen_teams.add(team_name)

            # Split by comma to get the individual executor names and strip spaces off of them
            configs.append((team_name, [name.strip() for name in executor_names.split(",")]))

        # Validate that at least one global executor exists
        has_global_executor = any(team_name is None for team_name, _ in configs)
        if not has_global_executor:
            raise AirflowConfigException(
                "At least one global executor must be configured. Current configuration only contains "
                "team-based executors. Please add a global executor configuration (e.g., "
                "'CeleryExecutor;team1=LocalExecutor' instead of 'team1=CeleryExecutor;team2=LocalExecutor')."
            )

        # Validate that global executors come before team executors
        seen_team_executor = False
        for team_name, _ in configs:
            if team_name is not None:
                seen_team_executor = True
            elif seen_team_executor:
                # Found a global executor after we've already seen a team executor
                raise AirflowConfigException(
                    "Global executors must be specified before team-based executors. "
                    "Current configuration has team executors before global executors. "
                    "Please reorder your configuration so that all global executors (those without a team prefix) "
                    "appear before any team-based executors (e.g., 'CeleryExecutor;team1=LocalExecutor' "
                    "instead of 'team1=CeleryExecutor;LocalExecutor')."
                )

        # Validate that all team names exist in the database (excluding None for global configs)
        team_names_to_validate = {team_name for team_name in seen_teams if team_name is not None}
        if team_names_to_validate and validate_teams:
            cls._validate_teams_exist_in_database(team_names_to_validate)

        return configs

    @classmethod
    def get_executor_names(cls, validate_teams: bool = True) -> list[ExecutorName]:
        """
        Return the executor names from Airflow configuration.

        :param validate_teams: Whether to validate that team names exist in database
        :return: List of executor names from Airflow configuration
        """
        return cls._get_executor_names(validate_teams=validate_teams)

    @classmethod
    def get_default_executor_name(cls, team_name: str | None = None) -> ExecutorName:
        """
        Return the default executor name from Airflow configuration.

        :param team_name: Team to return the default executor for, or None for the global default
        :return: executor name from Airflow configuration
        :raises IndexError: If no executor is configured for ``team_name``
        """
        # Make sure the lookup tables have been populated
        cls._get_executor_names()
        # The default executor is the first configured executor in the list
        return _team_name_to_executors[team_name][0]

    @classmethod
    def get_default_executor(cls) -> BaseExecutor:
        """Create and return a new instance of the default (first configured global) executor."""
        return cls.load_executor(cls.get_default_executor_name())

    @classmethod
    def init_executors(cls) -> list[BaseExecutor]:
        """
        Create a new instance of every configured executor.

        Each loaded executor is also recorded in ``cls.executors``, keyed by its alias when it has
        one and by its class name otherwise.

        :return: The executor instances, in the order they were configured
        """
        loaded_executors = []
        for executor_name in cls._get_executor_names():
            loaded_executor = cls.load_executor(executor_name)
            registry_key = executor_name.alias or loaded_executor.__class__.__name__
            cls.executors[registry_key] = executor_name.module_path
            loaded_executors.append(loaded_executor)

        return loaded_executors

    @classmethod
    def lookup_executor_name_by_str(
        cls, executor_name_str: str, team_name: str | None = None
    ) -> ExecutorName:
        """
        Find the configured ``ExecutorName`` matching a string.

        The string is matched against the alias first, then the module path and finally the class
        name of the executors configured for ``team_name``.

        :param executor_name_str: Alias, module path or class name of the executor
        :param team_name: Team whose executors are searched, or None for the global executors
        :return: The matching executor name
        :raises UnknownExecutorException: If no configured executor matches
        """
        if (
            not _classname_to_executors_per_team
            or not _module_to_executors_per_team
            or not _alias_to_executors_per_team
        ):
            # if we haven't loaded the executors yet, such as directly calling load_executor
            cls._get_executor_names()

        # lookup the executor by alias first, if not check if we're given a module path or class name
        for executors_per_team in (
            _alias_to_executors_per_team,
            _module_to_executors_per_team,
            _classname_to_executors_per_team,
        ):
            # .get() rather than indexing so that unknown teams are not added to the defaultdicts
            if executor_name := executors_per_team.get(team_name, {}).get(executor_name_str):
                return executor_name
        raise UnknownExecutorException(f"Unknown executor being loaded: {executor_name_str}")

    @classmethod
    def load_executor(cls, executor_name: ExecutorName | str | None) -> BaseExecutor:
        """
        Load the executor.

        This supports the following formats:
        * by executor name for core executor
        * by import path
        * by class name of the Executor
        * by ExecutorName object specification

        A falsy ``executor_name`` loads the default executor. A new executor instance is created on
        every call.

        :param executor_name: Executor to load, in one of the formats above
        :return: an instance of executor class via executor_name
        :raises AirflowConfigException: If the executor cannot be imported, or if it is configured
            for a team but does not support multi-team
        """
        if not executor_name:
            _executor_name = cls.get_default_executor_name()
        elif isinstance(executor_name, str):
            _executor_name = cls.lookup_executor_name_by_str(executor_name)
        else:
            _executor_name = executor_name

        try:
            executor_cls, import_source = cls.import_executor_cls(_executor_name)
            log.debug("Loading executor %s from %s", _executor_name, import_source.value)
            if _executor_name.team_name:
                # Validate that team executors support multi-team functionality
                if not executor_cls.supports_multi_team:
                    raise AirflowConfigException(
                        f"Executor {_executor_name.module_path} does not support multi-team functionality "
                        f"but was configured for team '{_executor_name.team_name}'. "
                        "Only executors with supports_multi_team=True can be used as team executors."
                    )
                executor = executor_cls(team_name=_executor_name.team_name)
            else:
                executor = executor_cls()

        except ImportError as e:
            log.error(e)
            # Chain the original error so the real import failure stays visible in the traceback
            raise AirflowConfigException(
                f'The module/attribute could not be loaded. Please check "executor" key in "core" section. '
                f'Current value: "{_executor_name}".'
            ) from e
        log.info("Loaded executor: %s", _executor_name)

        # Store the executor name we've built for this executor in the
        # instance. This makes it easier for the Scheduler, Backfill, etc to
        # know how we refer to this executor.
        executor.name = _executor_name

        return executor

    @classmethod
    def import_executor_cls(cls, executor_name: ExecutorName) -> tuple[type[BaseExecutor], ConnectorSource]:
        """
        Import the executor class.

        :param executor_name: Executor name whose ``module_path`` is imported.

        :return: executor class via executor_name and executor import source
        """
        return import_string(executor_name.module_path), executor_name.connector_source

    @classmethod
    def import_default_executor_cls(cls) -> tuple[type[BaseExecutor], ConnectorSource]:
        """
        Import the default executor class.

        :return: executor class and executor import source
        """
        return cls.import_executor_cls(cls.get_default_executor_name())