Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/sdk/configuration.py: 54%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

109 statements  

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18"""SDK configuration parser that extends the shared parser.""" 

19 

20from __future__ import annotations 

21 

22import logging 

23import os 

24import pathlib 

25from configparser import ConfigParser 

26from io import StringIO 

27from typing import Any 

28 

29from airflow.sdk import yaml 

30from airflow.sdk._shared.configuration.parser import ( 

31 AirflowConfigParser as _SharedAirflowConfigParser, 

32 configure_parser_from_configuration_description, 

33) 

34from airflow.sdk.execution_time.secrets import _SERVER_DEFAULT_SECRETS_SEARCH_PATH 

35 

36log = logging.getLogger(__name__) 

37 

38 

def _default_config_file_path(file_name: str) -> str:
    """
    Locate ``file_name`` inside a ``config_templates`` directory.

    Two locations are probed, in order, and the first one that exists wins:

    1. ``config_templates`` next to the package that contains this module
       (layout of an installed distribution).
    2. ``airflow-core/src/airflow/config_templates`` five directory levels
       above this module (layout of the monorepo checkout).

    :param file_name: name of the template file to look for
    :return: path of the first existing candidate, as a string
    :raises FileNotFoundError: if neither candidate exists
    """
    # TODO: Task SDK will have its own config.yml
    # Temporary: SDK uses Core's config files during development
    module_path = pathlib.Path(__file__)

    # Walk up five levels one ``.parent`` at a time (rather than indexing
    # ``.parents``) so that a shallow path simply stops at its root.
    repo_root = module_path
    for _ in range(5):
        repo_root = repo_root.parent

    candidates = (
        # Installed packages
        module_path.parent.parent / "config_templates" / file_name,
        # Monorepo structure
        repo_root / "airflow-core" / "src" / "airflow" / "config_templates" / file_name,
    )
    for candidate in candidates:
        if candidate.exists():
            return str(candidate)

    raise FileNotFoundError(f"Could not find '{file_name}' in config_templates. ")

62 

63 

def retrieve_configuration_description() -> dict[str, dict[str, Any]]:
    """
    Load the Airflow configuration description from Core's ``config.yml``.

    The SDK currently reads the core ``config.yml``; eventually it is meant to
    ship its own ``config.yml`` containing only authoring related configs.

    :return: Python dictionary containing configs & their info
    """
    config_yml_path = _default_config_file_path("config.yml")
    with open(config_yml_path) as stream:
        loaded = yaml.safe_load(stream)

    # Copy into a fresh dict so the caller always gets a plain ``dict``.
    description: dict[str, dict[str, Any]] = {}
    description.update(loaded)
    return description

77 

78 

def create_default_config_parser(configuration_description: dict[str, dict[str, Any]]) -> ConfigParser:
    """
    Build a parser holding the default values from a configuration description.

    Only the SDK expansion variables (see ``get_sdk_expansion_variables``) are
    passed on for expanding default values; other Core-specific expansion
    variables (SECRET_KEY, FERNET_KEY, etc.) are not supplied.

    :param configuration_description: configuration description from config.yml
    :return: Default Config Parser with default values
    """
    defaults = ConfigParser()
    configure_parser_from_configuration_description(
        defaults,
        configuration_description,
        get_sdk_expansion_variables(),
    )
    return defaults

93 

94 

def get_sdk_expansion_variables() -> dict[str, Any]:
    """
    Return the variables usable for config value expansion in the SDK.

    Only ``AIRFLOW_HOME`` is provided: it is taken from the environment when
    the variable is set (even if empty), otherwise it falls back to the
    expanded ``~/airflow``. Core specific variables (FERNET_KEY,
    JWT_SECRET_KEY, etc.) are not needed in the SDK.

    :return: mapping with the single key ``AIRFLOW_HOME``
    """
    home = os.environ.get("AIRFLOW_HOME")
    if home is None:
        home = os.path.expanduser("~/airflow")
    return {"AIRFLOW_HOME": home}

106 

107 

def get_airflow_config() -> str:
    """
    Return the path of ``airflow.cfg`` inside the Airflow home directory.

    The home directory is ``$AIRFLOW_HOME`` when that variable is set,
    otherwise the expanded ``~/airflow``.
    """
    home = os.environ.get("AIRFLOW_HOME")
    if home is None:
        home = os.path.expanduser("~/airflow")
    return os.path.join(home, "airflow.cfg")

112 

113 

class AirflowSDKConfigParser(_SharedAirflowConfigParser):
    """
    Configuration parser for the Task SDK, layered on the shared parser.

    In Phase 1, this reads Core's config.yml and can optionally read airflow.cfg.
    Eventually SDK will have its own config.yml with only authoring-related configs.
    """

    def __init__(
        self,
        default_config: str | None = None,
        *args,
        **kwargs,
    ):
        """
        Build the parser from Core's config.yml and, if present, airflow.cfg.

        :param default_config: optional config text forwarded to
            ``_update_defaults_from_string`` once the parser is set up
        :param args: extra positional arguments forwarded to the shared parser
        :param kwargs: extra keyword arguments forwarded to the shared parser
        """
        # Phase 1: the description comes from Core's shared config.yml.
        description = retrieve_configuration_description()
        defaults_parser = create_default_config_parser(description)
        super().__init__(description, defaults_parser, *args, **kwargs)
        self.configuration_description = description
        self._default_values = defaults_parser
        self._suppress_future_warnings = False

        # airflow.cfg is optional: a missing file is skipped, an unreadable
        # one is only reported as a warning.
        cfg_path = get_airflow_config()
        if os.path.exists(cfg_path):
            try:
                self.read(cfg_path)
            except Exception as e:
                log.warning("Could not read airflow.cfg from %s: %s", cfg_path, e)

        if default_config is not None:
            self._update_defaults_from_string(default_config)

    def expand_all_configuration_values(self):
        """Expand all configuration values using SDK-specific expansion variables."""
        expansion_vars = get_sdk_expansion_variables()
        for section_name in self.sections():
            for option, raw_value in self.items(section_name):
                if raw_value is None:
                    continue
                if self.has_option(section_name, option):
                    self.remove_option(section_name, option)

                # Templates and non-string values are stored back untouched.
                store_verbatim = self.is_template(section_name, option) or not isinstance(raw_value, str)
                if not store_verbatim:
                    try:
                        self.set(section_name, option, raw_value.format(**expansion_vars))
                    except (KeyError, ValueError, IndexError):
                        # Leave unexpanded if variable not available
                        store_verbatim = True
                if store_verbatim:
                    self.set(section_name, option, raw_value)

    def load_test_config(self):
        """
        Replace the loaded configuration with the unit test configuration.

        Unit tests load values from `unit_tests.cfg` to ensure consistent behavior. Realistically we should
        not have this needed but this is temporary to help fix the tests that use dag_maker and rely on few
        configurations.

        The SDK does not expand template variables (FERNET_KEY, JWT_SECRET_KEY, etc.) because it does not use
        the config fields that require expansion.
        """
        test_cfg_text = pathlib.Path(_default_config_file_path("unit_tests.cfg")).read_text()
        self.remove_all_read_configurations()
        with StringIO(test_cfg_text) as buffer:
            self.read_file(buffer)

        self.expand_all_configuration_values()

        log.info("Unit test configuration loaded from 'unit_tests.cfg'")

    def remove_all_read_configurations(self):
        """Remove all read configurations, leaving only default values in the config."""
        for section_name in self.sections():
            self.remove_section(section_name)

190 

191 

def get_custom_secret_backend(worker_mode: bool = False):
    """
    Return the custom secrets backend resolved by the SDK configuration.

    Thin convenience wrapper around ``conf._get_custom_secret_backend()`` that
    uses the SDK's ``conf`` instead of Core's ``conf``.

    :param worker_mode: forwarded unchanged to ``conf._get_custom_secret_backend``;
        selects the section, key and kwargs key depending on whether the call
        comes from a worker
    :return: whatever ``conf._get_custom_secret_backend`` returns
    """
    # Imported here on purpose: accessing ``conf`` goes through the module
    # level ``__getattr__`` and triggers the lazy initialization.
    from airflow.sdk.configuration import conf as sdk_conf

    return sdk_conf._get_custom_secret_backend(worker_mode=worker_mode)

205 

206 

def initialize_secrets_backends(
    default_backends: list[str] = _SERVER_DEFAULT_SECRETS_SEARCH_PATH,
):
    """
    Import and instantiate the secrets backends.

    The custom backend, when one is configured, comes first in the result and
    is followed by one instance of every class named in ``default_backends``.

    Uses SDK's conf instead of Core's conf.

    :param default_backends: dotted class paths of the backends to instantiate
    :return: list of backend instances
    """
    from airflow.sdk._shared.module_loading import import_string

    # Simplified check: anything other than the server default search path is
    # treated as worker mode. In practice the caller decides the mode.
    worker_mode = default_backends != _SERVER_DEFAULT_SECRETS_SEARCH_PATH

    custom_backend = get_custom_secret_backend(worker_mode)
    backends = [] if custom_backend is None else [custom_backend]

    backends.extend(import_string(class_path)() for class_path in default_backends)
    return backends

237 

238 

def ensure_secrets_loaded(
    default_backends: list[str] = _SERVER_DEFAULT_SECRETS_SEARCH_PATH,
) -> list:
    """
    Ensure that all secrets backends are loaded.

    Returns a freshly initialized backend list for ``default_backends``.

    :param default_backends: dotted class paths of the default backends to
        load; defaults to the server search path
    :return: the list produced by ``initialize_secrets_backends`` for
        ``default_backends``
    """
    # This module keeps no cached backend list, so there is nothing to inspect
    # before deciding whether to reload. Initializing with the server defaults
    # first and then again with ``default_backends`` would instantiate every
    # backend twice, and would import and instantiate the server default
    # backends even when a different search path was requested. A single
    # initialization with the requested search path yields the same result.
    return initialize_secrets_backends(default_backends=default_backends)

255 

256 

def initialize_config() -> AirflowSDKConfigParser:
    """
    Create the SDK configuration parser.

    When ``[core] unit_test_mode`` evaluates to true (it defaults to false if
    unset), the unit test configuration is loaded into the parser before it is
    returned.

    :return: the initialized SDK configuration parser
    """
    parser = AirflowSDKConfigParser()
    in_unit_test_mode = parser.getboolean("core", "unit_test_mode", fallback=False)
    if in_unit_test_mode:
        parser.load_test_config()
    return parser

267 

268 

def __getattr__(name: str):
    """
    Lazily provide the module attribute ``conf``.

    The first access to ``conf`` initializes the configuration and stores it
    in the module globals, so later lookups find it there directly. Any other
    name raises ``AttributeError``.
    """
    if name != "conf":
        raise AttributeError(f"module '{__name__}' has no attribute '{name}'")

    parser = initialize_config()
    globals()[name] = parser
    return parser