1# Licensed to the Apache Software Foundation (ASF) under one
2# or more contributor license agreements. See the NOTICE file
3# distributed with this work for additional information
4# regarding copyright ownership. The ASF licenses this file
5# to you under the Apache License, Version 2.0 (the
6# "License"); you may not use this file except in compliance
7# with the License. You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing,
12# software distributed under the License is distributed on an
13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14# KIND, either express or implied. See the License for the
15# specific language governing permissions and limitations
16# under the License.
17
18from __future__ import annotations
19
20import fcntl
21import logging
22import os
23import shutil
24import tempfile
25import warnings
26from abc import ABC, abstractmethod
27from contextlib import contextmanager
28from dataclasses import dataclass, field
29from datetime import timedelta
30from fcntl import LOCK_SH, LOCK_UN, flock
31from operator import attrgetter
32from pathlib import Path
33from typing import TYPE_CHECKING
34
35import pendulum
36from pendulum.parsing import ParserError
37
38from airflow.configuration import conf
39
40if TYPE_CHECKING:
41 from pendulum import DateTime
42
43 from airflow.typing_compat import Self
44
# Module-level logger used by the bundle storage, usage-tracking and cleanup code in this module.
log = logging.getLogger(__name__)
46
47
def get_bundle_storage_root_path() -> Path:
    """
    Return the root directory under which DAG bundle data is stored locally.

    Uses ``[dag_processor] dag_bundle_storage_path`` when it is set to a non-empty value;
    otherwise falls back to ``<system temp dir>/airflow/dag_bundles``.
    """
    configured_location = conf.get("dag_processor", "dag_bundle_storage_path", fallback=None)
    if not configured_location:
        return Path(tempfile.gettempdir(), "airflow", "dag_bundles")
    return Path(configured_location)
52
53
# Root directory for per-bundle usage tracking files. Resolved once, when this module is imported.
STALE_BUNDLE_TRACKING_FOLDER = Path(get_bundle_storage_root_path(), "_tracking")
55
56
def get_bundle_tracking_dir(bundle_name: str) -> Path:
    """Return the directory holding the usage tracking files for ``bundle_name``."""
    return STALE_BUNDLE_TRACKING_FOLDER.joinpath(bundle_name)
59
60
def get_bundle_tracking_file(bundle_name: str, version: str) -> Path:
    """Return the tracking (and lock) file for one ``version`` of ``bundle_name``; it is named after the version."""
    return get_bundle_tracking_dir(bundle_name=bundle_name) / version
64
65
def get_bundle_base_folder(bundle_name: str) -> Path:
    """Return the local base directory for all files of ``bundle_name`` (storage root resolved on each call)."""
    storage_root = get_bundle_storage_root_path()
    return storage_root.joinpath(bundle_name)
68
69
def get_bundle_versions_base_folder(bundle_name: str) -> Path:
    """Return the directory under which local copies of each version of ``bundle_name`` are stored."""
    return get_bundle_base_folder(bundle_name=bundle_name).joinpath("versions")
72
73
def get_bundle_version_path(bundle_name: str, version: str) -> Path:
    """Return the local directory holding the copy of ``version`` of ``bundle_name``."""
    return get_bundle_versions_base_folder(bundle_name=bundle_name).joinpath(version)
77
78
@dataclass(frozen=True, eq=True)
class TrackedBundleVersionInfo:
    """
    Immutable record for one tracked bundle version, used by stale bundle cleanup.

    Equality and hashing consider only ``lock_file_path``; ``version`` and ``dt`` are
    carried along but ignored when comparing instances.

    :meta private:
    """

    # Path of the tracking file for this version (also locked during cleanup).
    lock_file_path: Path = field(compare=True)
    # Bundle version this record refers to.
    version: str = field(compare=False)
    # Last-used timestamp for this version.
    dt: DateTime = field(compare=False)
90
91
class BundleUsageTrackingManager:
    """
    Utility helper for removing stale bundles.

    Each versioned bundle has a tracking directory containing one file per locally stored
    version. The file is named after the version and holds the timestamp of its last use.
    A version is removed only if all of the following hold:

    - it is not among the newest ``[dag_processor] stale_bundle_cleanup_min_versions``
      versions;
    - it was last used more than ``[dag_processor] stale_bundle_cleanup_age_threshold``
      seconds ago;
    - a non-blocking exclusive lock on its tracking file can be obtained.

    :meta private:
    """

    def _parse_dt(self, val) -> DateTime | None:
        """
        Parse the contents of a tracking file.

        :param val: raw text read from the tracking file
        :return: the parsed ``pendulum.DateTime``, or None if ``val`` is not a datetime
        """
        try:
            dt = pendulum.parse(val)
        except (ParserError, ValueError):
            # Also catch plain ValueError so that any other invalid value is treated as
            # unparseable (and skipped) instead of aborting the whole cleanup run.
            return None
        # pendulum.parse can return non-datetime objects (e.g. durations); only datetimes are usable.
        return dt if isinstance(dt, pendulum.DateTime) else None

    @staticmethod
    def _filter_for_min_versions(val: list[TrackedBundleVersionInfo]) -> list[TrackedBundleVersionInfo]:
        """
        Exclude the most recently used versions that must always be kept.

        :param val: tracked versions of a single bundle
        :return: all versions except the newest ``stale_bundle_cleanup_min_versions``, newest first
        """
        min_versions_to_keep = conf.getint(
            section="dag_processor",
            key="stale_bundle_cleanup_min_versions",
        )
        # A negative value would make the slice below return only the |N| oldest versions;
        # treat it as 0 (keep none unconditionally) instead.
        min_versions_to_keep = max(min_versions_to_keep, 0)
        return sorted(val, key=attrgetter("dt"), reverse=True)[min_versions_to_keep:]

    @staticmethod
    def _filter_for_recency(val: list[TrackedBundleVersionInfo]) -> list[TrackedBundleVersionInfo]:
        """
        Keep only versions last used before the age threshold.

        :param val: tracked versions of a single bundle
        :return: versions whose ``dt`` is older than ``stale_bundle_cleanup_age_threshold`` seconds
        """
        age_threshold = conf.getint(
            section="dag_processor",
            key="stale_bundle_cleanup_age_threshold",
        )
        cutoff = pendulum.now(tz=pendulum.UTC) - timedelta(seconds=age_threshold)
        return [item for item in val if item.dt < cutoff]

    def _find_all_tracking_files(self, bundle_name) -> list[TrackedBundleVersionInfo] | None:
        """
        Read every tracking file for ``bundle_name``.

        Files that cannot be read or whose contents are not a datetime are logged and skipped.

        :param bundle_name: name of the bundle to scan
        :return: parsed tracking entries, or None if the tracking directory does not exist
        """
        tracking_dir = get_bundle_tracking_dir(bundle_name=bundle_name)
        found: list[TrackedBundleVersionInfo] = []
        if not tracking_dir.exists():
            log.debug("bundle usage tracking directory does not exist. tracking_dir=%s", tracking_dir)
            return None
        for file in tracking_dir.iterdir():
            log.debug("found bundle tracking file, path=%s", file)
            version = file.name
            try:
                dt_str = file.read_text()
            except OSError:
                # The file may have been removed by a concurrent cleanup, or may not be a
                # regular file; either way it cannot be evaluated, so skip it.
                log.warning(
                    "could not read bundle tracking file. bundle_name=%s path=%s",
                    bundle_name,
                    file,
                    exc_info=True,
                )
                continue
            dt = self._parse_dt(val=dt_str)
            if not dt:
                log.error(
                    "could not parse val as datetime bundle_name=%s val=%s version=%s",
                    bundle_name,
                    dt_str,
                    version,
                )
                continue
            found.append(TrackedBundleVersionInfo(lock_file_path=file, version=version, dt=dt))
        return found

    @staticmethod
    def _remove_stale_bundle(bundle_name: str, info: TrackedBundleVersionInfo) -> None:
        """
        Delete one stale bundle version and its tracking file, unless it is locked.

        :param bundle_name: name of the bundle the version belongs to
        :param info: tracking entry of the version to remove
        """
        bundle_version_path = get_bundle_version_path(
            bundle_name=bundle_name,
            version=info.version,
        )

        def log_info(msg):
            log.info(
                "%s bundle_name=%s bundle_version=%s bundle_path=%s lock_file=%s",
                msg,
                bundle_name,
                info.version,
                bundle_version_path,
                info.lock_file_path,
            )

        try:
            log_info("removing stale bundle.")
            with open(info.lock_file_path, "a") as f:
                flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)  # exclusive lock, do not wait
                # remove the actual bundle copy
                try:
                    shutil.rmtree(bundle_version_path)
                except FileNotFoundError:
                    # Already gone, e.g. removed by a concurrent cleanup run (whose tracking file
                    # our "a"-mode open may have just recreated). Still drop the tracking file so
                    # the version is not reconsidered forever, and keep cleaning other versions.
                    log_info("bundle version path does not exist; removing tracking file only.")
                # remove the lock file
                os.remove(info.lock_file_path)
        except BlockingIOError:
            log_info("could not obtain lock. stale bundle will not be removed.")
            return

    def _find_candidates(self, found):
        """Return the versions eligible for removal: ``found`` minus the newest and recently used ones."""
        candidates = self._filter_for_min_versions(found)
        candidates = self._filter_for_recency(candidates)
        if log.isEnabledFor(level=logging.DEBUG):
            self._debug_candidates(candidates, found)
        return candidates

    @staticmethod
    def _debug_candidates(candidates, found):
        """Log the removal candidates and the versions kept as recently used."""
        recently_used = list(set(found).difference(candidates))
        if candidates:
            log.debug(
                "found removal candidates. candidates=%s recently_used=%s",
                candidates,
                recently_used,
            )
        else:
            log.debug(
                "no removal candidates found. candidates=%s recently_used=%s",
                candidates,
                recently_used,
            )

    def _remove_stale_bundle_versions_for_bundle(self, bundle_name: str):
        """Remove all stale versions of a single bundle."""
        log.info("checking bundle for stale versions. bundle_name=%s", bundle_name)
        found = self._find_all_tracking_files(bundle_name=bundle_name)
        if not found:
            return
        candidates = self._find_candidates(found)
        for info in candidates:
            self._remove_stale_bundle(bundle_name=bundle_name, info=info)

    def remove_stale_bundle_versions(self):
        """
        Remove bundles that are not in use and have not been used for some time.

        We keep the last N used bundles, and bundles last used within X time.

        This isn't really necessary on worker types that don't share storage
        with other processes.
        """
        from airflow.dag_processing.bundles.manager import DagBundlesManager

        log.info("checking for stale bundle versions locally")

        bundles = list(DagBundlesManager().get_all_dag_bundles())
        for bundle in bundles:
            if not bundle.supports_versioning:
                continue
            self._remove_stale_bundle_versions_for_bundle(bundle_name=bundle.name)
230
231
class BaseDagBundle(ABC):
    """
    Base class for DAG bundles.

    DAG bundles are used both by the DAG processor and by workers running tasks, and the
    two usage patterns differ.

    When running a task, the required bundle version is known up front (assuming the bundle
    supports versioning), and that specific version most likely only has to stay around while
    tasks using it are running. Consequently, a single worker may have several versions of the
    same bundle in use at the same time.

    The DAG processor, in contrast, uses a bundle to keep that bundle's DAGs up to date. It
    always works with the latest version, so multiple versions of the same bundle are never in
    use at the same time there.

    :param name: String identifier for the DAG bundle
    :param refresh_interval: How often the bundle should be refreshed from the source in seconds
        (Optional - defaults to [dag_processor] refresh_interval)
    :param version: Version of the DAG bundle (Optional)
    :param view_url_template: URL template for viewing the bundle on an external website (Optional)
    """

    supports_versioning: bool = False

    # True while this instance holds its bundle lock (see ``lock``).
    _locked: bool = False

    def __init__(
        self,
        *,
        name: str,
        refresh_interval: int = conf.getint("dag_processor", "refresh_interval"),
        version: str | None = None,
        view_url_template: str | None = None,
    ) -> None:
        self.name = name
        self.version = version
        self.refresh_interval = refresh_interval
        self.is_initialized: bool = False

        self.base_dir = get_bundle_base_folder(bundle_name=self.name)
        """Local base directory holding every file of this bundle."""

        self.versions_dir = get_bundle_versions_base_folder(bundle_name=self.name)
        """Local directory under which this bundle's versions are stored."""

        self._view_url_template = view_url_template

    def initialize(self) -> None:
        """
        Prepare the bundle for use.

        Airflow calls this from the DAG processor and from workers right before the bundle's
        files are needed on disk, so expensive work can be deferred until then. Callers that
        only need ``view_url`` do not trigger it.

        Implementations must be safe to run concurrently from several threads or processes;
        if that is not naturally the case, guard the work with the ``lock`` context manager.

        Subclasses overriding this method should call ``super().initialize()`` at the END of
        their implementation, once the bundle is ready, not at the beginning.

        After marking the bundle initialized, a warning is logged if ``path`` does not exist.
        """
        self.is_initialized = True

        resolved_path = self.path
        if resolved_path.exists():
            return
        log.warning(
            "Bundle '%s' path does not exist: %s. This may cause DAG loading issues.",
            self.name,
            resolved_path,
        )

    @property
    @abstractmethod
    def path(self) -> Path:
        """
        Local filesystem path of this bundle.

        Airflow finds, loads and executes the bundle's DAGs from here. Once ``initialize``
        has run, every DAG file of the bundle must be reachable under this path.
        """

    @abstractmethod
    def get_current_version(self) -> str | None:
        """
        Return a string identifying the current version of the DAG bundle.

        Airflow may later use this value to retrieve exactly this bundle version again.
        """

    @abstractmethod
    def refresh(self) -> None:
        """
        Fetch the latest version of the bundle's files.

        Implementations must be safe to run concurrently from several threads or processes;
        if that is not naturally the case, guard the work with the ``lock`` context manager.
        """

    def view_url(self, version: str | None = None) -> str | None:
        """
        Deprecated: URL for viewing the bundle on an external website.

        The URL is shown in the Airflow UI so users can navigate to more details about that
        bundle version. This must work without ``initialize`` having been called. Always emits
        a ``DeprecationWarning``; use ``view_url_template`` instead.

        :param version: Version to view
        :return: URL to view the bundle (always None in this base implementation)
        """
        deprecation_message = (
            "The 'view_url' method is deprecated and will be removed in a future version. "
            "Use 'view_url_template' instead."
        )
        warnings.warn(deprecation_message, DeprecationWarning, stacklevel=2)
        return None

    def view_url_template(self) -> str | None:
        """
        URL template for viewing the bundle on an external website.

        The rendered URL is shown in the Airflow UI so users can navigate to more details
        about a bundle version. Templates use ``str.format``-style placeholders, commonly:

        - {version}: The version identifier
        - {subdir}: The subdirectory within the bundle (if applicable)

        This must work without ``initialize`` having been called.

        :return: URL template string, or None if not applicable
        """
        return self._view_url_template

    @contextmanager
    def lock(self):
        """
        Let only one holder at a time enter this context for this bundle.

        Takes a blocking exclusive ``flock`` on ``<bundle storage root>/_locks/<bundle name>.lock``.
        Useful when a bundle must perform operations that are not safe to run concurrently.
        If this instance already holds the lock, the context is entered without locking again,
        so nested use does not deadlock.
        """
        if not self._locked:
            locks_dir = get_bundle_storage_root_path() / "_locks"
            locks_dir.mkdir(parents=True, exist_ok=True)
            with open(locks_dir / f"{self.name}.lock", "w") as handle:
                # Blocks until the exclusive lock can be taken.
                fcntl.flock(handle, fcntl.LOCK_EX)
                try:
                    self._locked = True
                    yield
                finally:
                    fcntl.flock(handle, LOCK_UN)
                    self._locked = False
        else:
            # NOTE(review): the re-entry flag is per instance, not per thread — confirm instances
            # are not shared between threads that both call ``lock``.
            yield

    def __repr__(self):
        class_name = self.__class__.__name__
        return f"{class_name}(name={self.name})"
394
395
class BundleVersionLock:
    """
    Lock version of bundle when in use to prevent deletion.

    Acquiring refreshes the version's tracking file with the current UTC timestamp and then
    holds a shared ``flock`` on it. Stale bundle cleanup needs a non-blocking exclusive lock
    on the same file before it deletes a version, so a held lock protects the version.
    Unversioned bundles (falsy ``bundle_version``) are never locked.

    :meta private:
    """

    def __init__(self, bundle_name, bundle_version, **kwargs):
        super().__init__(**kwargs)
        self.lock_file = None
        self.bundle_name = bundle_name
        self.version = bundle_version
        self.lock_file_path: Path | None = None
        if self.version:
            self.lock_file_path = get_bundle_tracking_file(
                bundle_name=self.bundle_name,
                version=self.version,
            )

    def _log_exc(self, msg):
        """Log ``msg`` plus this lock's bundle name, version and lock file, with the active exception."""
        log.exception(
            "%s name=%s version=%s lock_file=%s",
            msg,
            self.bundle_name,
            self.version,
            self.lock_file_path,
        )

    def _update_version_file(self):
        """Atomically (re)write the version file with the current UTC timestamp as its last-used time."""
        if TYPE_CHECKING:
            assert self.lock_file_path
        tracking_dir = self.lock_file_path.parent
        tracking_dir.mkdir(parents=True, exist_ok=True)

        # Stage the content next to the target so ``os.replace`` is a same-filesystem atomic
        # rename (it fails across filesystems). The scratch directory is created one level above
        # the per-bundle tracking directory so it is never mistaken for a tracking file there.
        with tempfile.TemporaryDirectory(dir=tracking_dir.parent) as td:
            # Use only the file name: joining an absolute path onto ``td`` would discard ``td``.
            temp_file = Path(td, self.lock_file_path.name)
            now = pendulum.now(tz=pendulum.UTC)
            temp_file.write_text(now.isoformat())
            os.replace(temp_file, self.lock_file_path)

    def acquire(self):
        """Refresh the version file and take a shared lock on it; no-op if unversioned or already held."""
        if not self.version or self.lock_file:
            return
        self._update_version_file()
        if TYPE_CHECKING:
            assert self.lock_file_path
        lock_file = open(self.lock_file_path)
        try:
            flock(lock_file, LOCK_SH)
        except BaseException:
            # Don't leak the descriptor if locking fails.
            lock_file.close()
            raise
        self.lock_file = lock_file

    def release(self):
        """Drop the shared lock and close the lock file, if held."""
        if self.lock_file:
            flock(self.lock_file, LOCK_UN)
            self.lock_file.close()
            self.lock_file = None

    def __enter__(self) -> Self:
        # wrapping in try except here is just extra cautious since this is in task execution path
        try:
            self.acquire()
        except Exception:
            self._log_exc("error when attempting to acquire lock")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # wrapping in try except here is just extra cautious since this is in task execution path
        try:
            self.release()
        except Exception:
            self._log_exc("error when attempting to release lock")