1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18"""SDK configuration parser that extends the shared parser."""
19
20from __future__ import annotations
21
22import logging
23import os
24import pathlib
25from configparser import ConfigParser
26from io import StringIO
27from typing import Any
28
29from airflow.sdk import yaml
30from airflow.sdk._shared.configuration.parser import AirflowConfigParser as _SharedAirflowConfigParser
31from airflow.secrets import DEFAULT_SECRETS_SEARCH_PATH
32
33log = logging.getLogger(__name__)
34
35
36def _default_config_file_path(file_name: str) -> str:
37 """Get path to airflow core config.yml file."""
38 # TODO: Task SDK will have its own config.yml
39 # Temporary: SDK uses Core's config files during development
40
41 # Option 1: For installed packages
42 config_path = pathlib.Path(__file__).parent.parent / "config_templates" / file_name
43 if config_path.exists():
44 return str(config_path)
45
46 # Option 2: Monorepo structure
47 config_path = (
48 pathlib.Path(__file__).parent.parent.parent.parent.parent
49 / "airflow-core"
50 / "src"
51 / "airflow"
52 / "config_templates"
53 / file_name
54 )
55 if config_path.exists():
56 return str(config_path)
57
58 raise FileNotFoundError(f"Could not find '{file_name}' in config_templates. ")
59
60
61def retrieve_configuration_description() -> dict[str, dict[str, Any]]:
62 """
63 Read Airflow configuration description from Core's YAML file.
64
65 SDK reads airflow core config.yml. Eventually SDK will have its own config.yml
66 with only authoring related configs.
67
68 :return: Python dictionary containing configs & their info
69 """
70 base_configuration_description: dict[str, dict[str, Any]] = {}
71 with open(_default_config_file_path("config.yml")) as config_file:
72 base_configuration_description.update(yaml.safe_load(config_file))
73 return base_configuration_description
74
75
76def create_default_config_parser(configuration_description: dict[str, dict[str, Any]]) -> ConfigParser:
77 """
78 Create default config parser based on configuration description.
79
80 This is a simplified version that doesn't expand variables (SDK doesn't need
81 Core-specific expansion variables like SECRET_KEY, FERNET_KEY, etc.).
82
83 :param configuration_description: configuration description from config.yml
84 :return: Default Config Parser with default values
85 """
86 parser = ConfigParser()
87 for section, section_desc in configuration_description.items():
88 parser.add_section(section)
89 options = section_desc["options"]
90 for key in options:
91 default_value = options[key]["default"]
92 is_template = options[key].get("is_template", False)
93 if default_value is not None:
94 if is_template or not isinstance(default_value, str):
95 parser.set(section, key, str(default_value))
96 else:
97 parser.set(section, key, default_value)
98 return parser
99
100
101def get_airflow_config() -> str:
102 """Get path to airflow.cfg file."""
103 airflow_home = os.environ.get("AIRFLOW_HOME", os.path.expanduser("~/airflow"))
104 return os.path.join(airflow_home, "airflow.cfg")
105
106
107class AirflowSDKConfigParser(_SharedAirflowConfigParser):
108 """
109 SDK configuration parser that extends the shared parser.
110
111 In Phase 1, this reads Core's config.yml and can optionally read airflow.cfg.
112 Eventually SDK will have its own config.yml with only authoring-related configs.
113 """
114
115 def __init__(
116 self,
117 default_config: str | None = None,
118 *args,
119 **kwargs,
120 ):
121 # Read Core's config.yml (Phase 1: shared config.yml)
122 configuration_description = retrieve_configuration_description()
123 # Create default values parser
124 _default_values = create_default_config_parser(configuration_description)
125 super().__init__(configuration_description, _default_values, *args, **kwargs)
126 self.configuration_description = configuration_description
127 self._default_values = _default_values
128 self._suppress_future_warnings = False
129
130 # Optionally load from airflow.cfg if it exists
131 airflow_config = get_airflow_config()
132 if os.path.exists(airflow_config):
133 try:
134 self.read(airflow_config)
135 except Exception as e:
136 log.warning("Could not read airflow.cfg from %s: %s", airflow_config, e)
137
138 if default_config is not None:
139 self._update_defaults_from_string(default_config)
140
141 def load_test_config(self):
142 """
143 Use the test configuration instead of Airflow defaults.
144
145 Unit tests load values from `unit_tests.cfg` to ensure consistent behavior. Realistically we should
146 not have this needed but this is temporary to help fix the tests that use dag_maker and rely on few
147 configurations.
148
149 The SDK does not expand template variables (FERNET_KEY, JWT_SECRET_KEY, etc.) because it does not use
150 the config fields that require expansion.
151 """
152 unit_test_config_file = (
153 pathlib.Path(__file__).parent.parent.parent.parent.parent
154 / "airflow-core"
155 / "src"
156 / "airflow"
157 / "config_templates"
158 / "unit_tests.cfg"
159 )
160 unit_test_config = unit_test_config_file.read_text()
161 self.remove_all_read_configurations()
162 with StringIO(unit_test_config) as test_config_file:
163 self.read_file(test_config_file)
164 log.info("Unit test configuration loaded from 'unit_tests.cfg'")
165
166 def remove_all_read_configurations(self):
167 """Remove all read configurations, leaving only default values in the config."""
168 for section in self.sections():
169 self.remove_section(section)
170
171
172def get_custom_secret_backend(worker_mode: bool = False):
173 """
174 Get Secret Backend if defined in airflow.cfg.
175
176 Conditionally selects the section, key and kwargs key based on whether it is called from worker or not.
177
178 This is a convenience function that calls conf._get_custom_secret_backend().
179 Uses SDK's conf instead of Core's conf.
180 """
181 # Lazy import to trigger __getattr__ and lazy initialization
182 from airflow.sdk.configuration import conf
183
184 return conf._get_custom_secret_backend(worker_mode=worker_mode)
185
186
187def initialize_secrets_backends(
188 default_backends: list[str] = DEFAULT_SECRETS_SEARCH_PATH,
189):
190 """
191 Initialize secrets backend.
192
193 * import secrets backend classes
194 * instantiate them and return them in a list
195
196 Uses SDK's conf instead of Core's conf.
197 """
198 from airflow.sdk._shared.module_loading import import_string
199
200 backend_list = []
201 worker_mode = False
202 # Determine worker mode - if default_backends is not the server default, it's worker mode
203 # This is a simplified check; in practice, worker mode is determined by the caller
204 if default_backends != [
205 "airflow.secrets.environment_variables.EnvironmentVariablesBackend",
206 "airflow.secrets.metastore.MetastoreBackend",
207 ]:
208 worker_mode = True
209
210 custom_secret_backend = get_custom_secret_backend(worker_mode)
211
212 if custom_secret_backend is not None:
213 backend_list.append(custom_secret_backend)
214
215 for class_name in default_backends:
216 secrets_backend_cls = import_string(class_name)
217 backend_list.append(secrets_backend_cls())
218
219 return backend_list
220
221
222def ensure_secrets_loaded(
223 default_backends: list[str] = DEFAULT_SECRETS_SEARCH_PATH,
224) -> list:
225 """
226 Ensure that all secrets backends are loaded.
227
228 If the secrets_backend_list contains only 2 default backends, reload it.
229 """
230 # Check if the secrets_backend_list contains only 2 default backends.
231
232 # Check if we are loading the backends for worker too by checking if the default_backends is equal
233 # to DEFAULT_SECRETS_SEARCH_PATH.
234 secrets_backend_list = initialize_secrets_backends()
235 if len(secrets_backend_list) == 2 or default_backends != DEFAULT_SECRETS_SEARCH_PATH:
236 return initialize_secrets_backends(default_backends=default_backends)
237 return secrets_backend_list
238
239
240def initialize_config() -> AirflowSDKConfigParser:
241 """
242 Initialize SDK configuration parser.
243
244 Called automatically when SDK is imported.
245 """
246 airflow_config_parser = AirflowSDKConfigParser()
247 if airflow_config_parser.getboolean("core", "unit_test_mode", fallback=False):
248 airflow_config_parser.load_test_config()
249 return airflow_config_parser
250
251
252def __getattr__(name: str):
253 if name == "conf":
254 val = initialize_config()
255 globals()[name] = val
256 return val
257 raise AttributeError(f"module '{__name__}' has no attribute '{name}'")