Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/executors/executor_loader.py: 44%

88 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:35 +0000

1# Licensed to the Apache Software Foundation (ASF) under one 

2# or more contributor license agreements. See the NOTICE file 

3# distributed with this work for additional information 

4# regarding copyright ownership. The ASF licenses this file 

5# to you under the Apache License, Version 2.0 (the 

6# "License"); you may not use this file except in compliance 

7# with the License. You may obtain a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, 

12# software distributed under the License is distributed on an 

13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

14# KIND, either express or implied. See the License for the 

15# specific language governing permissions and limitations 

16# under the License. 

17"""All executors.""" 

18from __future__ import annotations 

19 

20import functools 

21import logging 

22import os 

23from contextlib import suppress 

24from enum import Enum, unique 

25from typing import TYPE_CHECKING 

26 

27from airflow.exceptions import AirflowConfigException 

28from airflow.executors.executor_constants import ( 

29 CELERY_EXECUTOR, 

30 CELERY_KUBERNETES_EXECUTOR, 

31 DASK_EXECUTOR, 

32 DEBUG_EXECUTOR, 

33 KUBERNETES_EXECUTOR, 

34 LOCAL_EXECUTOR, 

35 LOCAL_KUBERNETES_EXECUTOR, 

36 SEQUENTIAL_EXECUTOR, 

37) 

38from airflow.utils.module_loading import import_string 

39 

40log = logging.getLogger(__name__) 

41 

42if TYPE_CHECKING: 

43 from airflow.executors.base_executor import BaseExecutor 

44 

45 

@unique
class ConnectorSource(Enum):
    """Where an executor class was imported from.

    ``ExecutorLoader.import_executor_cls`` returns one of these members
    alongside the executor class it resolved.
    """

    # The name was a key of ``ExecutorLoader.executors``.
    CORE = "core"
    # The name had exactly one dot and resolved under ``airflow.executors`` after plugin integration.
    PLUGIN = "plugin"
    # The name was used directly as a dotted import path.
    CUSTOM_PATH = "custom path"

53 

54 

class ExecutorLoader:
    """Resolve executor names to executor classes and instantiate them.

    ``executors`` maps each core executor name constant to the dotted import
    path of its class. Names that are not keys of that mapping are treated as
    either a ``{plugin_name}.{class_name}`` plugin reference or a full import
    path (see :meth:`import_executor_cls`).
    """

    # NOTE(review): nothing in this class assigns ``_default_executor``; it is only read by
    # ``get_default_executor``. Presumably it is set from outside this class - confirm.
    _default_executor: BaseExecutor | None = None
    executors = {
        LOCAL_EXECUTOR: "airflow.executors.local_executor.LocalExecutor",
        LOCAL_KUBERNETES_EXECUTOR: "airflow.executors.local_kubernetes_executor.LocalKubernetesExecutor",
        SEQUENTIAL_EXECUTOR: "airflow.executors.sequential_executor.SequentialExecutor",
        CELERY_EXECUTOR: "airflow.executors.celery_executor.CeleryExecutor",
        CELERY_KUBERNETES_EXECUTOR: "airflow.executors.celery_kubernetes_executor.CeleryKubernetesExecutor",
        DASK_EXECUTOR: "airflow.executors.dask_executor.DaskExecutor",
        KUBERNETES_EXECUTOR: "airflow.executors.kubernetes_executor.KubernetesExecutor",
        DEBUG_EXECUTOR: "airflow.executors.debug_executor.DebugExecutor",
    }

    @classmethod
    def get_default_executor_name(cls) -> str:
        """Returns the default executor name from Airflow configuration.

        :return: value of the ``EXECUTOR`` option in the ``core`` section
        """
        # Imported lazily, inside the method, rather than at module import time.
        from airflow.configuration import conf

        return conf.get_mandatory_value("core", "EXECUTOR")

    @classmethod
    def get_default_executor(cls) -> BaseExecutor:
        """Return ``_default_executor`` if it is set, otherwise load the configured executor.

        The freshly loaded instance is not stored on the class by this method,
        so every call that finds ``_default_executor`` unset builds a new
        instance through :meth:`load_executor`.

        :return: the stored default executor, or a new instance of the configured one
        """
        if cls._default_executor is not None:
            return cls._default_executor

        return cls.load_executor(cls.get_default_executor_name())

    @classmethod
    def load_executor(cls, executor_name: str) -> BaseExecutor:
        """
        Loads the executor.

        This supports the following formats:
        * by executor name for core executor
        * by ``{plugin_name}.{class_name}`` for executor from plugins
        * by import path.

        :param executor_name: core executor name, plugin reference or import path
        :return: an instance of executor class via executor_name
        :raises AirflowConfigException: if importing the executor class raises
            ``ImportError`` (the original error is attached as ``__cause__``), or
            if the database compatibility check rejects the executor
        """
        # The two hybrid executors are constructed from other executor instances,
        # so they are built by dedicated helpers instead of a no-argument call.
        if executor_name == CELERY_KUBERNETES_EXECUTOR:
            return cls.__load_celery_kubernetes_executor()
        elif executor_name == LOCAL_KUBERNETES_EXECUTOR:
            return cls.__load_local_kubernetes_executor()

        try:
            executor_cls, import_source = cls.import_executor_cls(executor_name)
            log.debug("Loading executor %s from %s", executor_name, import_source.value)
        except ImportError as e:
            log.error(e)
            # Chain explicitly so the traceback reports the import failure as the direct cause.
            raise AirflowConfigException(
                'The module/attribute could not be loaded. Please check "executor" key in "core" section. '
                f'Current value: "{executor_name}".'
            ) from e
        log.info("Loaded executor: %s", executor_name)

        return executor_cls()

    @classmethod
    def import_executor_cls(cls, executor_name: str) -> tuple[type[BaseExecutor], ConnectorSource]:
        """
        Imports the executor class.

        Supports the same formats as ExecutorLoader.load_executor.

        Resolution order:

        1. a key of ``executors`` -> ``ConnectorSource.CORE``
        2. a name with exactly one dot, importable as ``airflow.executors.{executor_name}``
           after plugins are integrated -> ``ConnectorSource.PLUGIN``
        3. anything else, imported as given -> ``ConnectorSource.CUSTOM_PATH``

        :param executor_name: core executor name, plugin reference or import path
        :return: executor class via executor_name and executor import source
        """

        def _import_and_validate(path: str) -> type[BaseExecutor]:
            # Import the class, then run the database compatibility check before handing it back.
            executor = import_string(path)
            cls.validate_database_executor_compatibility(executor)
            return executor

        if executor_name in cls.executors:
            return _import_and_validate(cls.executors[executor_name]), ConnectorSource.CORE
        if executor_name.count(".") == 1:
            log.debug(
                "The executor name looks like the plugin path (executor_name=%s). Trying to import a "
                "executor from a plugin",
                executor_name,
            )
            # ImportError/AttributeError raised anywhere in this block is swallowed, and the
            # name is then retried below as a plain import path.
            with suppress(ImportError, AttributeError):
                # Load plugins here for executors as at that time the plugins might not have been
                # initialized yet
                from airflow import plugins_manager

                plugins_manager.integrate_executor_plugins()
                return _import_and_validate(f"airflow.executors.{executor_name}"), ConnectorSource.PLUGIN
        return _import_and_validate(executor_name), ConnectorSource.CUSTOM_PATH

    @classmethod
    def import_default_executor_cls(cls) -> tuple[type[BaseExecutor], ConnectorSource]:
        """
        Imports the default executor class.

        :return: executor class and executor import source
        """
        return cls.import_executor_cls(cls.get_default_executor_name())

    @classmethod
    @functools.lru_cache(maxsize=None)
    def validate_database_executor_compatibility(cls, executor: type[BaseExecutor]) -> None:
        """Validate database and executor compatibility.

        Most of the databases work universally, but SQLite can only work with
        single-threaded executors (e.g. Sequential).

        This is NOT done in ``airflow.configuration`` (when configuration is
        initialized) because loading the executor class is heavy work we want to
        avoid unless needed.

        Results are memoised by ``functools.lru_cache``, so once a call for a
        given executor class has returned normally, later calls with the same
        class do not re-read the environment variable or the engine.

        :param executor: executor class to check
        :raises AirflowConfigException: if the executor is not single threaded
            and the engine dialect is ``sqlite``
        """
        # Single threaded executors can run with any backend.
        if executor.is_single_threaded:
            return

        # This is set in tests when we want to be able to use SQLite.
        if os.environ.get("_AIRFLOW__SKIP_DATABASE_EXECUTOR_COMPATIBILITY_CHECK") == "1":
            return

        from airflow.settings import engine

        # SQLite only works with single threaded executors
        if engine.dialect.name == "sqlite":
            raise AirflowConfigException(f"error: cannot use SQLite with the {executor.__name__}")

    @classmethod
    def __load_celery_kubernetes_executor(cls) -> BaseExecutor:
        """Build a CeleryKubernetesExecutor from fresh Celery and Kubernetes executor instances."""
        celery_executor = import_string(cls.executors[CELERY_EXECUTOR])()
        kubernetes_executor = import_string(cls.executors[KUBERNETES_EXECUTOR])()

        celery_kubernetes_executor_cls = import_string(cls.executors[CELERY_KUBERNETES_EXECUTOR])
        return celery_kubernetes_executor_cls(celery_executor, kubernetes_executor)

    @classmethod
    def __load_local_kubernetes_executor(cls) -> BaseExecutor:
        """Build a LocalKubernetesExecutor from fresh Local and Kubernetes executor instances."""
        local_executor = import_string(cls.executors[LOCAL_EXECUTOR])()
        kubernetes_executor = import_string(cls.executors[KUBERNETES_EXECUTOR])()

        local_kubernetes_executor_cls = import_string(cls.executors[LOCAL_KUBERNETES_EXECUTOR])
        return local_kubernetes_executor_cls(local_executor, kubernetes_executor)

202 

203 

# Deprecated by AIP-51: core Airflow no longer uses this tuple.
# TODO: Remove in Airflow 3.0
UNPICKLEABLE_EXECUTORS = (LOCAL_EXECUTOR, SEQUENTIAL_EXECUTOR, DASK_EXECUTOR)

210)