1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18"""SDK configuration parser that extends the shared parser."""
19
20from __future__ import annotations
21
22import logging
23import os
24import pathlib
25from configparser import ConfigParser
26from io import StringIO
27from typing import Any
28
29from airflow.sdk import yaml
30from airflow.sdk._shared.configuration.parser import (
31 AirflowConfigParser as _SharedAirflowConfigParser,
32 configure_parser_from_configuration_description,
33)
34from airflow.sdk.execution_time.secrets import _SERVER_DEFAULT_SECRETS_SEARCH_PATH
35
36log = logging.getLogger(__name__)
37
38
39def _default_config_file_path(file_name: str) -> str:
40 """Get path to airflow core config.yml file."""
41 # TODO: Task SDK will have its own config.yml
42 # Temporary: SDK uses Core's config files during development
43
44 # Option 1: For installed packages
45 config_path = pathlib.Path(__file__).parent.parent / "config_templates" / file_name
46 if config_path.exists():
47 return str(config_path)
48
49 # Option 2: Monorepo structure
50 config_path = (
51 pathlib.Path(__file__).parent.parent.parent.parent.parent
52 / "airflow-core"
53 / "src"
54 / "airflow"
55 / "config_templates"
56 / file_name
57 )
58 if config_path.exists():
59 return str(config_path)
60
61 raise FileNotFoundError(f"Could not find '{file_name}' in config_templates. ")
62
63
64def retrieve_configuration_description() -> dict[str, dict[str, Any]]:
65 """
66 Read Airflow configuration description from Core's YAML file.
67
68 SDK reads airflow core config.yml. Eventually SDK will have its own config.yml
69 with only authoring related configs.
70
71 :return: Python dictionary containing configs & their info
72 """
73 base_configuration_description: dict[str, dict[str, Any]] = {}
74 with open(_default_config_file_path("config.yml")) as config_file:
75 base_configuration_description.update(yaml.safe_load(config_file))
76 return base_configuration_description
77
78
79def create_default_config_parser(configuration_description: dict[str, dict[str, Any]]) -> ConfigParser:
80 """
81 Create default config parser based on configuration description.
82
83 This version expands {AIRFLOW_HOME} in default values but not other
84 Core-specific expansion variables (SECRET_KEY, FERNET_KEY, etc.).
85
86 :param configuration_description: configuration description from config.yml
87 :return: Default Config Parser with default values
88 """
89 parser = ConfigParser()
90 all_vars = get_sdk_expansion_variables()
91 configure_parser_from_configuration_description(parser, configuration_description, all_vars)
92 return parser
93
94
95def get_sdk_expansion_variables() -> dict[str, Any]:
96 """
97 Get variables available for config value expansion in SDK.
98
99 SDK only needs AIRFLOW_HOME for expansion. Core specific variables
100 (FERNET_KEY, JWT_SECRET_KEY, etc.) are not needed in the SDK.
101 """
102 airflow_home = os.environ.get("AIRFLOW_HOME", os.path.expanduser("~/airflow"))
103 return {
104 "AIRFLOW_HOME": airflow_home,
105 }
106
107
108def get_airflow_config() -> str:
109 """Get path to airflow.cfg file."""
110 airflow_home = os.environ.get("AIRFLOW_HOME", os.path.expanduser("~/airflow"))
111 return os.path.join(airflow_home, "airflow.cfg")
112
113
114class AirflowSDKConfigParser(_SharedAirflowConfigParser):
115 """
116 SDK configuration parser that extends the shared parser.
117
118 In Phase 1, this reads Core's config.yml and can optionally read airflow.cfg.
119 Eventually SDK will have its own config.yml with only authoring-related configs.
120 """
121
122 def __init__(
123 self,
124 default_config: str | None = None,
125 *args,
126 **kwargs,
127 ):
128 # Read Core's config.yml (Phase 1: shared config.yml)
129 configuration_description = retrieve_configuration_description()
130 # Create default values parser
131 _default_values = create_default_config_parser(configuration_description)
132 super().__init__(configuration_description, _default_values, *args, **kwargs)
133 self.configuration_description = configuration_description
134 self._default_values = _default_values
135 self._suppress_future_warnings = False
136
137 # Optionally load from airflow.cfg if it exists
138 airflow_config = get_airflow_config()
139 if os.path.exists(airflow_config):
140 try:
141 self.read(airflow_config)
142 except Exception as e:
143 log.warning("Could not read airflow.cfg from %s: %s", airflow_config, e)
144
145 if default_config is not None:
146 self._update_defaults_from_string(default_config)
147
148 def expand_all_configuration_values(self):
149 """Expand all configuration values using SDK-specific expansion variables."""
150 all_vars = get_sdk_expansion_variables()
151 for section in self.sections():
152 for key, value in self.items(section):
153 if value is not None:
154 if self.has_option(section, key):
155 self.remove_option(section, key)
156 if self.is_template(section, key) or not isinstance(value, str):
157 self.set(section, key, value)
158 else:
159 try:
160 self.set(section, key, value.format(**all_vars))
161 except (KeyError, ValueError, IndexError):
162 # Leave unexpanded if variable not available
163 self.set(section, key, value)
164
165 def load_test_config(self):
166 """
167 Use the test configuration instead of Airflow defaults.
168
169 Unit tests load values from `unit_tests.cfg` to ensure consistent behavior. Realistically we should
170 not have this needed but this is temporary to help fix the tests that use dag_maker and rely on few
171 configurations.
172
173 The SDK does not expand template variables (FERNET_KEY, JWT_SECRET_KEY, etc.) because it does not use
174 the config fields that require expansion.
175 """
176 unit_test_config_file = pathlib.Path(_default_config_file_path("unit_tests.cfg"))
177 unit_test_config = unit_test_config_file.read_text()
178 self.remove_all_read_configurations()
179 with StringIO(unit_test_config) as test_config_file:
180 self.read_file(test_config_file)
181
182 self.expand_all_configuration_values()
183
184 log.info("Unit test configuration loaded from 'unit_tests.cfg'")
185
186 def remove_all_read_configurations(self):
187 """Remove all read configurations, leaving only default values in the config."""
188 for section in self.sections():
189 self.remove_section(section)
190
191
192def get_custom_secret_backend(worker_mode: bool = False):
193 """
194 Get Secret Backend if defined in airflow.cfg.
195
196 Conditionally selects the section, key and kwargs key based on whether it is called from worker or not.
197
198 This is a convenience function that calls conf._get_custom_secret_backend().
199 Uses SDK's conf instead of Core's conf.
200 """
201 # Lazy import to trigger __getattr__ and lazy initialization
202 from airflow.sdk.configuration import conf
203
204 return conf._get_custom_secret_backend(worker_mode=worker_mode)
205
206
207def initialize_secrets_backends(
208 default_backends: list[str] = _SERVER_DEFAULT_SECRETS_SEARCH_PATH,
209):
210 """
211 Initialize secrets backend.
212
213 * import secrets backend classes
214 * instantiate them and return them in a list
215
216 Uses SDK's conf instead of Core's conf.
217 """
218 from airflow.sdk._shared.module_loading import import_string
219
220 backend_list = []
221 worker_mode = False
222 # Determine worker mode - if default_backends is not the server default, it's worker mode
223 # This is a simplified check; in practice, worker mode is determined by the caller
224 if default_backends != _SERVER_DEFAULT_SECRETS_SEARCH_PATH:
225 worker_mode = True
226
227 custom_secret_backend = get_custom_secret_backend(worker_mode)
228
229 if custom_secret_backend is not None:
230 backend_list.append(custom_secret_backend)
231
232 for class_name in default_backends:
233 secrets_backend_cls = import_string(class_name)
234 backend_list.append(secrets_backend_cls())
235
236 return backend_list
237
238
239def ensure_secrets_loaded(
240 default_backends: list[str] = _SERVER_DEFAULT_SECRETS_SEARCH_PATH,
241) -> list:
242 """
243 Ensure that all secrets backends are loaded.
244
245 If the secrets_backend_list contains only 2 default backends, reload it.
246 """
247 # Check if the secrets_backend_list contains only 2 default backends.
248
249 # Check if we are loading the backends for worker too by checking if the default_backends is equal
250 # to _SERVER_DEFAULT_SECRETS_SEARCH_PATH.
251 secrets_backend_list = initialize_secrets_backends()
252 if len(secrets_backend_list) == 2 or default_backends != _SERVER_DEFAULT_SECRETS_SEARCH_PATH:
253 return initialize_secrets_backends(default_backends=default_backends)
254 return secrets_backend_list
255
256
257def initialize_config() -> AirflowSDKConfigParser:
258 """
259 Initialize SDK configuration parser.
260
261 Called automatically when SDK is imported.
262 """
263 airflow_config_parser = AirflowSDKConfigParser()
264 if airflow_config_parser.getboolean("core", "unit_test_mode", fallback=False):
265 airflow_config_parser.load_test_config()
266 return airflow_config_parser
267
268
269def __getattr__(name: str):
270 if name == "conf":
271 val = initialize_config()
272 globals()[name] = val
273 return val
274 raise AttributeError(f"module '{__name__}' has no attribute '{name}'")