Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/dag_processing/bundles/manager.py: 23%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

182 statements  

1# Licensed to the Apache Software Foundation (ASF) under one 

2# or more contributor license agreements. See the NOTICE file 

3# distributed with this work for additional information 

4# regarding copyright ownership. The ASF licenses this file 

5# to you under the Apache License, Version 2.0 (the 

6# "License"); you may not use this file except in compliance 

7# with the License. You may obtain a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, 

12# software distributed under the License is distributed on an 

13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

14# KIND, either express or implied. See the License for the 

15# specific language governing permissions and limitations 

16# under the License. 

17from __future__ import annotations 

18 

19import warnings 

20from typing import TYPE_CHECKING 

21 

22from itsdangerous import URLSafeSerializer 

23from pydantic import BaseModel, ValidationError 

24from sqlalchemy import delete, select 

25 

26from airflow._shared.module_loading import import_string 

27from airflow.configuration import conf 

28from airflow.dag_processing.bundles.base import BaseDagBundle # noqa: TC001 

29from airflow.exceptions import AirflowConfigException 

30from airflow.models.dagbundle import DagBundleModel 

31from airflow.models.team import Team 

32from airflow.utils.log.logging_mixin import LoggingMixin 

33from airflow.utils.session import NEW_SESSION, provide_session 

34 

35if TYPE_CHECKING: 

36 from collections.abc import Iterable 

37 

38 from sqlalchemy.orm import Session 

39 

40_example_dag_bundle_name = "example_dags" 

41 

42 

class _ExternalBundleConfig(BaseModel):
    """
    Schema defining the user-specified configuration for a DAG bundle.

    Each entry of ``[dag_processor] dag_bundle_config_list`` is validated against this model
    by ``_parse_bundle_config``.
    """

    # Bundle name; must be unique within the config list and must not be the reserved
    # example-bundle name.
    name: str
    # Dotted import path of the bundle class; resolved with ``import_string`` in ``parse_config``.
    classpath: str
    # Keyword arguments later passed to the bundle class constructor.
    kwargs: dict
    # Optional owning team; only accepted when ``[core] multi_team`` is enabled.
    team_name: str | None = None

50 

51 

class _InternalBundleConfig(BaseModel):
    """
    Schema used internally (in this file) to define the configuration for a DAG bundle.

    Configuration defined by users when read must match ``_ExternalBundleConfig``.
    This configuration is then parsed and converted to ``_InternalBundleConfig`` to be used across this file.
    """

    # Bundle class already imported from ``_ExternalBundleConfig.classpath``.
    bundle_class: type[BaseDagBundle]
    # Keyword arguments forwarded to ``bundle_class`` whenever a bundle instance is created.
    kwargs: dict
    # Team owning the bundle, copied from the external config (``None`` when not set).
    team_name: str | None = None

63 

64 

def _bundle_item_exc(msg):
    """
    Build the exception used to report an invalid ``dag_bundle_config_list`` entry.

    The exception is returned, not raised, so callers write ``raise _bundle_item_exc(...)``.

    :param msg: Details appended after the common config-location prefix.
    :return: An ``AirflowConfigException`` carrying the combined message.
    """
    prefix = "Invalid config for section `dag_processor` key `dag_bundle_config_list`. "
    return AirflowConfigException(prefix + msg)

69 

70 

def _parse_bundle_config(config_list) -> list[_ExternalBundleConfig]:
    """
    Validate raw ``dag_bundle_config_list`` entries and convert them to ``_ExternalBundleConfig``.

    :param config_list: Items decoded from the ``[dag_processor] dag_bundle_config_list`` JSON.
    :return: One validated config per item, in the order they were configured.
    :raises AirflowConfigException: If an item is not a dict, fails schema validation, uses the
        reserved example-bundle name, or reuses a bundle name that appeared earlier in the list.
    """
    bundles: dict[str, _ExternalBundleConfig] = {}
    # Names seen more than once, in first-repeat order, so the error can name them.
    duplicates: list[str] = []
    for item in config_list:
        if not isinstance(item, dict):
            raise _bundle_item_exc(f"Expected dict but got {item.__class__}")

        try:
            cfg = _ExternalBundleConfig(**item)
        except ValidationError as e:
            # Chain the pydantic error so the underlying validation details stay in the traceback.
            raise _bundle_item_exc(f"Item {item} failed validation: {e}") from e

        if cfg.name == _example_dag_bundle_name:
            raise AirflowConfigException(
                f"Bundle name '{_example_dag_bundle_name}' is a reserved name. Please choose another name for your bundle."
                " Example DAGs can be enabled with the '[core] load_examples' config."
            )

        if cfg.name in bundles and cfg.name not in duplicates:
            duplicates.append(cfg.name)
        bundles[cfg.name] = cfg

    # Checked after the loop (as before) so per-item errors above still take precedence.
    if duplicates:
        raise _bundle_item_exc(
            "One or more bundle names appeared multiple times: " + ", ".join(duplicates)
        )
    return list(bundles.values())

92 

93 

def _add_example_dag_bundle(bundle_config_list: list[_ExternalBundleConfig]):
    """
    Append the built-in example DAGs bundle to ``bundle_config_list`` (mutated in place).

    The added entry is a ``LocalDagBundle`` whose ``path`` is the first directory listed in
    ``airflow.example_dags.__path__``.
    """
    from airflow import example_dags

    example_bundle = _ExternalBundleConfig(
        name=_example_dag_bundle_name,
        classpath="airflow.dag_processing.bundles.local.LocalDagBundle",
        kwargs={"path": next(iter(example_dags.__path__))},
    )
    bundle_config_list.append(example_bundle)

107 

108 

109def _is_safe_bundle_url(url: str) -> bool: 

110 """ 

111 Check if a bundle URL is safe to use. 

112 

113 This function validates that the URL: 

114 - Uses HTTP or HTTPS schemes (no JavaScript, data, or other schemes) 

115 - Is properly formatted 

116 - Doesn't contain malicious content 

117 """ 

118 import logging 

119 from urllib.parse import urlparse 

120 

121 logger = logging.getLogger(__name__) 

122 

123 if not url: 

124 return False 

125 

126 try: 

127 parsed = urlparse(url) 

128 if parsed.scheme not in {"http", "https"}: 

129 logger.error( 

130 "Bundle URL uses unsafe scheme '%s'. Only 'http' and 'https' are allowed", parsed.scheme 

131 ) 

132 return False 

133 

134 if not parsed.netloc: 

135 logger.error("Bundle URL '%s' has no network location", url) 

136 return False 

137 

138 if any(ord(c) < 32 for c in url): 

139 logger.error("Bundle URL '%s' contains control characters (ASCII < 32)", url) 

140 return False 

141 

142 return True 

143 except Exception as e: 

144 logger.error("Failed to parse bundle URL '%s': %s", url, str(e)) 

145 return False 

146 

147 

def _sign_bundle_url(url: str, bundle_name: str) -> str:
    """
    Produce a signed, URL-safe token that carries a bundle's URL template.

    The token is generated by ``itsdangerous.URLSafeSerializer`` keyed with ``[core] fernet_key``
    and encodes the payload ``{"url": url, "bundle_name": bundle_name}``.

    :param url: The URL to sign
    :param bundle_name: The name of the bundle (used in the payload)
    :return: The signed URL token
    """
    secret_key = conf.get_mandatory_value("core", "fernet_key")
    return URLSafeSerializer(secret_key).dumps({"url": url, "bundle_name": bundle_name})

162 

163 

class DagBundlesManager(LoggingMixin):
    """
    Manager for DAG bundles.

    The bundle configuration is read when the manager is constructed (see ``parse_config``) and
    kept keyed by bundle name; bundle objects are instantiated on demand from that configuration.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Bundle name -> parsed configuration (bundle class, kwargs, team); filled by parse_config().
        self._bundle_config: dict[str, _InternalBundleConfig] = {}
        self.parse_config()

    def parse_config(self) -> None:
        """
        Get all DAG bundle configurations and store in instance variable.

        If a bundle class for a given name has already been imported, it will not be imported again.

        :raises AirflowConfigException: If ``dag_bundle_config_list`` is not a list, an entry is
            invalid, or an entry sets ``team_name`` while ``[core] multi_team`` is disabled.

        :meta private:
        """
        if self._bundle_config:
            return

        config_list = conf.getjson("dag_processor", "dag_bundle_config_list")
        if not config_list:
            return
        if not isinstance(config_list, list):
            raise AirflowConfigException(
                "Section `dag_processor` key `dag_bundle_config_list` "
                f"must be list but got {config_list.__class__}"
            )
        bundle_config_list = _parse_bundle_config(config_list)
        if conf.getboolean("core", "LOAD_EXAMPLES"):
            _add_example_dag_bundle(bundle_config_list)

        # Build into a local dict first: if a later entry fails (team check, import error), the
        # instance must not keep a partial config that the early return above would then treat
        # as fully loaded.
        loaded: dict[str, _InternalBundleConfig] = {}
        for bundle_config in bundle_config_list:
            if bundle_config.team_name and not conf.getboolean("core", "multi_team"):
                raise AirflowConfigException(
                    "Section `dag_processor` key `dag_bundle_config_list` "
                    "cannot have a team name when multi-team mode is disabled. "
                    "To enable multi-team, you need to update section `core` key `multi_team` in your config."
                )

            class_ = import_string(bundle_config.classpath)
            loaded[bundle_config.name] = _InternalBundleConfig(
                bundle_class=class_,
                kwargs=bundle_config.kwargs,
                team_name=bundle_config.team_name,
            )
        self._bundle_config.update(loaded)
        self.log.info("DAG bundles loaded: %s", ", ".join(self._bundle_config.keys()))

    @provide_session
    def sync_bundles_to_db(self, *, session: Session = NEW_SESSION) -> None:
        """
        Make the ``DagBundleModel`` rows agree with the configured bundles.

        Configured bundles are created or re-activated, and their signed URL template, template
        parameters and team ownership are refreshed. Stored bundles that are no longer configured
        are deactivated, lose their team association, and have their ``ParseImportError`` rows
        deleted. A configured bundle that cannot be instantiated is logged and left untouched.

        :param session: SQLAlchemy session (supplied by ``provide_session`` when omitted).
        :raises AirflowConfigException: If a configured ``team_name`` has no matching ``Team`` row.
        """
        self.log.debug("Syncing DAG bundles to the database")

        def _extract_and_sign_template(bundle_name: str) -> tuple[str | None, dict]:
            # Return (signed URL template, or None when the template is unsafe; template params).
            # Uses its own parameter rather than the enclosing loop variable ``name``.
            bundle_instance = self.get_bundle(bundle_name)
            new_template_ = bundle_instance.view_url_template()
            new_params_ = self._extract_template_params(bundle_instance)
            if new_template_:
                if not _is_safe_bundle_url(new_template_):
                    self.log.warning(
                        "Bundle %s has unsafe URL template '%s', skipping URL update",
                        bundle_name,
                        new_template_,
                    )
                    new_template_ = None
                else:
                    # Sign the URL for integrity verification
                    new_template_ = _sign_bundle_url(new_template_, bundle_name)
                    self.log.debug("Signed URL template for bundle %s", bundle_name)
            return new_template_, new_params_

        # Every stored bundle; entries are popped as they are matched to configured bundles, so
        # whatever remains afterwards is no longer configured.
        stored = {b.name: b for b in session.query(DagBundleModel).all()}
        # Current single-team owner per stored bundle (None when it has zero or several teams).
        bundle_to_team = {
            bundle.name: bundle.teams[0].name if len(bundle.teams) == 1 else None
            for bundle in stored.values()
        }

        for name, config in self._bundle_config.items():
            team: Team | None = None
            if config.team_name:
                team = session.scalars(select(Team).where(Team.name == config.team_name)).one_or_none()
                if not team:
                    raise _bundle_item_exc(f"Team '{config.team_name}' does not exist")

            try:
                new_template, new_params = _extract_and_sign_template(name)
            except Exception as e:
                self.log.exception("Error creating bundle '%s': %s", name, e)
                # The bundle is still configured: drop it from ``stored`` so the cleanup loop below
                # does not deactivate it and wrongly report it as "no longer found in config".
                stored.pop(name, None)
                continue

            if bundle := stored.pop(name, None):
                bundle.active = True
                if new_template != bundle.signed_url_template:
                    bundle.signed_url_template = new_template
                    self.log.debug("Updated URL template for bundle %s", name)
                if new_params != bundle.template_params:
                    bundle.template_params = new_params
                    self.log.debug("Updated template parameters for bundle %s", name)
            else:
                bundle = DagBundleModel(name=name)
                bundle.signed_url_template = new_template
                bundle.template_params = new_params

                session.add(bundle)
                self.log.info("Added new DAG bundle %s to the database", name)

            if team and bundle_to_team.get(name) != config.team_name:
                # Change of team. It can be associating a team to a dag bundle that did not have one or
                # swapping a team for another
                bundle.teams = [team]
                if bundle_to_team.get(name):
                    self.log.warning(
                        "Changing ownership of team '%s' from Dag bundle '%s' to '%s'",
                        bundle_to_team[name],
                        name,
                        team.name,
                    )
            elif not team and bundle.teams:
                # Remove team association, but only when the bundle actually has one; every
                # stored bundle is a key of ``bundle_to_team``, including team-less ones.
                self.log.warning(
                    "Removing ownership of team '%s' from Dag bundle '%s'",
                    ", ".join(t.name for t in bundle.teams),
                    name,
                )
                bundle.teams = []

        # Import here to avoid circular import
        from airflow.models.errors import ParseImportError

        for name, bundle in stored.items():
            bundle.active = False
            bundle.teams = []
            self.log.warning("DAG bundle %s is no longer found in config and has been disabled", name)
            session.execute(delete(ParseImportError).where(ParseImportError.bundle_name == name))
            self.log.info("Deleted import errors for bundle %s which is no longer configured", name)

    @staticmethod
    def _extract_template_params(bundle_instance: BaseDagBundle) -> dict:
        """
        Extract template parameters from a bundle instance's view_url_template method.

        Each ``{placeholder}`` in the template is looked up as an attribute of the bundle; only
        truthy attribute values are included.

        :param bundle_instance: The bundle instance to extract parameters from
        :return: Dictionary of template parameters
        """
        import re

        params: dict[str, str] = {}
        template = bundle_instance.view_url_template()

        if not template:
            return params

        # Matches {placeholder} patterns in the template
        placeholder_pattern = re.compile(r"\{([^}]+)\}")

        # Extract values for each placeholder found in the template
        for placeholder in placeholder_pattern.findall(template):
            field_value = getattr(bundle_instance, placeholder, None)
            if field_value:
                params[placeholder] = field_value

        return params

    def get_bundle(self, name: str, version: str | None = None) -> BaseDagBundle:
        """
        Get a DAG bundle by name.

        :param name: The name of the DAG bundle.
        :param version: The version of the DAG bundle you need (optional). If not provided, ``tracking_ref`` will be used instead.

        :return: The DAG bundle.
        :raises ValueError: If no bundle with this name is configured.
        """
        cfg_bundle = self._bundle_config.get(name)
        if not cfg_bundle:
            raise ValueError(f"Requested bundle '{name}' is not configured.")
        return cfg_bundle.bundle_class(name=name, version=version, **cfg_bundle.kwargs)

    def get_all_dag_bundles(self) -> Iterable[BaseDagBundle]:
        """
        Lazily yield every configured DAG bundle (with ``version=None``).

        Bundles whose construction raises are logged and skipped.

        :return: Generator of DAG bundles.
        """
        for name, cfg in self._bundle_config.items():
            # Only construction is guarded; the ``yield`` stays outside the ``try`` so exceptions
            # thrown into the generator by the consumer are not mistaken for creation errors.
            try:
                bundle = cfg.bundle_class(name=name, version=None, **cfg.kwargs)
            except Exception as e:
                # Skip this bundle and continue with others
                self.log.exception("Error creating bundle '%s': %s", name, e)
                continue
            yield bundle

    def get_all_bundle_names(self) -> Iterable[str]:
        """
        Get all bundle names.

        :return: sorted list of bundle names.
        """
        return sorted(self._bundle_config.keys())

    def view_url(self, name: str, version: str | None = None) -> str | None:
        """
        Return the bundle's view URL for ``version`` (deprecated).

        Emits a ``DeprecationWarning``; use ``DagBundleModel.render_url()`` instead.
        """
        warnings.warn(
            "The 'view_url' method is deprecated and will be removed when providers "
            "have Airflow 3.1 as the minimum supported version. "
            "Use DagBundleModel.render_url() instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        bundle = self.get_bundle(name, version)
        return bundle.view_url(version=version)