Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/executors/executor_loader.py: 28%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

158 statements  

1# Licensed to the Apache Software Foundation (ASF) under one 

2# or more contributor license agreements. See the NOTICE file 

3# distributed with this work for additional information 

4# regarding copyright ownership. The ASF licenses this file 

5# to you under the Apache License, Version 2.0 (the 

6# "License"); you may not use this file except in compliance 

7# with the License. You may obtain a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, 

12# software distributed under the License is distributed on an 

13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

14# KIND, either express or implied. See the License for the 

15# specific language governing permissions and limitations 

16# under the License. 

"""Resolve, import and instantiate the executors configured in Airflow's ``[core] executor`` option."""

18 

19from __future__ import annotations 

20 

21import os 

22from collections import defaultdict 

23from typing import TYPE_CHECKING 

24 

25import structlog 

26 

27from airflow._shared.module_loading import import_string 

28from airflow.exceptions import AirflowConfigException, UnknownExecutorException 

29from airflow.executors.executor_constants import ( 

30 CELERY_EXECUTOR, 

31 CORE_EXECUTOR_NAMES, 

32 KUBERNETES_EXECUTOR, 

33 LOCAL_EXECUTOR, 

34 ConnectorSource, 

35) 

36from airflow.executors.executor_utils import ExecutorName 

37from airflow.models.team import Team 

38 

# Module-level structlog logger used by ExecutorLoader (config warnings, executor load messages).
log = structlog.get_logger(__name__)

40 

41if TYPE_CHECKING: 

42 from airflow.executors.base_executor import BaseExecutor 

43 

44 

# Cache of the ExecutorNames computed from the configuration, so config is read/parsed only once.
_executor_names: list[ExecutorName] = []

# Per-team lookup tables, keyed first by team name (``None`` for global executors) and then by a
# string identifying the executor. An executor can be referenced by its alias, its full module path
# or its class name, so each of those kinds of string gets its own table.
_alias_to_executors_per_team: dict[str | None, dict[str, ExecutorName]] = defaultdict(dict)
_module_to_executors_per_team: dict[str | None, dict[str, ExecutorName]] = defaultdict(dict)
_classname_to_executors_per_team: dict[str | None, dict[str, ExecutorName]] = defaultdict(dict)

# Team name -> the ExecutorNames configured for that team, in configuration order.
_team_name_to_executors: dict[str | None, list[ExecutorName]] = defaultdict(list)

54 

55 

class ExecutorLoader:
    """
    Resolve, import and instantiate the executors configured in ``[core] executor``.

    ``executors`` maps the aliases of the core executors to their module paths. Note that
    ``init_executors`` also records the alias (or class name) of every executor it loads in this
    mapping, so it grows at runtime.
    """

    executors = {
        LOCAL_EXECUTOR: "airflow.executors.local_executor.LocalExecutor",
        CELERY_EXECUTOR: "airflow.providers.celery.executors.celery_executor.CeleryExecutor",
        KUBERNETES_EXECUTOR: "airflow.providers.cncf.kubernetes."
        "executors.kubernetes_executor.KubernetesExecutor",
    }

    @classmethod
    def _get_executor_names(cls, validate_teams: bool = True) -> list[ExecutorName]:
        """
        Return the executor names from Airflow configuration.

        The result is computed once and cached in the module-level ``_executor_names`` list; the
        module-level lookup tables (by alias, module path, class name and team) are populated at the
        same time. Once cached, later calls return the cached list regardless of ``validate_teams``.

        :param validate_teams: Whether to validate that team names exist in database
        :return: List of executor names from Airflow configuration
        :raises AirflowConfigException: If an executor entry is malformed, or the same executor module
            is configured more than once for one team.
        """
        if _executor_names:
            return _executor_names

        all_executor_names: list[tuple[str | None, list[str]]] = cls._get_team_executor_configs(
            validate_teams=validate_teams
        )

        executor_names: list[ExecutorName] = []
        for team_name, executor_names_config in all_executor_names:
            executor_names_per_team: list[ExecutorName] = []
            for executor_name_str in executor_names_config:
                if len(split_name := executor_name_str.split(":")) == 1:
                    name = split_name[0]
                    # Check if this is an alias for a core airflow executor, module
                    # paths won't be provided by the user in that case.
                    if core_executor_module := cls.executors.get(name):
                        executor_names_per_team.append(
                            ExecutorName(module_path=core_executor_module, alias=name, team_name=team_name)
                        )
                    # A module path was provided
                    else:
                        executor_names_per_team.append(
                            ExecutorName(alias=None, module_path=name, team_name=team_name)
                        )
                # An alias was provided with the module path
                elif len(split_name) == 2:
                    # Ensure the user is not trying to override the existing aliases of any of the core
                    # executors by providing an alias along with the existing core airflow executor alias
                    # (e.g. my_local_exec_alias:LocalExecutor). Allowing this makes things unnecessarily
                    # complicated. Multiple Executors of the same type will be supported by a future
                    # multitenancy AIP.
                    # The module component should always be a module path.
                    alias, module_path = split_name
                    if not module_path or module_path in CORE_EXECUTOR_NAMES or "." not in module_path:
                        raise AirflowConfigException(
                            "Incorrectly formatted executor configuration. Second portion of an executor "
                            f"configuration must be a module path but received: {module_path}"
                        )
                    executor_names_per_team.append(
                        ExecutorName(alias=alias, module_path=module_path, team_name=team_name)
                    )
                else:
                    raise AirflowConfigException(
                        f"Incorrectly formatted executor configuration: {executor_name_str}"
                    )

            # As of now, we do not allow duplicate executors (within teams).
            # Compare module paths, since the actual code is what is unique.
            unique_modules = {exec_name.module_path for exec_name in executor_names_per_team}
            if len(unique_modules) < len(executor_names_per_team):
                msg = (
                    "At least one executor was configured twice. Duplicate executors are not yet supported.\n"
                    "Please check your configuration again to correct the issue."
                )
                raise AirflowConfigException(msg)

            executor_names.extend(executor_names_per_team)

        # Populate some mappings for fast future lookups
        for executor_name in executor_names:
            team_name = executor_name.team_name
            # Executors will not always have aliases
            if executor_name.alias:
                _alias_to_executors_per_team[team_name][executor_name.alias] = executor_name
            # All executors will have a team name. It _may_ be None, for now that means it is a system level executor
            _team_name_to_executors[team_name].append(executor_name)
            # All executors will have a module path
            _module_to_executors_per_team[team_name][executor_name.module_path] = executor_name
            _classname_to_executors_per_team[team_name][executor_name.module_path.split(".")[-1]] = executor_name
            # Cache the executor names, so the logic of this method only runs once
            _executor_names.append(executor_name)

        return executor_names

    @classmethod
    def block_use_of_multi_team(cls):
        """
        Raise an exception if the user tries to use multiple team based executors.

        Before the feature is complete we do not want users to accidentally configure this.
        This can be overridden by setting the AIRFLOW__DEV__MULTI_TEAM_MODE environment
        variable to "enabled".
        This check is built into a method so that it can be easily mocked in unit tests.

        :raises AirflowConfigException: Unless AIRFLOW__DEV__MULTI_TEAM_MODE is exactly "enabled".
        """
        # An unset variable yields None, which is never equal to "enabled".
        if os.environ.get("AIRFLOW__DEV__MULTI_TEAM_MODE") != "enabled":
            raise AirflowConfigException("Configuring multiple team based executors is not yet supported!")

    @classmethod
    def _validate_teams_exist_in_database(cls, team_names: set[str]) -> None:
        """
        Validate that all specified team names exist in the database.

        :param team_names: Set of team names to validate
        :raises AirflowConfigException: If any team names don't exist in the database
        """
        if not team_names:
            return

        # Set difference below assumes get_all_team_names() returns a set.
        missing_teams = team_names - Team.get_all_team_names()
        if missing_teams:
            missing_teams_str = ", ".join(sorted(missing_teams))
            raise AirflowConfigException(
                f"One or more teams specified in executor configuration do not exist in database: {missing_teams_str}. "
                "Please create these teams first or remove them from executor configuration."
            )

    @classmethod
    def _get_team_executor_configs(cls, validate_teams: bool = True) -> list[tuple[str | None, list[str]]]:
        """
        Return a list of executor configs to be loaded.

        Each tuple contains the team name as the first element (``None`` for the global, team-less
        executors) and the second element is the executor config for that team (a list of executor
        names/modules/aliases).

        :param validate_teams: Whether to validate that team names exist in database
        :raises AirflowConfigException: If the config is empty or malformed, a team appears more than
            once, no global executor is configured, or (when ``validate_teams``) a configured team
            does not exist in the database.
        """
        from airflow.configuration import conf

        executor_config = conf.get_mandatory_value("core", "executor")
        if not executor_config:
            raise AirflowConfigException(
                "The 'executor' key in the 'core' section of the configuration is mandatory and cannot be empty"
            )
        configs: list[tuple[str | None, list[str]]] = []
        seen_teams: set[str | None] = set()

        # The executor_config can look like a few things. One is just a single executor name, such as
        # "CeleryExecutor". Or a list of executors, such as "CeleryExecutor,KubernetesExecutor,module.path.to.executor".
        # In these cases these are all executors that are available to all teams, with the first one being the
        # default executor, as usual. The config can also look like a list of executors, per team, with the team name
        # prefixing each list of executors separated by an equal sign and then each team list separated by a
        # semicolon:
        # "LocalExecutor;team1=CeleryExecutor;team2=KubernetesExecutor,module.path.to.executor".
        for team_executor_config in executor_config.split(";"):
            # An item may not have a team name (either empty string before the equal
            # sign or no equal sign at all), which means it is a global executor config.
            if "=" not in team_executor_config or team_executor_config.startswith("="):
                team_name = None
                executor_names = team_executor_config.strip("=")
            else:
                cls.block_use_of_multi_team()
                if not conf.getboolean("core", "multi_team", fallback=False):
                    log.warning(
                        "The 'multi_team' config is not enabled, but team executors were configured. "
                        "The following team executor config will be ignored: %s",
                        team_executor_config,
                    )
                    continue
                # partition() instead of an unpacking split(): a stray second "=" becomes a config error
                # rather than an opaque ValueError. Whitespace around the team name (e.g. after "; ") is
                # tolerated, just as it is for executor names below.
                raw_team_name, _, executor_names = team_executor_config.partition("=")
                team_name = raw_team_name.strip()
                if not team_name or "=" in executor_names:
                    raise AirflowConfigException(
                        f"Incorrectly formatted executor configuration: {team_executor_config}"
                    )

            # Check for duplicate team names
            if team_name in seen_teams:
                raise AirflowConfigException(
                    f"Team '{team_name}' appears more than once in executor configuration. "
                    "Each team can only be specified once in the executor config."
                )
            seen_teams.add(team_name)

            # Split by comma to get the individual executor names and strip spaces off of them
            configs.append((team_name, [name.strip() for name in executor_names.split(",")]))

        # Validate that at least one global executor exists
        if not any(team_name is None for team_name, _ in configs):
            raise AirflowConfigException(
                "At least one global executor must be configured. Current configuration only contains "
                "team-based executors. Please add a global executor configuration (e.g., "
                "'CeleryExecutor;team1=LocalExecutor' instead of 'team1=CeleryExecutor;team2=LocalExecutor')."
            )

        # Validate that all team names exist in the database (excluding None for global configs)
        team_names_to_validate = {team_name for team_name in seen_teams if team_name is not None}
        if team_names_to_validate and validate_teams:
            cls._validate_teams_exist_in_database(team_names_to_validate)

        return configs

    @classmethod
    def get_executor_names(cls, validate_teams: bool = True) -> list[ExecutorName]:
        """
        Return the executor names from Airflow configuration.

        :param validate_teams: Whether to validate that team names exist in database
        :return: List of executor names from Airflow configuration
        """
        return cls._get_executor_names(validate_teams=validate_teams)

    @classmethod
    def get_default_executor_name(cls, team_name: str | None = None) -> ExecutorName:
        """
        Return the default executor name from Airflow configuration.

        :param team_name: Team whose default executor is wanted; ``None`` selects the global executors.
        :return: executor name from Airflow configuration
        :raises UnknownExecutorException: If no executor is configured for ``team_name``.
        """
        cls._get_executor_names()
        # Use .get() rather than indexing: the table is a defaultdict, so indexing an unknown team
        # would insert an empty entry for it into the shared cache before failing with IndexError.
        team_executors = _team_name_to_executors.get(team_name)
        if not team_executors:
            raise UnknownExecutorException(f"No executor is configured for team: {team_name}")
        # The default executor is the first configured executor in the list
        return team_executors[0]

    @classmethod
    def get_default_executor(cls) -> BaseExecutor:
        """Create a new instance of the global default executor and return it."""
        return cls.load_executor(cls.get_default_executor_name())

    @classmethod
    def init_executors(cls) -> list[BaseExecutor]:
        """
        Create and return a new instance of every configured executor.

        The alias (or, failing that, the class name) of each loaded executor is recorded in
        ``cls.executors`` against its module path.
        """
        loaded_executors = []
        for executor_name in cls._get_executor_names():
            loaded_executor = cls.load_executor(executor_name)
            if executor_name.alias:
                cls.executors[executor_name.alias] = executor_name.module_path
            else:
                cls.executors[loaded_executor.__class__.__name__] = executor_name.module_path

            loaded_executors.append(loaded_executor)

        return loaded_executors

    @classmethod
    def lookup_executor_name_by_str(
        cls, executor_name_str: str, team_name: str | None = None
    ) -> ExecutorName:
        """
        Resolve a string to a configured ExecutorName for ``team_name``.

        The string is matched against aliases first, then full module paths, then class names.

        :param executor_name_str: Alias, module path or class name of a configured executor.
        :param team_name: Team whose executors are searched; ``None`` means the global executors.
        :raises UnknownExecutorException: If no configured executor matches.
        """
        if (
            not _classname_to_executors_per_team
            or not _module_to_executors_per_team
            or not _alias_to_executors_per_team
        ):
            # if we haven't loaded the executors yet, such as directly calling load_executor
            cls._get_executor_names()

        # .get(team_name, {}) avoids inserting unknown teams into the defaultdict-backed tables.
        for lookup_table in (
            _alias_to_executors_per_team,
            _module_to_executors_per_team,
            _classname_to_executors_per_team,
        ):
            if executor_name := lookup_table.get(team_name, {}).get(executor_name_str):
                return executor_name
        raise UnknownExecutorException(f"Unknown executor being loaded: {executor_name_str}")

    @classmethod
    def load_executor(cls, executor_name: ExecutorName | str | None) -> BaseExecutor:
        """
        Load the executor.

        This supports the following formats:
        * by executor name for core executor
        * by import path
        * by class name of the Executor
        * by ExecutorName object specification

        A falsy ``executor_name`` (e.g. ``None``) loads the global default executor.

        :return: an instance of executor class via executor_name
        :raises AirflowConfigException: If the executor module/class cannot be imported.
        """
        if not executor_name:
            _executor_name = cls.get_default_executor_name()
        elif isinstance(executor_name, str):
            _executor_name = cls.lookup_executor_name_by_str(executor_name)
        else:
            _executor_name = executor_name

        try:
            executor_cls, import_source = cls.import_executor_cls(_executor_name)
            log.debug("Loading executor %s from %s", _executor_name, import_source.value)
            if _executor_name.team_name:
                executor = executor_cls(team_name=_executor_name.team_name)
            else:
                executor = executor_cls()

        except ImportError as e:
            log.error(e)
            raise AirflowConfigException(
                'The module/attribute could not be loaded. Please check "executor" key in "core" section. '
                f'Current value: "{_executor_name}".'
            ) from e
        log.info("Loaded executor: %s", _executor_name)

        # Store the executor name we've built for this executor in the
        # instance. This makes it easier for the Scheduler, Backfill, etc to
        # know how we refer to this executor.
        executor.name = _executor_name

        return executor

    @classmethod
    def import_executor_cls(cls, executor_name: ExecutorName) -> tuple[type[BaseExecutor], ConnectorSource]:
        """
        Import the executor class.

        Supports the same formats as ExecutorLoader.load_executor.

        :param executor_name: Name of core executor or module path to executor.

        :return: executor class via executor_name and executor import source
        """
        return import_string(executor_name.module_path), executor_name.connector_source

    @classmethod
    def import_default_executor_cls(cls) -> tuple[type[BaseExecutor], ConnectorSource]:
        """
        Import the default executor class.

        :return: executor class and executor import source
        """
        return cls.import_executor_cls(cls.get_default_executor_name())

389 return executor, source