Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/dag_processing/bundles/base.py: 34%

217 statements  

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

import fcntl
import logging
import os
import shutil
import tempfile
import warnings
from abc import ABC, abstractmethod
from contextlib import contextmanager
from dataclasses import dataclass, field
from datetime import timedelta
from fcntl import LOCK_SH, LOCK_UN, flock
from operator import attrgetter
from pathlib import Path
from typing import TYPE_CHECKING

import pendulum
from pendulum.parsing import ParserError

from airflow.configuration import conf

if TYPE_CHECKING:
    from pendulum import DateTime

    from airflow.typing_compat import Self

log = logging.getLogger(__name__)


def get_bundle_storage_root_path():
    if configured_location := conf.get("dag_processor", "dag_bundle_storage_path", fallback=None):
        return Path(configured_location)
    return Path(tempfile.gettempdir(), "airflow", "dag_bundles")


STALE_BUNDLE_TRACKING_FOLDER = get_bundle_storage_root_path() / "_tracking"


def get_bundle_tracking_dir(bundle_name: str) -> Path:
    return STALE_BUNDLE_TRACKING_FOLDER / bundle_name


def get_bundle_tracking_file(bundle_name: str, version: str) -> Path:
    tracking_dir = get_bundle_tracking_dir(bundle_name=bundle_name)
    return Path(tracking_dir, version)


def get_bundle_base_folder(bundle_name: str) -> Path:
    return get_bundle_storage_root_path() / bundle_name


def get_bundle_versions_base_folder(bundle_name: str) -> Path:
    return get_bundle_base_folder(bundle_name=bundle_name) / "versions"


def get_bundle_version_path(bundle_name: str, version: str) -> Path:
    base_folder = get_bundle_versions_base_folder(bundle_name=bundle_name)
    return base_folder / version

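# --- Illustrative sketch (not part of the original module) ---
# The helpers above define the on-disk layout used for bundle storage. Assuming
# [dag_processor] dag_bundle_storage_path is NOT configured, the root falls back to a
# directory under tempfile.gettempdir(), and a hypothetical bundle "my_bundle" at
# version "abc123" would be laid out roughly like this:
#
#     <tmp>/airflow/dag_bundles/
#         _locks/                      # lockfiles used by BaseDagBundle.lock()
#         _tracking/my_bundle/abc123   # last-used timestamp file for that version
#         my_bundle/versions/abc123/   # the local copy of that bundle version
#
def _example_print_bundle_layout() -> None:  # hypothetical helper, for illustration only
    """Print the concrete paths this module would use for a made-up bundle name/version."""
    for p in (
        get_bundle_storage_root_path(),
        get_bundle_base_folder("my_bundle"),
        get_bundle_version_path("my_bundle", "abc123"),
        get_bundle_tracking_file("my_bundle", "abc123"),
    ):
        print(p)
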

@dataclass(frozen=True)
class TrackedBundleVersionInfo:
    """
    Internal info class for stale bundle cleanup.

    :meta private:
    """

    lock_file_path: Path
    version: str = field(compare=False)
    dt: DateTime = field(compare=False)

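# --- Illustrative note (not part of the original module) ---
# `frozen=True` makes TrackedBundleVersionInfo hashable, and `compare=False` on
# `version` and `dt` means equality and hashing key on `lock_file_path` alone, so two
# records for the same tracking file compare equal even if their timestamps differ.
# The set arithmetic in _debug_candidates() below depends on this hashability.
def _example_tracked_info_equality() -> None:  # hypothetical helper, for illustration only
    a = TrackedBundleVersionInfo(
        lock_file_path=Path("/tmp/tracking/v1"), version="v1", dt=pendulum.now(tz=pendulum.UTC)
    )
    b = TrackedBundleVersionInfo(
        lock_file_path=Path("/tmp/tracking/v1"), version="other", dt=pendulum.now(tz=pendulum.UTC)
    )
    assert a == b  # same lock file path -> treated as the same tracked version
    assert len({a, b}) == 1
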

class BundleUsageTrackingManager:
    """
    Utility helper for removing stale bundles.

    :meta private:
    """

    def _parse_dt(self, val) -> DateTime | None:
        try:
            dt = pendulum.parse(val)
            return dt if isinstance(dt, pendulum.DateTime) else None
        except ParserError:
            return None

    @staticmethod
    def _filter_for_min_versions(val: list[TrackedBundleVersionInfo]) -> list[TrackedBundleVersionInfo]:
        min_versions_to_keep = conf.getint(
            section="dag_processor",
            key="stale_bundle_cleanup_min_versions",
        )
        return sorted(val, key=attrgetter("dt"), reverse=True)[min_versions_to_keep:]

    @staticmethod
    def _filter_for_recency(val: list[TrackedBundleVersionInfo]) -> list[TrackedBundleVersionInfo]:
        age_threshold = conf.getint(
            section="dag_processor",
            key="stale_bundle_cleanup_age_threshold",
        )
        ret = []
        now = pendulum.now(tz=pendulum.UTC)
        cutoff = now - timedelta(seconds=age_threshold)
        for item in val:
            if item.dt < cutoff:
                ret.append(item)
        return ret

    def _find_all_tracking_files(self, bundle_name) -> list[TrackedBundleVersionInfo] | None:
        tracking_dir = get_bundle_tracking_dir(bundle_name=bundle_name)
        found: list[TrackedBundleVersionInfo] = []
        if not tracking_dir.exists():
            log.debug("bundle usage tracking directory does not exist. tracking_dir=%s", tracking_dir)
            return None
        for file in tracking_dir.iterdir():
            log.debug("found bundle tracking file, path=%s", file)
            version = file.name
            dt_str = file.read_text()
            dt = self._parse_dt(val=dt_str)
            if not dt:
                log.error(
                    "could not parse val as datetime bundle_name=%s val=%s version=%s",
                    bundle_name,
                    dt_str,
                    version,
                )
                continue
            found.append(TrackedBundleVersionInfo(lock_file_path=file, version=version, dt=dt))
        return found

    @staticmethod
    def _remove_stale_bundle(bundle_name: str, info: TrackedBundleVersionInfo) -> None:
        bundle_version_path = get_bundle_version_path(
            bundle_name=bundle_name,
            version=info.version,
        )

        def log_info(msg):
            log.info(
                "%s bundle_name=%s bundle_version=%s bundle_path=%s lock_file=%s",
                msg,
                bundle_name,
                info.version,
                bundle_version_path,
                info.lock_file_path,
            )

        try:
            log_info("removing stale bundle.")
            with open(info.lock_file_path, "a") as f:
                flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)  # exclusive lock, do not wait
                # remove the actual bundle copy
                shutil.rmtree(bundle_version_path)
                # remove the lock file
                os.remove(info.lock_file_path)
        except BlockingIOError:
            log_info("could not obtain lock. stale bundle will not be removed.")
            return

    def _find_candidates(self, found):
        """Filter out recently used bundle versions and return removal candidates."""
        candidates = self._filter_for_min_versions(found)
        candidates = self._filter_for_recency(candidates)
        if log.isEnabledFor(level=logging.DEBUG):
            self._debug_candidates(candidates, found)
        return candidates

    @staticmethod
    def _debug_candidates(candidates, found):
        recently_used = list(set(found).difference(candidates))
        if candidates:
            log.debug(
                "found removal candidates. candidates=%s recently_used=%s",
                candidates,
                recently_used,
            )
        else:
            log.debug(
                "no removal candidates found. candidates=%s recently_used=%s",
                candidates,
                recently_used,
            )

    def _remove_stale_bundle_versions_for_bundle(self, bundle_name: str):
        log.info("checking bundle for stale versions. bundle_name=%s", bundle_name)
        found = self._find_all_tracking_files(bundle_name=bundle_name)
        if not found:
            return
        candidates = self._find_candidates(found)
        for info in candidates:
            self._remove_stale_bundle(bundle_name=bundle_name, info=info)

    def remove_stale_bundle_versions(self):
        """
        Remove bundle versions that are not in use and have not been used for some time.

        We will keep the last N used bundle versions, and any version last used within X time.

        This isn't really necessary on worker types that don't share storage
        with other processes.
        """
        from airflow.dag_processing.bundles.manager import DagBundlesManager

        log.info("checking for stale bundle versions locally")

        bundles = list(DagBundlesManager().get_all_dag_bundles())
        for bundle in bundles:
            if not bundle.supports_versioning:
                continue
            self._remove_stale_bundle_versions_for_bundle(bundle_name=bundle.name)

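# --- Illustrative sketch (not part of the original module) ---
# The two filters in BundleUsageTrackingManager compose into a simple retention policy:
# versions are sorted newest-first by last-use time, the newest
# `stale_bundle_cleanup_min_versions` entries are always kept, and of the remainder only
# those last used more than `stale_bundle_cleanup_age_threshold` seconds ago become
# removal candidates. A standalone restatement with hard-coded settings (the real code
# reads both values from the [dag_processor] config section):
def _example_removal_candidates(
    infos: list[TrackedBundleVersionInfo],
    min_versions_to_keep: int = 2,
    age_threshold_seconds: int = 3600,
) -> list[TrackedBundleVersionInfo]:  # hypothetical helper, for illustration only
    older_than_min = sorted(infos, key=attrgetter("dt"), reverse=True)[min_versions_to_keep:]
    cutoff = pendulum.now(tz=pendulum.UTC) - timedelta(seconds=age_threshold_seconds)
    return [info for info in older_than_min if info.dt < cutoff]
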

class BaseDagBundle(ABC):
    """
    Base class for DAG bundles.

    DAG bundles are used both by the DAG processor and by a worker when running a task. These usage
    patterns are different, however.

    When running a task, we know what version of the bundle we need (assuming the bundle supports versioning),
    and we likely only need to keep that specific bundle version around for as long as we have tasks running
    against it. This also means that, on a single worker, multiple versions of the same bundle can be in use
    at the same time.

    In contrast, the DAG processor uses a bundle to keep the DAGs from that bundle up to date. There will not be
    multiple versions of the same bundle in use at the same time. The DAG processor will always use the latest version.

    :param name: String identifier for the DAG bundle
    :param refresh_interval: How often the bundle should be refreshed from the source, in seconds
        (Optional - defaults to [dag_processor] refresh_interval)
    :param version: Version of the DAG bundle (Optional)
    :param view_url_template: URL template for viewing the bundle on an external website (Optional)
    """

    supports_versioning: bool = False

    _locked: bool = False

    def __init__(
        self,
        *,
        name: str,
        refresh_interval: int = conf.getint("dag_processor", "refresh_interval"),
        version: str | None = None,
        view_url_template: str | None = None,
    ) -> None:
        self.name = name
        self.version = version
        self.refresh_interval = refresh_interval
        self.is_initialized: bool = False

        self.base_dir = get_bundle_base_folder(bundle_name=self.name)
        """Base directory for all bundle files for this bundle."""

        self.versions_dir = get_bundle_versions_base_folder(bundle_name=self.name)
        """Where bundle versions are stored locally for this bundle."""

        self._view_url_template = view_url_template

    def initialize(self) -> None:
        """
        Initialize the bundle.

        This method is called by the DAG processor and worker before the bundle is used,
        and allows for deferring expensive operations until that point in time. This will
        only be called when Airflow needs the bundle files on disk - some uses only need
        to call the `view_url` method, which can run without initializing the bundle.

        This method must ultimately be safe to call concurrently from different threads or processes.
        If it isn't naturally safe, you'll need to make it so with some form of locking.
        There is a `lock` context manager on this class available for this purpose.

        If you override this method, ensure you call `super().initialize()`
        at the end of your method, after the bundle is initialized, not at the beginning.
        """
        self.is_initialized = True

        # Check if the bundle path exists after initialization
        bundle_path = self.path
        if not bundle_path.exists():
            log.warning(
                "Bundle '%s' path does not exist: %s. This may cause DAG loading issues.",
                self.name,
                bundle_path,
            )

    @property
    @abstractmethod
    def path(self) -> Path:
        """
        Path for this bundle.

        Airflow will use this path to find/load/execute the DAGs from the bundle.
        After `initialize` has been called, all DAG files in the bundle should be accessible from this path.
        """

    @abstractmethod
    def get_current_version(self) -> str | None:
        """
        Retrieve a string that represents the version of the DAG bundle.

        Airflow can use this value to retrieve this same bundle version later.
        """

    @abstractmethod
    def refresh(self) -> None:
        """
        Retrieve the latest version of the files in the bundle.

        This method must ultimately be safe to call concurrently from different threads or processes.
        If it isn't naturally safe, you'll need to make it so with some form of locking.
        There is a `lock` context manager on this class available for this purpose.
        """

    def view_url(self, version: str | None = None) -> str | None:
        """
        URL to view the bundle on an external website.

        This is shown to users in the Airflow UI, allowing them to navigate to this URL
        for more details about that version of the bundle.

        This needs to function without `initialize` being called.

        :param version: Version to view
        :return: URL to view the bundle
        """
        warnings.warn(
            "The 'view_url' method is deprecated and will be removed in a future version. "
            "Use 'view_url_template' instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        return None

    def view_url_template(self) -> str | None:
        """
        URL template to view the bundle on an external website.

        This is shown to users in the Airflow UI, allowing them to navigate to
        this URL for more details about that version of the bundle.

        The template should use format string placeholders like {version}, {subdir}, etc.
        Common placeholders:
        - {version}: The version identifier
        - {subdir}: The subdirectory within the bundle (if applicable)

        This needs to function without `initialize` being called.

        :return: URL template string or None if not applicable
        """
        return self._view_url_template

    @contextmanager
    def lock(self):
        """
        Ensure only a single bundle can enter this context at a time, by taking an exclusive lock on a lockfile.

        This is useful when a bundle needs to perform operations that are not safe to run concurrently.
        """
        if self._locked:
            yield
            return

        lock_dir_path = get_bundle_storage_root_path() / "_locks"
        lock_dir_path.mkdir(parents=True, exist_ok=True)
        lock_file_path = lock_dir_path / f"{self.name}.lock"

        with open(lock_file_path, "w") as lock_file:
            # Exclusive lock - blocks until it is available
            fcntl.flock(lock_file, fcntl.LOCK_EX)
            try:
                self._locked = True
                yield
            finally:
                fcntl.flock(lock_file, LOCK_UN)
                self._locked = False

    def __repr__(self):
        return f"{self.__class__.__name__}(name={self.name})"

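# --- Illustrative sketch (not part of the original module) ---
# A minimal, hypothetical subclass showing what the abstract interface asks of a concrete
# bundle: a `path` where DAG files live after `initialize`, a version string (or None for
# unversioned bundles), and a `refresh` made concurrency-safe with the `lock` context
# manager. A real implementation (e.g. a git-backed bundle) would do considerably more.
class _ExampleLocalFolderBundle(BaseDagBundle):  # hypothetical, for illustration only
    """Serve DAGs straight from a fixed local folder; no versioning support."""

    supports_versioning = False

    def __init__(self, *, local_folder: str, **kwargs) -> None:
        super().__init__(**kwargs)
        self._local_folder = Path(local_folder)

    @property
    def path(self) -> Path:
        return self._local_folder

    def get_current_version(self) -> str | None:
        return None  # unversioned: Airflow cannot pin a specific version later

    def refresh(self) -> None:
        with self.lock():
            # nothing to fetch for a plain local folder; a remote-backed bundle
            # would pull the latest files here
            pass

    def initialize(self) -> None:
        # do any expensive setup first, then mark the bundle initialized
        super().initialize()


# Example instantiation; `view_url_template` may use placeholders such as {version}:
# bundle = _ExampleLocalFolderBundle(
#     name="example_local",
#     local_folder="/opt/airflow/dags",
#     view_url_template="https://example.com/repo/tree/{version}",
# )
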

class BundleVersionLock:
    """
    Lock the version of a bundle while it is in use to prevent deletion.

    :meta private:
    """

    def __init__(self, bundle_name, bundle_version, **kwargs):
        super().__init__(**kwargs)
        self.lock_file = None
        self.bundle_name = bundle_name
        self.version = bundle_version
        self.lock_file_path: Path | None = None
        if self.version:
            self.lock_file_path = get_bundle_tracking_file(
                bundle_name=self.bundle_name,
                version=self.version,
            )

    def _log_exc(self, msg):
        log.exception(
            "%s name=%s version=%s lock_file=%s",
            msg,
            self.bundle_name,
            self.version,
            self.lock_file_path,
        )


    def _update_version_file(self):
        """Create a version file containing the last-used timestamp."""
        if TYPE_CHECKING:
            assert self.lock_file_path
        self.lock_file_path.parent.mkdir(parents=True, exist_ok=True)

        with tempfile.TemporaryDirectory() as td:
            # write to a temporary file first, then move it into place atomically
            temp_file = Path(td, self.lock_file_path.name)
            now = pendulum.now(tz=pendulum.UTC)
            temp_file.write_text(now.isoformat())
            os.replace(temp_file, self.lock_file_path)


437 if not self.version: 

438 return 

439 if self.lock_file: 

440 return 

441 self._update_version_file() 

442 if TYPE_CHECKING: 

443 assert self.lock_file_path 

444 self.lock_file = open(self.lock_file_path) 

445 flock(self.lock_file, LOCK_SH) 

446 

447 def release(self): 

448 if self.lock_file: 

449 flock(self.lock_file, LOCK_UN) 

450 self.lock_file.close() 

451 self.lock_file = None 

452 

453 def __enter__(self) -> Self: 

454 # wrapping in try except here is just extra cautious since this is in task execution path 

455 try: 

456 self.acquire() 

457 except Exception: 

458 self._log_exc("error when attempting to acquire lock") 

459 return self 

460 

461 def __exit__(self, exc_type, exc_val, exc_tb): 

462 # wrapping in try except here is just extra cautious since this is in task execution path 

463 try: 

464 self.release() 

465 except Exception: 

466 self._log_exc("error when attempting to release lock")
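

# --- Illustrative sketch (not part of the original module) ---
# How the pieces fit together: while a specific bundle version is in use,
# BundleVersionLock refreshes that version's tracking file with a last-used timestamp and
# holds a *shared* flock (LOCK_SH) on it. The cleanup path in
# BundleUsageTrackingManager._remove_stale_bundle() attempts a *non-blocking exclusive*
# flock (LOCK_EX | LOCK_NB) on the same file, so a version that is still locked raises
# BlockingIOError and survives the sweep. A hedged usage sketch:
def _example_use_version_lock(bundle: BaseDagBundle) -> None:  # for illustration only
    with BundleVersionLock(bundle_name=bundle.name, bundle_version=bundle.get_current_version()):
        # ... work with the files under bundle.path while the version is protected ...
        pass


def _example_cleanup_pass() -> None:  # for illustration only
    # run periodically wherever bundle storage is shared; scheduling is deployment-specific
    BundleUsageTrackingManager().remove_stale_bundle_versions()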