Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/airflow/executors/executor_loader.py: 32%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

150 statements  

1# Licensed to the Apache Software Foundation (ASF) under one 

2# or more contributor license agreements. See the NOTICE file 

3# distributed with this work for additional information 

4# regarding copyright ownership. The ASF licenses this file 

5# to you under the Apache License, Version 2.0 (the 

6# "License"); you may not use this file except in compliance 

7# with the License. You may obtain a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, 

12# software distributed under the License is distributed on an 

13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

14# KIND, either express or implied. See the License for the 

15# specific language governing permissions and limitations 

16# under the License. 

17"""All executors.""" 

18 

19from __future__ import annotations 

20 

21import functools 

22import logging 

23import os 

24from contextlib import suppress 

25from typing import TYPE_CHECKING 

26 

27from airflow.api_internal.internal_api_call import InternalApiConfig 

28from airflow.exceptions import AirflowConfigException, AirflowException 

29from airflow.executors.executor_constants import ( 

30 CELERY_EXECUTOR, 

31 CELERY_KUBERNETES_EXECUTOR, 

32 CORE_EXECUTOR_NAMES, 

33 DEBUG_EXECUTOR, 

34 KUBERNETES_EXECUTOR, 

35 LOCAL_EXECUTOR, 

36 LOCAL_KUBERNETES_EXECUTOR, 

37 SEQUENTIAL_EXECUTOR, 

38 ConnectorSource, 

39) 

40from airflow.executors.executor_utils import ExecutorName 

41from airflow.utils.module_loading import import_string 

42 

# Module-level logger; used by ExecutorLoader methods below.
log = logging.getLogger(__name__)

if TYPE_CHECKING:
    from airflow.executors.base_executor import BaseExecutor


# Used to lookup an ExecutorName via a string alias or module path. An
# executor may have both so we need two lookup dicts.
_alias_to_executors: dict[str, ExecutorName] = {}
_module_to_executors: dict[str, ExecutorName] = {}
# Used to cache the computed ExecutorNames so that we don't need to read/parse config more than once
_executor_names: list[ExecutorName] = []
# Used to cache executors so that we don't construct executor objects unnecessarily
_loaded_executors: dict[ExecutorName, BaseExecutor] = {}

57 

58 

class ExecutorLoader:
    """Keeps constants for all the currently available executors."""

    # Core executor aliases mapped to their fully qualified class paths.
    # Long provider paths are split via implicit string concatenation.
    executors = {
        LOCAL_EXECUTOR: "airflow.executors.local_executor.LocalExecutor",
        LOCAL_KUBERNETES_EXECUTOR: "airflow.providers.cncf.kubernetes."
        "executors.local_kubernetes_executor.LocalKubernetesExecutor",
        SEQUENTIAL_EXECUTOR: "airflow.executors.sequential_executor.SequentialExecutor",
        CELERY_EXECUTOR: "airflow.providers.celery.executors.celery_executor.CeleryExecutor",
        CELERY_KUBERNETES_EXECUTOR: "airflow.providers.celery."
        "executors.celery_kubernetes_executor.CeleryKubernetesExecutor",
        KUBERNETES_EXECUTOR: "airflow.providers.cncf.kubernetes."
        "executors.kubernetes_executor.KubernetesExecutor",
        DEBUG_EXECUTOR: "airflow.executors.debug_executor.DebugExecutor",
    }

    @classmethod
    def block_use_of_hybrid_exec(cls, executor_config: list):
        """Raise an exception if the user tries to use multiple executors before the feature is complete.

        This check is built into a method so that it can be easily mocked in unit tests.

        :param executor_config: core.executor configuration value.
        :raises AirflowConfigException: if more than one executor, or an aliased
            executor (``alias:module.path``), is configured.
        """
        # A ":" anywhere in the joined config means an alias was supplied.
        if len(executor_config) > 1 or ":" in "".join(executor_config):
            raise AirflowConfigException(
                "Configuring multiple executors and executor aliases are not yet supported!: "
                f"{executor_config}"
            )

    @classmethod
    def _get_executor_names(cls) -> list[ExecutorName]:
        """Return the executor names from Airflow configuration.

        Parses ``[core] executor`` once, validates the entries, populates the
        module-level lookup caches, and memoizes the result in ``_executor_names``.

        :return: List of executor names from Airflow configuration
        :raises AirflowConfigException: on malformed or duplicate entries.
        """
        from airflow.configuration import conf

        # Fast path: config was already parsed once for this process.
        if _executor_names:
            return _executor_names

        executor_names_raw = conf.get_mandatory_list_value("core", "EXECUTOR")

        # AIP-61 is WIP. Unblock configuring multiple executors when the feature is ready to launch
        cls.block_use_of_hybrid_exec(executor_names_raw)

        executor_names = []
        for name in executor_names_raw:
            if len(split_name := name.split(":")) == 1:
                name = split_name[0]
                # Check if this is an alias for a core airflow executor, module
                # paths won't be provided by the user in that case.
                if core_executor_module := cls.executors.get(name):
                    executor_names.append(ExecutorName(alias=name, module_path=core_executor_module))
                # Only a module path or plugin name was provided
                else:
                    executor_names.append(ExecutorName(alias=None, module_path=name))
            # An alias was provided with the module path
            elif len(split_name) == 2:
                # Ensure the user is not trying to override the existing aliases of any of the core
                # executors by providing an alias along with the existing core airflow executor alias
                # (e.g. my_local_exec_alias:LocalExecutor). Allowing this makes things unnecessarily
                # complicated. Multiple Executors of the same type will be supported by a future multitenancy
                # AIP.
                # The module component should always be a module or plugin path.
                module_path = split_name[1]
                if not module_path or module_path in CORE_EXECUTOR_NAMES or "." not in module_path:
                    raise AirflowConfigException(
                        "Incorrectly formatted executor configuration. Second portion of an executor "
                        f"configuration must be a module path or plugin but received: {module_path}"
                    )
                else:
                    executor_names.append(ExecutorName(alias=split_name[0], module_path=split_name[1]))
            else:
                raise AirflowConfigException(f"Incorrectly formatted executor configuration: {name}")

        # As of now, we do not allow duplicate executors.
        # Add all module paths/plugin names to a set, since the actual code is what is unique
        # (set comprehension instead of set([...]) — same behavior, idiomatic form).
        unique_modules = {exec_name.module_path for exec_name in executor_names}
        if len(unique_modules) < len(executor_names):
            msg = (
                "At least one executor was configured twice. Duplicate executors are not yet supported. "
                "Please check your configuration again to correct the issue."
            )
            raise AirflowConfigException(msg)

        # Populate some mappings for fast future lookups
        for executor_name in executor_names:
            # Executors will not always have aliases
            if executor_name.alias:
                _alias_to_executors[executor_name.alias] = executor_name
            # All executors will have a module path
            _module_to_executors[executor_name.module_path] = executor_name
            # Cache the executor names, so the logic of this method only runs once
            _executor_names.append(executor_name)

        return executor_names

    @classmethod
    def get_executor_names(cls) -> list[ExecutorName]:
        """Return the executor names from Airflow configuration.

        Public wrapper around :meth:`_get_executor_names`.

        :return: List of executor names from Airflow configuration
        """
        return cls._get_executor_names()

    @classmethod
    def get_default_executor_name(cls) -> ExecutorName:
        """Return the default executor name from Airflow configuration.

        :return: executor name from Airflow configuration
        """
        # The default executor is the first configured executor in the list
        return cls._get_executor_names()[0]

    @classmethod
    def get_default_executor(cls) -> BaseExecutor:
        """Create a new instance of the configured executor if none exists and returns it."""
        return cls.load_executor(cls.get_default_executor_name())

    @classmethod
    def init_executors(cls) -> list[BaseExecutor]:
        """Create a new instance of all configured executors if not cached already.

        Also back-fills ``cls.executors`` with alias/class-name -> module-path
        entries for any non-core executors that were configured.
        """
        executor_names = cls._get_executor_names()
        loaded_executors = []
        for executor_name in executor_names:
            loaded_executor = cls.load_executor(executor_name.module_path)
            if executor_name.alias:
                cls.executors[executor_name.alias] = executor_name.module_path
            else:
                # No alias configured; key by the executor class name instead.
                cls.executors[loaded_executor.__class__.__name__] = executor_name.module_path

            loaded_executors.append(loaded_executor)

        return loaded_executors

    @classmethod
    def lookup_executor_name_by_str(cls, executor_name_str: str) -> ExecutorName:
        """Resolve a string (alias or module path) to its cached ExecutorName.

        :raises AirflowException: if the string matches neither lookup table.
        """
        # lookup the executor by alias first, if not check if we're given a module path
        if executor_name := _alias_to_executors.get(executor_name_str):
            return executor_name
        elif executor_name := _module_to_executors.get(executor_name_str):
            return executor_name
        else:
            raise AirflowException(f"Unknown executor being loaded: {executor_name_str}")

    @classmethod
    def load_executor(cls, executor_name: ExecutorName | str | None) -> BaseExecutor:
        """
        Load the executor.

        This supports the following formats:
        * by executor name for core executor
        * by ``{plugin_name}.{class_name}`` for executor from plugins
        * by import path.
        * by ExecutorName object specification

        :return: an instance of executor class via executor_name
        :raises AirflowConfigException: if the executor module/class cannot be imported.
        """
        if not executor_name:
            _executor_name = cls.get_default_executor_name()
        elif isinstance(executor_name, str):
            _executor_name = cls.lookup_executor_name_by_str(executor_name)
        else:
            _executor_name = executor_name

        # Check if the executor has been previously loaded. Avoid constructing a new object
        if _executor_name in _loaded_executors:
            return _loaded_executors[_executor_name]

        try:
            # The two hybrid executors need their component executors constructed first.
            if _executor_name.alias == CELERY_KUBERNETES_EXECUTOR:
                executor = cls.__load_celery_kubernetes_executor()
            elif _executor_name.alias == LOCAL_KUBERNETES_EXECUTOR:
                executor = cls.__load_local_kubernetes_executor()
            else:
                executor_cls, import_source = cls.import_executor_cls(_executor_name)
                log.debug("Loading executor %s from %s", _executor_name, import_source.value)
                executor = executor_cls()

        except ImportError as e:
            log.error(e)
            # Chain the ImportError so the root cause is preserved in tracebacks.
            raise AirflowConfigException(
                'The module/attribute could not be loaded. Please check "executor" key in "core" section. '
                f'Current value: "{_executor_name}".'
            ) from e
        log.info("Loaded executor: %s", _executor_name)

        # Store the executor name we've built for this executor in the
        # instance. This makes it easier for the Scheduler, Backfill, etc to
        # know how we refer to this executor.
        executor.name = _executor_name
        # Cache this executor by name here, so we can look it up later if it is
        # requested again, and not have to construct a new object
        _loaded_executors[_executor_name] = executor

        return executor

    @classmethod
    def import_executor_cls(
        cls, executor_name: ExecutorName, validate: bool = True
    ) -> tuple[type[BaseExecutor], ConnectorSource]:
        """
        Import the executor class.

        Supports the same formats as ExecutorLoader.load_executor.

        :param executor_name: Name of core executor or module path to provider provided as a plugin.
        :param validate: Whether or not to validate the executor before returning

        :return: executor class via executor_name and executor import source
        """

        def _import_and_validate(path: str) -> type[BaseExecutor]:
            executor = import_string(path)
            if validate:
                cls.validate_database_executor_compatibility(executor)
            return executor

        if executor_name.connector_source == ConnectorSource.PLUGIN:
            with suppress(ImportError, AttributeError):
                # Load plugins here for executors as at that time the plugins might not have been
                # initialized yet
                from airflow import plugins_manager

                plugins_manager.integrate_executor_plugins()
                return (
                    _import_and_validate(f"airflow.executors.{executor_name.module_path}"),
                    ConnectorSource.PLUGIN,
                )
        return _import_and_validate(executor_name.module_path), executor_name.connector_source

    @classmethod
    def import_default_executor_cls(cls, validate: bool = True) -> tuple[type[BaseExecutor], ConnectorSource]:
        """
        Import the default executor class.

        :param validate: Whether or not to validate the executor before returning

        :return: executor class and executor import source
        """
        return cls.import_executor_cls(cls.get_default_executor_name(), validate=validate)

    @classmethod
    @functools.lru_cache(maxsize=None)
    def validate_database_executor_compatibility(cls, executor: type[BaseExecutor]) -> None:
        """
        Validate database and executor compatibility.

        Most of the databases work universally, but SQLite can only work with
        single-threaded executors (e.g. Sequential).

        This is NOT done in ``airflow.configuration`` (when configuration is
        initialized) because loading the executor class is heavy work we want to
        avoid unless needed.
        """
        # Single threaded executors can run with any backend.
        if executor.is_single_threaded:
            return

        # This is set in tests when we want to be able to use SQLite.
        if os.environ.get("_AIRFLOW__SKIP_DATABASE_EXECUTOR_COMPATIBILITY_CHECK") == "1":
            return

        # Internal-API mode has no direct DB engine to validate against.
        if InternalApiConfig.get_use_internal_api():
            return

        from airflow.settings import engine

        # SQLite only works with single threaded executors
        if engine.dialect.name == "sqlite":
            raise AirflowConfigException(f"error: cannot use SQLite with the {executor.__name__}")

    @classmethod
    def __load_celery_kubernetes_executor(cls) -> BaseExecutor:
        """Construct a CeleryKubernetesExecutor wrapping fresh Celery and Kubernetes executors."""
        celery_executor = import_string(cls.executors[CELERY_EXECUTOR])()
        kubernetes_executor = import_string(cls.executors[KUBERNETES_EXECUTOR])()

        celery_kubernetes_executor_cls = import_string(cls.executors[CELERY_KUBERNETES_EXECUTOR])
        return celery_kubernetes_executor_cls(celery_executor, kubernetes_executor)

    @classmethod
    def __load_local_kubernetes_executor(cls) -> BaseExecutor:
        """Construct a LocalKubernetesExecutor wrapping fresh Local and Kubernetes executors."""
        local_executor = import_string(cls.executors[LOCAL_EXECUTOR])()
        kubernetes_executor = import_string(cls.executors[KUBERNETES_EXECUTOR])()

        local_kubernetes_executor_cls = import_string(cls.executors[LOCAL_KUBERNETES_EXECUTOR])
        return local_kubernetes_executor_cls(local_executor, kubernetes_executor)

351 

352 

# This tuple is deprecated due to AIP-51 and is no longer used in core Airflow.
# TODO: Remove in Airflow 3.0
# NOTE(review): kept only for backwards compatibility with external code that
# may still import it; presumably lists executors whose instances cannot be
# pickled — confirm before removal.
UNPICKLEABLE_EXECUTORS = (
    LOCAL_EXECUTOR,
    SEQUENTIAL_EXECUTOR,
)