1# Licensed to the Apache Software Foundation (ASF) under one
2# or more contributor license agreements. See the NOTICE file
3# distributed with this work for additional information
4# regarding copyright ownership. The ASF licenses this file
5# to you under the Apache License, Version 2.0 (the
6# "License"); you may not use this file except in compliance
7# with the License. You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing,
12# software distributed under the License is distributed on an
13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14# KIND, either express or implied. See the License for the
15# specific language governing permissions and limitations
16# under the License.
17"""All executors."""
18
19from __future__ import annotations
20
21import os
22from collections import defaultdict
23from typing import TYPE_CHECKING
24
25import structlog
26
27from airflow._shared.module_loading import import_string
28from airflow.exceptions import AirflowConfigException, UnknownExecutorException
29from airflow.executors.executor_constants import (
30 CELERY_EXECUTOR,
31 CORE_EXECUTOR_NAMES,
32 KUBERNETES_EXECUTOR,
33 LOCAL_EXECUTOR,
34 ConnectorSource,
35)
36from airflow.executors.executor_utils import ExecutorName
37from airflow.models.team import Team
38
# Module-level structured logger used by ExecutorLoader for its debug/info/warning/error messages.
log = structlog.get_logger(__name__)
40
41if TYPE_CHECKING:
42 from airflow.executors.base_executor import BaseExecutor
43
44
45# Used to lookup an ExecutorName via a string alias or module path. An
46# executor may have both so we need two lookup dicts.
47_alias_to_executors_per_team: dict[str | None, dict[str, ExecutorName]] = defaultdict(dict)
48_module_to_executors_per_team: dict[str | None, dict[str, ExecutorName]] = defaultdict(dict)
49_classname_to_executors_per_team: dict[str | None, dict[str, ExecutorName]] = defaultdict(dict)
50# Used to lookup an ExecutorName via the team id.
51_team_name_to_executors: dict[str | None, list[ExecutorName]] = defaultdict(list)
52# Used to cache the computed ExecutorNames so that we don't need to read/parse config more than once
53_executor_names: list[ExecutorName] = []
54
55
class ExecutorLoader:
    """
    Resolve, import and instantiate the executors configured in Airflow.

    ``executors`` maps the short aliases of the core executors to their import paths. It is a
    class-level dict shared by every caller, and :meth:`init_executors` adds an entry to it for
    every executor it loads.
    """

    executors = {
        LOCAL_EXECUTOR: "airflow.executors.local_executor.LocalExecutor",
        CELERY_EXECUTOR: "airflow.providers.celery.executors.celery_executor.CeleryExecutor",
        KUBERNETES_EXECUTOR: "airflow.providers.cncf.kubernetes."
        "executors.kubernetes_executor.KubernetesExecutor",
    }

    @classmethod
    def _parse_executor_name_str(cls, executor_name_str: str, team_name: str | None) -> ExecutorName:
        """
        Parse a single entry of a team's executor configuration into an ExecutorName.

        Supported forms are ``<core executor alias>``, ``<module path>`` and ``<alias>:<module path>``.

        :param executor_name_str: One (already stripped) entry of a team's executor list.
        :param team_name: Team the entry belongs to; ``None`` for global executors.
        :return: The parsed ExecutorName.
        :raises AirflowConfigException: If the entry contains more than one ``:``, or the part after
            the ``:`` is not a module path.
        """
        split_name = executor_name_str.split(":")

        if len(split_name) == 1:
            name = split_name[0]
            # Check if this is an alias for a core airflow executor, module
            # paths won't be provided by the user in that case.
            if core_executor_module := cls.executors.get(name):
                return ExecutorName(module_path=core_executor_module, alias=name, team_name=team_name)
            # Otherwise a module path was provided.
            return ExecutorName(alias=None, module_path=name, team_name=team_name)

        if len(split_name) == 2:
            # An alias was provided with the module path.
            # Ensure the user is not trying to override the existing aliases of any of the core
            # executors by providing an alias along with the existing core airflow executor alias
            # (e.g. my_local_exec_alias:LocalExecutor). Allowing this makes things unnecessarily
            # complicated. Multiple Executors of the same type will be supported by a future
            # multitenancy AIP.
            # The module component should always be a module path.
            alias, module_path = split_name
            if not module_path or module_path in CORE_EXECUTOR_NAMES or "." not in module_path:
                raise AirflowConfigException(
                    "Incorrectly formatted executor configuration. Second portion of an executor "
                    f"configuration must be a module path but received: {module_path}"
                )
            return ExecutorName(alias=alias, module_path=module_path, team_name=team_name)

        raise AirflowConfigException(f"Incorrectly formatted executor configuration: {executor_name_str}")

    @classmethod
    def _get_executor_names(cls, validate_teams: bool = True) -> list[ExecutorName]:
        """
        Return the executor names from Airflow configuration.

        The result is computed once and cached in the module-level ``_executor_names`` list. The
        per-team alias / module path / class name / team lookup mappings are populated at the same
        time. Later calls return the cached list.

        :param validate_teams: Whether to validate that team names exist in database. Only has an
            effect on the first (uncached) call.
        :return: List of executor names from Airflow configuration
        :raises AirflowConfigException: If an executor entry is malformed, or the same executor is
            configured more than once for one team.
        """
        if _executor_names:
            return _executor_names

        all_executor_names: list[tuple[str | None, list[str]]] = cls._get_team_executor_configs(
            validate_teams=validate_teams
        )

        executor_names: list[ExecutorName] = []
        for team_name, executor_names_config in all_executor_names:
            executor_names_per_team = [
                cls._parse_executor_name_str(executor_name_str, team_name)
                for executor_name_str in executor_names_config
            ]

            # As of now, we do not allow duplicate executors (within teams).
            # Compare module paths, since the actual code is what is unique.
            unique_modules = {exec_name.module_path for exec_name in executor_names_per_team}
            if len(unique_modules) < len(executor_names_per_team):
                msg = (
                    "At least one executor was configured twice. Duplicate executors are not yet supported.\n"
                    "Please check your configuration again to correct the issue."
                )
                raise AirflowConfigException(msg)

            executor_names.extend(executor_names_per_team)

        # Populate some mappings for fast future lookups
        for executor_name in executor_names:
            # Executors will not always have aliases
            if executor_name.alias:
                _alias_to_executors_per_team[executor_name.team_name][executor_name.alias] = executor_name
            # All executors will have a team name. It _may_ be None, for now that means it is a system level executor
            _team_name_to_executors[executor_name.team_name].append(executor_name)
            # All executors will have a module path
            _module_to_executors_per_team[executor_name.team_name][executor_name.module_path] = executor_name
            _classname_to_executors_per_team[executor_name.team_name][
                executor_name.module_path.split(".")[-1]
            ] = executor_name
            # Cache the executor names, so the logic of this method only runs once
            _executor_names.append(executor_name)

        return executor_names

    @classmethod
    def block_use_of_multi_team(cls):
        """
        Raise an exception if the user tries to use multiple team based executors.

        Before the feature is complete we do not want users to accidentally configure this.
        This can be overridden by setting the AIRFLOW__DEV__MULTI_TEAM_MODE environment
        variable to "enabled".
        This check is built into a method so that it can be easily mocked in unit tests.

        :raises AirflowConfigException: Unless AIRFLOW__DEV__MULTI_TEAM_MODE is exactly "enabled".
        """
        team_dev_mode: str | None = os.environ.get("AIRFLOW__DEV__MULTI_TEAM_MODE")
        # An unset or empty variable also differs from "enabled", so one comparison covers it.
        if team_dev_mode != "enabled":
            raise AirflowConfigException("Configuring multiple team based executors is not yet supported!")

    @classmethod
    def _validate_teams_exist_in_database(cls, team_names: set[str]) -> None:
        """
        Validate that all specified team names exist in the database.

        :param team_names: Set of team names to validate
        :raises AirflowConfigException: If any team names don't exist in the database
        """
        if not team_names:
            return

        existing_teams = Team.get_all_team_names()

        missing_teams = team_names - existing_teams

        if missing_teams:
            missing_teams_str = ", ".join(sorted(missing_teams))

            raise AirflowConfigException(
                f"One or more teams specified in executor configuration do not exist in database: {missing_teams_str}. "
                "Please create these teams first or remove them from executor configuration."
            )

    @classmethod
    def _get_team_executor_configs(cls, validate_teams: bool = True) -> list[tuple[str | None, list[str]]]:
        """
        Return a list of executor configs to be loaded.

        Each tuple contains the team name as the first element (``None`` for global executors) and the
        second element is the executor config for that team (a list of executor names/modules/aliases).

        :param validate_teams: Whether to validate that team names exist in database
        :raises AirflowConfigException: If the config is empty or malformed, a team appears more than
            once, no global executor is configured, or (with ``validate_teams``) a team is unknown.
        """
        from airflow.configuration import conf

        executor_config = conf.get_mandatory_value("core", "executor")
        if not executor_config:
            raise AirflowConfigException(
                "The 'executor' key in the 'core' section of the configuration is mandatory and cannot be empty"
            )
        configs: list[tuple[str | None, list[str]]] = []
        seen_teams: set[str | None] = set()

        # The executor_config can look like a few things. One is just a single executor name, such as
        # "CeleryExecutor". Or a list of executors, such as "CeleryExecutor,KubernetesExecutor,module.path.to.executor".
        # In these cases these are all executors that are available to all teams, with the first one being the
        # default executor, as usual. The config can also look like a list of executors, per team, with the team name
        # prefixing each list of executors separated by an equal sign and then each team list separated by a
        # semicolon.
        # "LocalExecutor;team1=CeleryExecutor;team2=KubernetesExecutor,module.path.to.executor".
        for team_executor_config in executor_config.split(";"):
            # The first item in the list may not have a team name (either empty string before the equal
            # sign or no equal sign at all), which means it is a global executor config.
            if "=" not in team_executor_config or team_executor_config.startswith("="):
                team_name = None
                executor_names = team_executor_config.strip("=")
            else:
                cls.block_use_of_multi_team()
                if not conf.getboolean("core", "multi_team", fallback=False):
                    log.warning(
                        "The 'multi_team' config is not enabled, but team executors were configured. "
                        "The following team executor config will be ignored: %s",
                        team_executor_config,
                    )
                    continue
                raw_team_name, _, executor_names = team_executor_config.partition("=")
                # Strip the team name like the executor names below, so "a; team1=X" means team "team1".
                team_name = raw_team_name.strip()
                # Previously a second "=" crashed with a bare ValueError from tuple unpacking.
                if not team_name or "=" in executor_names:
                    raise AirflowConfigException(
                        f"Incorrectly formatted team executor configuration: {team_executor_config}"
                    )

            # Check for duplicate team names
            if team_name in seen_teams:
                raise AirflowConfigException(
                    f"Team '{team_name}' appears more than once in executor configuration. "
                    f"Each team can only be specified once in the executor config."
                )
            seen_teams.add(team_name)

            # Split by comma to get the individual executor names and strip spaces off of them
            configs.append((team_name, [name.strip() for name in executor_names.split(",")]))

        # Validate that at least one global executor exists
        if None not in seen_teams:
            raise AirflowConfigException(
                "At least one global executor must be configured. Current configuration only contains "
                "team-based executors. Please add a global executor configuration (e.g., "
                "'CeleryExecutor;team1=LocalExecutor' instead of 'team1=CeleryExecutor;team2=LocalExecutor')."
            )

        # Validate that all team names exist in the database (excluding None for global configs)
        team_names_to_validate = {team_name for team_name in seen_teams if team_name is not None}
        if team_names_to_validate and validate_teams:
            cls._validate_teams_exist_in_database(team_names_to_validate)

        return configs

    @classmethod
    def get_executor_names(cls, validate_teams: bool = True) -> list[ExecutorName]:
        """
        Return the executor names from Airflow configuration.

        :param validate_teams: Whether to validate that team names exist in database
        :return: List of executor names from Airflow configuration
        """
        return cls._get_executor_names(validate_teams=validate_teams)

    @classmethod
    def get_default_executor_name(cls, team_name: str | None = None) -> ExecutorName:
        """
        Return the default executor name from Airflow configuration.

        :param team_name: Team whose default executor is returned; ``None`` (the default) selects
            the global executors.
        :return: The first executor configured for ``team_name``.
        :raises UnknownExecutorException: If no executors are configured for ``team_name``.
        """
        cls._get_executor_names()
        # Use .get() rather than indexing: indexing the defaultdict would insert an empty list for an
        # unknown team into the shared cache and then fail with an opaque IndexError.
        team_executors = _team_name_to_executors.get(team_name)
        if not team_executors:
            raise UnknownExecutorException(f"No executors are configured for team: {team_name}")
        # The default executor is the first configured executor in the list
        return team_executors[0]

    @classmethod
    def get_default_executor(cls) -> BaseExecutor:
        """Create and return a new instance of the global default executor."""
        return cls.load_executor(cls.get_default_executor_name())

    @classmethod
    def init_executors(cls) -> list[BaseExecutor]:
        """
        Create a new instance of every configured executor.

        Each executor's alias (or, when it has none, its class name) is also registered in
        ``executors`` against its module path.

        :return: The executor instances, in configuration order.
        """
        executor_names = cls._get_executor_names()
        loaded_executors = []
        for executor_name in executor_names:
            loaded_executor = cls.load_executor(executor_name)
            if executor_name.alias:
                cls.executors[executor_name.alias] = executor_name.module_path
            else:
                cls.executors[loaded_executor.__class__.__name__] = executor_name.module_path

            loaded_executors.append(loaded_executor)

        return loaded_executors

    @classmethod
    def lookup_executor_name_by_str(
        cls, executor_name_str: str, team_name: str | None = None
    ) -> ExecutorName:
        """
        Resolve a string to a configured ExecutorName for ``team_name``.

        The string is matched against the team's executor aliases first, then full module paths,
        then class names.

        :param executor_name_str: Alias, module path or class name of a configured executor.
        :param team_name: Team to look in; ``None`` for the global executors.
        :raises UnknownExecutorException: If no configured executor matches.
        """
        if (
            not _classname_to_executors_per_team
            or not _module_to_executors_per_team
            or not _alias_to_executors_per_team
        ):
            # if we haven't loaded the executors yet, such as directly calling load_executor
            cls._get_executor_names()

        # .get() (not indexing) so that lookups never insert entries into the defaultdicts.
        for lookup in (
            _alias_to_executors_per_team,
            _module_to_executors_per_team,
            _classname_to_executors_per_team,
        ):
            if executor_name := lookup.get(team_name, {}).get(executor_name_str):
                return executor_name
        raise UnknownExecutorException(f"Unknown executor being loaded: {executor_name_str}")

    @classmethod
    def load_executor(cls, executor_name: ExecutorName | str | None) -> BaseExecutor:
        """
        Load the executor.

        This supports the following formats:
        * by executor name for core executor
        * by import path
        * by class name of the Executor
        * by ExecutorName object specification

        A falsy ``executor_name`` loads the global default executor. Strings are resolved against
        the global (``team_name=None``) executors only.

        :return: an instance of executor class via executor_name
        :raises AirflowConfigException: If the executor class cannot be imported.
        """
        if not executor_name:
            _executor_name = cls.get_default_executor_name()
        elif isinstance(executor_name, str):
            _executor_name = cls.lookup_executor_name_by_str(executor_name)
        else:
            _executor_name = executor_name

        try:
            executor_cls, import_source = cls.import_executor_cls(_executor_name)
            log.debug("Loading executor %s from %s", _executor_name, import_source.value)
            if _executor_name.team_name:
                executor = executor_cls(team_name=_executor_name.team_name)
            else:
                executor = executor_cls()

        except ImportError as e:
            log.error(e)
            # Chain the ImportError so the original cause stays in the traceback.
            raise AirflowConfigException(
                f'The module/attribute could not be loaded. Please check "executor" key in "core" section. '
                f'Current value: "{_executor_name}".'
            ) from e
        log.info("Loaded executor: %s", _executor_name)

        # Store the executor name we've built for this executor in the
        # instance. This makes it easier for the Scheduler, Backfill, etc to
        # know how we refer to this executor.
        executor.name = _executor_name

        return executor

    @classmethod
    def import_executor_cls(cls, executor_name: ExecutorName) -> tuple[type[BaseExecutor], ConnectorSource]:
        """
        Import the executor class.

        Supports the same formats as ExecutorLoader.load_executor.

        :param executor_name: Name of core executor or module path to executor.

        :return: executor class via executor_name and executor import source
        """
        return import_string(executor_name.module_path), executor_name.connector_source

    @classmethod
    def import_default_executor_cls(cls) -> tuple[type[BaseExecutor], ConnectorSource]:
        """
        Import the default executor class.

        :return: executor class and executor import source
        """
        return cls.import_executor_cls(cls.get_default_executor_name())