Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/sdk/configuration.py: 63%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

100 statements  

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18"""SDK configuration parser that extends the shared parser.""" 

19 

20from __future__ import annotations 

21 

22import logging 

23import os 

24import pathlib 

25from configparser import ConfigParser 

26from io import StringIO 

27from typing import Any 

28 

29from airflow.sdk import yaml 

30from airflow.sdk._shared.configuration.parser import AirflowConfigParser as _SharedAirflowConfigParser 

31from airflow.secrets import DEFAULT_SECRETS_SEARCH_PATH 

32 

33log = logging.getLogger(__name__) 

34 

35 

36def _default_config_file_path(file_name: str) -> str: 

37 """Get path to airflow core config.yml file.""" 

38 # TODO: Task SDK will have its own config.yml 

39 # Temporary: SDK uses Core's config files during development 

40 

41 # Option 1: For installed packages 

42 config_path = pathlib.Path(__file__).parent.parent / "config_templates" / file_name 

43 if config_path.exists(): 

44 return str(config_path) 

45 

46 # Option 2: Monorepo structure 

47 config_path = ( 

48 pathlib.Path(__file__).parent.parent.parent.parent.parent 

49 / "airflow-core" 

50 / "src" 

51 / "airflow" 

52 / "config_templates" 

53 / file_name 

54 ) 

55 if config_path.exists(): 

56 return str(config_path) 

57 

58 raise FileNotFoundError(f"Could not find '{file_name}' in config_templates. ") 

59 

60 

61def retrieve_configuration_description() -> dict[str, dict[str, Any]]: 

62 """ 

63 Read Airflow configuration description from Core's YAML file. 

64 

65 SDK reads airflow core config.yml. Eventually SDK will have its own config.yml 

66 with only authoring related configs. 

67 

68 :return: Python dictionary containing configs & their info 

69 """ 

70 base_configuration_description: dict[str, dict[str, Any]] = {} 

71 with open(_default_config_file_path("config.yml")) as config_file: 

72 base_configuration_description.update(yaml.safe_load(config_file)) 

73 return base_configuration_description 

74 

75 

76def create_default_config_parser(configuration_description: dict[str, dict[str, Any]]) -> ConfigParser: 

77 """ 

78 Create default config parser based on configuration description. 

79 

80 This is a simplified version that doesn't expand variables (SDK doesn't need 

81 Core-specific expansion variables like SECRET_KEY, FERNET_KEY, etc.). 

82 

83 :param configuration_description: configuration description from config.yml 

84 :return: Default Config Parser with default values 

85 """ 

86 parser = ConfigParser() 

87 for section, section_desc in configuration_description.items(): 

88 parser.add_section(section) 

89 options = section_desc["options"] 

90 for key in options: 

91 default_value = options[key]["default"] 

92 is_template = options[key].get("is_template", False) 

93 if default_value is not None: 

94 if is_template or not isinstance(default_value, str): 

95 parser.set(section, key, str(default_value)) 

96 else: 

97 parser.set(section, key, default_value) 

98 return parser 

99 

100 

101def get_airflow_config() -> str: 

102 """Get path to airflow.cfg file.""" 

103 airflow_home = os.environ.get("AIRFLOW_HOME", os.path.expanduser("~/airflow")) 

104 return os.path.join(airflow_home, "airflow.cfg") 

105 

106 

107class AirflowSDKConfigParser(_SharedAirflowConfigParser): 

108 """ 

109 SDK configuration parser that extends the shared parser. 

110 

111 In Phase 1, this reads Core's config.yml and can optionally read airflow.cfg. 

112 Eventually SDK will have its own config.yml with only authoring-related configs. 

113 """ 

114 

115 def __init__( 

116 self, 

117 default_config: str | None = None, 

118 *args, 

119 **kwargs, 

120 ): 

121 # Read Core's config.yml (Phase 1: shared config.yml) 

122 configuration_description = retrieve_configuration_description() 

123 # Create default values parser 

124 _default_values = create_default_config_parser(configuration_description) 

125 super().__init__(configuration_description, _default_values, *args, **kwargs) 

126 self.configuration_description = configuration_description 

127 self._default_values = _default_values 

128 self._suppress_future_warnings = False 

129 

130 # Optionally load from airflow.cfg if it exists 

131 airflow_config = get_airflow_config() 

132 if os.path.exists(airflow_config): 

133 try: 

134 self.read(airflow_config) 

135 except Exception as e: 

136 log.warning("Could not read airflow.cfg from %s: %s", airflow_config, e) 

137 

138 if default_config is not None: 

139 self._update_defaults_from_string(default_config) 

140 

141 def load_test_config(self): 

142 """ 

143 Use the test configuration instead of Airflow defaults. 

144 

145 Unit tests load values from `unit_tests.cfg` to ensure consistent behavior. Realistically we should 

146 not have this needed but this is temporary to help fix the tests that use dag_maker and rely on few 

147 configurations. 

148 

149 The SDK does not expand template variables (FERNET_KEY, JWT_SECRET_KEY, etc.) because it does not use 

150 the config fields that require expansion. 

151 """ 

152 unit_test_config_file = ( 

153 pathlib.Path(__file__).parent.parent.parent.parent.parent 

154 / "airflow-core" 

155 / "src" 

156 / "airflow" 

157 / "config_templates" 

158 / "unit_tests.cfg" 

159 ) 

160 unit_test_config = unit_test_config_file.read_text() 

161 self.remove_all_read_configurations() 

162 with StringIO(unit_test_config) as test_config_file: 

163 self.read_file(test_config_file) 

164 log.info("Unit test configuration loaded from 'unit_tests.cfg'") 

165 

166 def remove_all_read_configurations(self): 

167 """Remove all read configurations, leaving only default values in the config.""" 

168 for section in self.sections(): 

169 self.remove_section(section) 

170 

171 

172def get_custom_secret_backend(worker_mode: bool = False): 

173 """ 

174 Get Secret Backend if defined in airflow.cfg. 

175 

176 Conditionally selects the section, key and kwargs key based on whether it is called from worker or not. 

177 

178 This is a convenience function that calls conf._get_custom_secret_backend(). 

179 Uses SDK's conf instead of Core's conf. 

180 """ 

181 # Lazy import to trigger __getattr__ and lazy initialization 

182 from airflow.sdk.configuration import conf 

183 

184 return conf._get_custom_secret_backend(worker_mode=worker_mode) 

185 

186 

187def initialize_secrets_backends( 

188 default_backends: list[str] = DEFAULT_SECRETS_SEARCH_PATH, 

189): 

190 """ 

191 Initialize secrets backend. 

192 

193 * import secrets backend classes 

194 * instantiate them and return them in a list 

195 

196 Uses SDK's conf instead of Core's conf. 

197 """ 

198 from airflow.sdk._shared.module_loading import import_string 

199 

200 backend_list = [] 

201 worker_mode = False 

202 # Determine worker mode - if default_backends is not the server default, it's worker mode 

203 # This is a simplified check; in practice, worker mode is determined by the caller 

204 if default_backends != [ 

205 "airflow.secrets.environment_variables.EnvironmentVariablesBackend", 

206 "airflow.secrets.metastore.MetastoreBackend", 

207 ]: 

208 worker_mode = True 

209 

210 custom_secret_backend = get_custom_secret_backend(worker_mode) 

211 

212 if custom_secret_backend is not None: 

213 backend_list.append(custom_secret_backend) 

214 

215 for class_name in default_backends: 

216 secrets_backend_cls = import_string(class_name) 

217 backend_list.append(secrets_backend_cls()) 

218 

219 return backend_list 

220 

221 

222def ensure_secrets_loaded( 

223 default_backends: list[str] = DEFAULT_SECRETS_SEARCH_PATH, 

224) -> list: 

225 """ 

226 Ensure that all secrets backends are loaded. 

227 

228 If the secrets_backend_list contains only 2 default backends, reload it. 

229 """ 

230 # Check if the secrets_backend_list contains only 2 default backends. 

231 

232 # Check if we are loading the backends for worker too by checking if the default_backends is equal 

233 # to DEFAULT_SECRETS_SEARCH_PATH. 

234 secrets_backend_list = initialize_secrets_backends() 

235 if len(secrets_backend_list) == 2 or default_backends != DEFAULT_SECRETS_SEARCH_PATH: 

236 return initialize_secrets_backends(default_backends=default_backends) 

237 return secrets_backend_list 

238 

239 

240def initialize_config() -> AirflowSDKConfigParser: 

241 """ 

242 Initialize SDK configuration parser. 

243 

244 Called automatically when SDK is imported. 

245 """ 

246 airflow_config_parser = AirflowSDKConfigParser() 

247 if airflow_config_parser.getboolean("core", "unit_test_mode", fallback=False): 

248 airflow_config_parser.load_test_config() 

249 return airflow_config_parser 

250 

251 

252def __getattr__(name: str): 

253 if name == "conf": 

254 val = initialize_config() 

255 globals()[name] = val 

256 return val 

257 raise AttributeError(f"module '{__name__}' has no attribute '{name}'")