Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/executors/executor_loader.py: 27%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

166 statements  

1# Licensed to the Apache Software Foundation (ASF) under one 

2# or more contributor license agreements. See the NOTICE file 

3# distributed with this work for additional information 

4# regarding copyright ownership. The ASF licenses this file 

5# to you under the Apache License, Version 2.0 (the 

6# "License"); you may not use this file except in compliance 

7# with the License. You may obtain a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, 

12# software distributed under the License is distributed on an 

13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

14# KIND, either express or implied. See the License for the 

15# specific language governing permissions and limitations 

16# under the License. 

17"""All executors.""" 

18 

19from __future__ import annotations 

20 

21import os 

22from collections import defaultdict 

23from typing import TYPE_CHECKING 

24 

25import structlog 

26 

27from airflow._shared.module_loading import import_string 

28from airflow.exceptions import AirflowConfigException, UnknownExecutorException 

29from airflow.executors.executor_constants import ( 

30 CELERY_EXECUTOR, 

31 CORE_EXECUTOR_NAMES, 

32 KUBERNETES_EXECUTOR, 

33 LOCAL_EXECUTOR, 

34 ConnectorSource, 

35) 

36from airflow.executors.executor_utils import ExecutorName 

37from airflow.models.team import Team 

38 

log = structlog.get_logger(__name__)

if TYPE_CHECKING:
    from airflow.executors.base_executor import BaseExecutor


# Used to look up an ExecutorName via a string alias, a module path or a bare
# class name (the last dotted component of the module path). An executor may be
# reachable through more than one of these, so one lookup dict is kept per kind.
# Each dict is keyed first by team name (None for executors configured without a
# team) and then by the alias / module path / class name.
_alias_to_executors_per_team: dict[str | None, dict[str, ExecutorName]] = defaultdict(dict)
_module_to_executors_per_team: dict[str | None, dict[str, ExecutorName]] = defaultdict(dict)
_classname_to_executors_per_team: dict[str | None, dict[str, ExecutorName]] = defaultdict(dict)
# Used to look up the ExecutorNames of a team by team name (None for executors
# configured without a team). Entries are appended in configuration order, and
# the first entry of a list is treated as that team's default executor.
_team_name_to_executors: dict[str | None, list[ExecutorName]] = defaultdict(list)
# Used to cache the computed ExecutorNames so that we don't need to read/parse config more than once
_executor_names: list[ExecutorName] = []

54 

55 

class ExecutorLoader:
    """
    Resolve, cache and instantiate the executors named in the Airflow configuration.

    Also keeps the mapping from core executor aliases to their import paths.
    """

    # Alias -> import path. ``init_executors`` adds further entries to this
    # dict at runtime for every executor it loads.
    executors = {
        LOCAL_EXECUTOR: "airflow.executors.local_executor.LocalExecutor",
        CELERY_EXECUTOR: "airflow.providers.celery.executors.celery_executor.CeleryExecutor",
        KUBERNETES_EXECUTOR: "airflow.providers.cncf.kubernetes."
        "executors.kubernetes_executor.KubernetesExecutor",
    }

    @classmethod
    def _parse_executor_name(cls, executor_name_str: str, team_name: str | None) -> ExecutorName:
        """
        Convert a single executor entry from the configuration into an ExecutorName.

        Accepted forms are ``<core alias>``, ``<module.path>`` and ``<alias>:<module.path>``.

        :param executor_name_str: One (already whitespace-stripped) executor entry
        :param team_name: Team the entry was configured for, ``None`` for a global executor
        :return: The ExecutorName describing the entry
        :raises AirflowConfigException: If the entry is not in one of the accepted forms
        """
        split_name = executor_name_str.split(":")

        if len(split_name) == 1:
            name = split_name[0]
            # Check if this is an alias for a core airflow executor, module
            # paths won't be provided by the user in that case.
            if core_executor_module := cls.executors.get(name):
                return ExecutorName(module_path=core_executor_module, alias=name, team_name=team_name)
            # A module path was provided
            return ExecutorName(alias=None, module_path=name, team_name=team_name)

        # An alias was provided with the module path
        if len(split_name) == 2:
            alias, module_path = split_name
            # Ensure the user is not trying to override the existing aliases of any of the core
            # executors by providing an alias along with the existing core airflow executor alias
            # (e.g. my_local_exec_alias:LocalExecutor). Allowing this makes things unnecessarily
            # complicated. Multiple Executors of the same type will be supported by a future
            # multitenancy AIP.
            # The module component should always be a module path.
            if not module_path or module_path in CORE_EXECUTOR_NAMES or "." not in module_path:
                raise AirflowConfigException(
                    "Incorrectly formatted executor configuration. Second portion of an executor "
                    f"configuration must be a module path but received: {module_path}"
                )
            return ExecutorName(alias=alias, module_path=module_path, team_name=team_name)

        raise AirflowConfigException(f"Incorrectly formatted executor configuration: {executor_name_str}")

    @classmethod
    def _get_executor_names(cls, validate_teams: bool = True) -> list[ExecutorName]:
        """
        Return the executor names from Airflow configuration.

        The result is cached at module level: once the cache is populated it is
        returned as-is and ``validate_teams`` is not consulted again.

        :param validate_teams: Whether to validate that team names exist in database
        :return: List of executor names from Airflow configuration
        :raises AirflowConfigException: If an entry is malformed or an executor is
            configured more than once within the same team
        """
        if _executor_names:
            return _executor_names

        all_executor_names: list[tuple[str | None, list[str]]] = cls._get_team_executor_configs(
            validate_teams=validate_teams
        )

        executor_names: list[ExecutorName] = []
        for team_name, executor_names_config in all_executor_names:
            executor_names_per_team = [
                cls._parse_executor_name(executor_name_str, team_name)
                for executor_name_str in executor_names_config
            ]

            # As of now, we do not allow duplicate executors (within teams).
            # Add all module paths to a set, since the actual code is what is unique
            unique_modules = {exec_name.module_path for exec_name in executor_names_per_team}
            if len(unique_modules) < len(executor_names_per_team):
                msg = (
                    "At least one executor was configured twice. Duplicate executors are not yet supported.\n"
                    "Please check your configuration again to correct the issue."
                )
                raise AirflowConfigException(msg)

            executor_names.extend(executor_names_per_team)

        # Populate some mappings for fast future lookups. This only happens once
        # every team has been parsed successfully, so a configuration error never
        # leaves the caches half filled.
        for executor_name in executor_names:
            # Executors will not always have aliases
            if executor_name.alias:
                _alias_to_executors_per_team[executor_name.team_name][executor_name.alias] = executor_name
            # All executors will have a team name. It _may_ be None, for now that means it is a system level executor
            _team_name_to_executors[executor_name.team_name].append(executor_name)
            # All executors will have a module path
            _module_to_executors_per_team[executor_name.team_name][executor_name.module_path] = executor_name
            _classname_to_executors_per_team[executor_name.team_name][
                executor_name.module_path.split(".")[-1]
            ] = executor_name
            # Cache the executor names, so the logic of this method only runs once
            _executor_names.append(executor_name)

        return executor_names

    @classmethod
    def block_use_of_multi_team(cls):
        """
        Raise an exception if the user tries to use multiple team based executors.

        Before the feature is complete we do not want users to accidentally configure this.
        This can be overridden by setting the AIRFLOW__DEV__MULTI_TEAM_MODE environment
        variable to "enabled"
        This check is built into a method so that it can be easily mocked in unit tests.

        :raises AirflowConfigException: If the environment variable is unset or not "enabled"
        """
        # An unset variable yields None, which also differs from "enabled".
        if os.environ.get("AIRFLOW__DEV__MULTI_TEAM_MODE") != "enabled":
            raise AirflowConfigException("Configuring multiple team based executors is not yet supported!")

    @classmethod
    def _validate_teams_exist_in_database(cls, team_names: set[str]) -> None:
        """
        Validate that all specified team names exist in the database.

        :param team_names: Set of team names to validate
        :raises AirflowConfigException: If any team names don't exist in the database
        """
        if not team_names:
            return

        existing_teams = Team.get_all_team_names()

        missing_teams = team_names - existing_teams

        if missing_teams:
            # Sorted so the error message is stable regardless of set ordering.
            missing_teams_str = ", ".join(sorted(missing_teams))

            raise AirflowConfigException(
                f"One or more teams specified in executor configuration do not exist in database: {missing_teams_str}. "
                "Please create these teams first or remove them from executor configuration."
            )

    @classmethod
    def _get_team_executor_configs(cls, validate_teams: bool = True) -> list[tuple[str | None, list[str]]]:
        """
        Return a list of executor configs to be loaded.

        Each tuple contains the team id as the first element and the second element is the executor config
        for that team (a list of executor names/modules/aliases).

        :param validate_teams: Whether to validate that team names exist in database
        :return: ``(team name or None, executor entries)`` tuples in configuration order
        :raises AirflowConfigException: If the ``[core] executor`` value is empty, a team
            segment is malformed, a team appears twice, no global executor is configured,
            a global executor follows a team executor, or (when validating) a team is unknown
        """
        from airflow.configuration import conf

        executor_config = conf.get_mandatory_value("core", "executor")
        if not executor_config:
            raise AirflowConfigException(
                "The 'executor' key in the 'core' section of the configuration is mandatory and cannot be empty"
            )
        configs: list[tuple[str | None, list[str]]] = []
        seen_teams: set[str | None] = set()

        # The executor_config can look like a few things. One is just a single executor name, such as
        # "CeleryExecutor". Or a list of executors, such as "CeleryExecutor,KubernetesExecutor,module.path.to.executor".
        # In these cases these are all executors that are available to all teams, with the first one being the
        # default executor, as usual. The config can also look like a list of executors, per team, with the team name
        # prefixing each list of executors separated by a equal sign and then each team list separated by a
        # semi-colon.
        # "LocalExecutor;team1=CeleryExecutor;team2=KubernetesExecutor,module.path.to.executor".
        for team_executor_config in executor_config.split(";"):
            # The first item in the list may not have a team id (either empty string before the equal
            # sign or no equal sign at all), which means it is a global executor config.
            if "=" not in team_executor_config or team_executor_config.startswith("="):
                team_name = None
                executor_names = team_executor_config.strip("=")
            else:
                cls.block_use_of_multi_team()
                if conf.getboolean("core", "multi_team", fallback=False):
                    team_name, _, executor_names = team_executor_config.partition("=")
                    # A second "=" cannot be part of a valid team segment. Report it as a
                    # configuration error instead of letting a tuple-unpacking ValueError escape.
                    if "=" in executor_names:
                        raise AirflowConfigException(
                            "Incorrectly formatted team executor configuration (expected exactly one '=' "
                            f"between the team name and its executors): {team_executor_config}"
                        )
                else:
                    log.warning(
                        "The 'multi_team' config is not enabled, but team executors were configured. "
                        "The following team executor config will be ignored: %s",
                        team_executor_config,
                    )
                    continue

            # Check for duplicate team names
            if team_name in seen_teams:
                raise AirflowConfigException(
                    f"Team '{team_name}' appears more than once in executor configuration. "
                    "Each team can only be specified once in the executor config."
                )
            seen_teams.add(team_name)

            # Split by comma to get the individual executor names and strip spaces off of them
            configs.append((team_name, [name.strip() for name in executor_names.split(",")]))

        # Validate that at least one global executor exists
        has_global_executor = any(team_name is None for team_name, _ in configs)
        if not has_global_executor:
            raise AirflowConfigException(
                "At least one global executor must be configured. Current configuration only contains "
                "team-based executors. Please add a global executor configuration (e.g., "
                "'CeleryExecutor;team1=LocalExecutor' instead of 'team1=CeleryExecutor;team2=LocalExecutor')."
            )

        # Validate that global executors come before team executors
        seen_team_executor = False
        for team_name, _ in configs:
            if team_name is not None:
                seen_team_executor = True
            elif seen_team_executor:
                # Found a global executor after we've already seen a team executor
                raise AirflowConfigException(
                    "Global executors must be specified before team-based executors. "
                    "Current configuration has team executors before global executors. "
                    "Please reorder your configuration so that all global executors (those without a team prefix) "
                    "appear before any team-based executors (e.g., 'CeleryExecutor;team1=LocalExecutor' "
                    "instead of 'team1=CeleryExecutor;LocalExecutor')."
                )

        # Validate that all team names exist in the database (excluding None for global configs)
        team_names_to_validate = {team_name for team_name in seen_teams if team_name is not None}
        if team_names_to_validate and validate_teams:
            cls._validate_teams_exist_in_database(team_names_to_validate)

        return configs

    @classmethod
    def get_executor_names(cls, validate_teams: bool = True) -> list[ExecutorName]:
        """
        Return the executor names from Airflow configuration.

        Public entry point that delegates to ``_get_executor_names``.

        :param validate_teams: Whether to validate that team names exist in database
        :return: List of executor names from Airflow configuration
        """
        return cls._get_executor_names(validate_teams=validate_teams)

    @classmethod
    def get_default_executor_name(cls, team_name: str | None = None) -> ExecutorName:
        """
        Return the default executor name from Airflow configuration.

        :param team_name: Team whose default executor is wanted, ``None`` for the global default
        :return: executor name from Airflow configuration
        """
        # Make sure the per-team lookup tables have been populated from the config.
        cls._get_executor_names()
        # The default executor is the first configured executor in the list
        return _team_name_to_executors[team_name][0]

    @classmethod
    def get_default_executor(cls) -> BaseExecutor:
        """Create a new instance of the configured executor if none exists and returns it."""
        return cls.load_executor(cls.get_default_executor_name())

    @classmethod
    def init_executors(cls) -> list[BaseExecutor]:
        """
        Create a new instance of all configured executors if not cached already.

        Every loaded executor is also registered in ``cls.executors``, keyed by its
        alias when it has one and by the class name of the loaded instance otherwise.

        :return: The loaded executors, in configuration order
        """
        executor_names = cls._get_executor_names()
        loaded_executors = []
        for executor_name in executor_names:
            loaded_executor = cls.load_executor(executor_name)
            if executor_name.alias:
                cls.executors[executor_name.alias] = executor_name.module_path
            else:
                cls.executors[loaded_executor.__class__.__name__] = executor_name.module_path

            loaded_executors.append(loaded_executor)

        return loaded_executors

    @classmethod
    def lookup_executor_name_by_str(
        cls, executor_name_str: str, team_name: str | None = None
    ) -> ExecutorName:
        """
        Find the configured ExecutorName matching a string, within one team.

        The string is tried as an alias first, then as a module path and finally
        as a bare class name.

        :param executor_name_str: Alias, module path or class name of the executor
        :param team_name: Team to search in, ``None`` for the global executors
        :return: The matching ExecutorName
        :raises UnknownExecutorException: If nothing matches in the given team
        """
        # lookup the executor by alias first, if not check if we're given a module path
        if (
            not _classname_to_executors_per_team
            or not _module_to_executors_per_team
            or not _alias_to_executors_per_team
        ):
            # if we haven't loaded the executors yet, such as directly calling load_executor
            cls._get_executor_names()

        if executor_name := _alias_to_executors_per_team.get(team_name, {}).get(executor_name_str):
            return executor_name
        if executor_name := _module_to_executors_per_team.get(team_name, {}).get(executor_name_str):
            return executor_name
        if executor_name := _classname_to_executors_per_team.get(team_name, {}).get(executor_name_str):
            return executor_name
        raise UnknownExecutorException(f"Unknown executor being loaded: {executor_name_str}")

    @classmethod
    def load_executor(cls, executor_name: ExecutorName | str | None) -> BaseExecutor:
        """
        Load the executor.

        This supports the following formats:
        * by executor name for core executor
        * by import path
        * by class name of the Executor
        * by ExecutorName object specification

        A falsy ``executor_name`` loads the default executor. A string is resolved
        among the global (team-less) executors.

        :param executor_name: What to load, in one of the formats above
        :return: an instance of executor class via executor_name
        :raises AirflowConfigException: If importing or constructing the executor raises
            ImportError, or a team executor does not support multi-team
        """
        if not executor_name:
            _executor_name = cls.get_default_executor_name()
        elif isinstance(executor_name, str):
            _executor_name = cls.lookup_executor_name_by_str(executor_name)
        else:
            _executor_name = executor_name

        try:
            executor_cls, import_source = cls.import_executor_cls(_executor_name)
            log.debug("Loading executor %s from %s", _executor_name, import_source.value)
            if _executor_name.team_name:
                # Validate that team executors support multi-team functionality
                if not executor_cls.supports_multi_team:
                    raise AirflowConfigException(
                        f"Executor {_executor_name.module_path} does not support multi-team functionality "
                        f"but was configured for team '{_executor_name.team_name}'. "
                        f"Only executors with supports_multi_team=True can be used as team executors."
                    )
                executor = executor_cls(team_name=_executor_name.team_name)
            else:
                executor = executor_cls()

        except ImportError as e:
            log.error(e)
            # Chain explicitly so the traceback shows the import failure as the cause.
            raise AirflowConfigException(
                f'The module/attribute could not be loaded. Please check "executor" key in "core" section. '
                f'Current value: "{_executor_name}".'
            ) from e
        log.info("Loaded executor: %s", _executor_name)

        # Store the executor name we've built for this executor in the
        # instance. This makes it easier for the Scheduler, Backfill, etc to
        # know how we refer to this executor.
        executor.name = _executor_name

        return executor

    @classmethod
    def import_executor_cls(cls, executor_name: ExecutorName) -> tuple[type[BaseExecutor], ConnectorSource]:
        """
        Import the executor class.

        The class is imported from ``executor_name.module_path``.

        :param executor_name: ExecutorName whose module path should be imported.

        :return: executor class via executor_name and executor import source
        """
        return import_string(executor_name.module_path), executor_name.connector_source

    @classmethod
    def import_default_executor_cls(cls) -> tuple[type[BaseExecutor], ConnectorSource]:
        """
        Import the default executor class.

        :return: executor class and executor import source
        """
        executor_name = cls.get_default_executor_name()
        executor, source = cls.import_executor_cls(executor_name)
        return executor, source