Line data Source code
1 : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "cmp"
9 : "context"
10 : "fmt"
11 : "slices"
12 :
13 : "github.com/cockroachdb/errors"
14 : "github.com/cockroachdb/pebble/internal/base"
15 : "github.com/cockroachdb/pebble/internal/manifest"
16 : "github.com/cockroachdb/pebble/objstorage"
17 : )
18 :
19 : // DownloadSpan is a key range passed to the Download method.
20 : type DownloadSpan struct {
21 : StartKey []byte
22 : // EndKey is exclusive.
23 : EndKey []byte
24 : // ViaBackingFileDownload, if true, indicates the span should be downloaded by
25 : // downloading any remote backing files byte-for-byte and replacing them with
26 : // the downloaded local files, while otherwise leaving the virtual SSTables
27 : // as-is. If false, a "normal" rewriting compaction of the span, that iterates
28 : // the keys and produces a new SSTable, is used instead. Downloading raw files
29 : // can be faster when the whole file is being downloaded, as it avoids some
30 : // cpu-intensive steps involved in iteration and new file construction such as
31 : // compression, however it can also be wasteful when only a small portion of a
32 : // larger backing file is being used by a virtual file. Additionally, if the
33 : // virtual file has expensive read-time transformations, such as prefix
34 : // replacement, rewriting once can persist the result of these for future use
35 : // while copying only the backing file will obligate future reads to continue
36 : // to compute such transforms.
37 : ViaBackingFileDownload bool
38 : }
39 :
40 : // Download ensures that the LSM does not use any external sstables for the
41 : // given key ranges. It does so by performing appropriate compactions so that
42 : // all external data becomes available locally.
43 : //
44 : // Note that calling this method does not imply that all other compactions stop;
45 : // it simply informs Pebble of a list of spans for which external data should be
46 : // downloaded with high priority.
47 : //
48 : // The method returns once no external sstables overlap the given spans, the
49 : // context is canceled, the db is closed, or an error is hit.
50 : //
51 : // Note that despite the best effort of this method, if external ingestions
52 : // happen in parallel, a new external file can always appear right as we're
53 : // returning.
54 : //
55 : // TODO(radu): consider passing a priority/impact knob to express how important
56 : // the download is (versus live traffic performance, LSM health).
57 1 : func (d *DB) Download(ctx context.Context, spans []DownloadSpan) error {
58 1 : ctx, cancel := context.WithCancel(ctx)
59 1 : defer cancel()
60 1 : if err := d.closed.Load(); err != nil {
61 0 : panic(err)
62 : }
63 1 : if d.opts.ReadOnly {
64 0 : return ErrReadOnly
65 0 : }
66 1 : info := DownloadInfo{
67 1 : JobID: int(d.newJobID()),
68 1 : Spans: spans,
69 1 : }
70 1 : startTime := d.timeNow()
71 1 : d.opts.EventListener.DownloadBegin(info)
72 1 :
73 1 : for info.RestartCount = 0; ; info.RestartCount++ {
74 1 : tasks := d.createDownloadTasks(spans)
75 1 : info.Duration = d.timeNow().Sub(startTime)
76 1 : if len(tasks) == 0 {
77 1 : // We are done.
78 1 : info.Done = true
79 1 : d.opts.EventListener.DownloadEnd(info)
80 1 : return nil
81 1 : }
82 1 : if info.RestartCount > 0 {
83 0 : d.opts.EventListener.DownloadBegin(info)
84 0 : }
85 :
86 : // Install the tasks.
87 1 : d.mu.Lock()
88 1 : d.mu.compact.downloads = append(d.mu.compact.downloads, tasks...)
89 1 : d.maybeScheduleCompaction()
90 1 : d.mu.Unlock()
91 1 :
92 1 : err := d.waitForDownloadTasks(ctx, tasks)
93 1 : for _, t := range tasks {
94 1 : info.DownloadCompactionsLaunched += t.numLaunchedDownloads
95 1 : }
96 :
97 1 : if err != nil {
98 0 : info.Err = err
99 0 : info.Duration = d.timeNow().Sub(startTime)
100 0 : d.opts.EventListener.DownloadEnd(info)
101 0 : return err
102 0 : }
103 : }
104 : }
105 :
106 : // createDownloadTasks creates downloadSpanTasks for the download spans that
107 : // overlap external files in the given version.
108 1 : func (d *DB) createDownloadTasks(spans []DownloadSpan) []*downloadSpanTask {
109 1 : d.mu.Lock()
110 1 : vers := d.mu.versions.currentVersion()
111 1 : d.mu.Unlock()
112 1 :
113 1 : tasks := make([]*downloadSpanTask, 0, len(spans))
114 1 : for i := range spans {
115 1 : task, ok := d.newDownloadSpanTask(vers, spans[i])
116 1 : // If !ok, there are no external files in this span.
117 1 : if ok {
118 1 : tasks = append(tasks, task)
119 1 : }
120 : }
121 1 : return tasks
122 : }
123 :
124 : // waitForDownloadTasks waits until all download tasks complete.
125 1 : func (d *DB) waitForDownloadTasks(ctx context.Context, tasks []*downloadSpanTask) error {
126 1 : for i := range tasks {
127 1 : select {
128 0 : case <-ctx.Done():
129 0 : d.removeDownloadTasks(tasks)
130 0 : return ctx.Err()
131 :
132 1 : case err := <-tasks[i].taskCompletedChan:
133 1 : if err != nil {
134 0 : d.removeDownloadTasks(tasks)
135 0 : return err
136 0 : }
137 : }
138 : }
139 1 : return nil
140 : }
141 :
142 : // removeDownloadTasks removes all tasks in the given slice from
143 : // d.mu.compact.downloads.
144 0 : func (d *DB) removeDownloadTasks(tasks []*downloadSpanTask) {
145 0 : d.mu.Lock()
146 0 : defer d.mu.Unlock()
147 0 : d.mu.compact.downloads = slices.DeleteFunc(d.mu.compact.downloads, func(t *downloadSpanTask) bool {
148 0 : return slices.Contains(tasks, t)
149 0 : })
150 : }
151 :
152 : // downloadSpanTask tracks the task of downloading external files that overlap
153 : // with a DownloadSpan.
154 : //
155 : // A downloadSpanTask is spawned only if at least one overlapping external file
156 : // is found in the current version.
157 : //
158 : // When a downloadSpanTask completes (i.e. taskCompletedChan is signaled)
159 : // without an error, it is guaranteed that all external files that were
160 : // overlapping the download span at the beginning of the task are downloaded.
161 : //
162 : // == Implementation ==
163 : //
164 : // A download span task moves through the LSM within the given bounds in
165 : // top-down level order (L0, L1, etc.), and in Smallest.UserKey order within
166 : // each level (and breaking ties in L0 according to LargestSeqNum). We introduce
167 : // the concept of a "download cursor" to keep track of where we are in this
168 : // process, in a way that is independent of any one version. A cursor stores the
169 : // level, a user key which is a lower bound for Smallest.UserKey within that
170 : // level, and a sequence number which is a lower bound for the LargestSeqNum for
171 : // files on that level starting at exactly that key.
172 : //
173 : // While a download task is running, tables with external backings can disappear
174 : // due to excises or compactions; tables can move *down* (to a lower LSM level);
175 : // or tables can have their bounds shrink due to excises (and these will appear
176 : // as new tables, even though they have the same backing). The top-down,
177 : // left-to-right-start-key ordering ensures that we don't miss any table
178 : // (instead, we may examine it multiple times).
179 : //
180 : // We use a cursor that advances as our download task makes progress. Each time
181 : // we encounter a file that needs downloading, we create a "bookmark". A
182 : // bookmark conceptually represents a key range within a level and it
183 : // corresponds to the bounds of the file that we discovered. It is represented
184 : // as a cursor position (corresponding to the start) and an end boundary key. We
185 : // need to remember the bookmark because the download compaction can fail (e.g.
186 : // it can get canceled by an excise) and the file might get excised so we need
187 : // to look again at all files within the original key range.
188 : //
189 : // It is also possible that we encounter files that are already part of a
190 : // compaction. These can be move compaction, or can get canceled, so we can't
191 : // just ignore these files; we create bookmarks for such files as well.
192 : //
193 : // We maintain no more than maxConcurrentDownloads bookmarks - the idea being
194 : // that files that are part of compactions are getting downloaded anyway and we
195 : // can effectively count them toward the limit. When we cannot create any more
196 : // bookmarks, we stop advancing the task cursor. Note that it is not this code's
197 : // job to enforce the maximum concurrency, this is simply a reasonable limit - we
198 : // don't want to accumulate arbitrarily many bookmarks, since we check each one
199 : // whenever tryLaunchDownloadCompaction is called (after every compaction
200 : // completing).
201 : //
202 : // This implementation achieves O(maxConcurrentDownloads * N) level iterator
203 : // operations across the entire task, where N is the (average) number of files
204 : // within the bounds.
205 : type downloadSpanTask struct {
206 : downloadSpan DownloadSpan
207 :
208 : // The download task pertains to sstables which *start* (as per
209 : // Smallest.UserKey) within these bounds.
210 : bounds base.UserKeyBounds
211 :
212 : // taskCompletedChan is signaled when we have finished download compactions
213 : // for all external files encountered within the bounds, or when one of these
214 : // compactions reports an error (other than ErrCancelledCompaction).
215 : taskCompletedChan chan error
216 :
217 : numLaunchedDownloads int
218 :
219 : // Keeps track of the current position; all files up to these position were
220 : // examined and were either downloaded or we have bookmarks for them.
221 : cursor downloadCursor
222 :
223 : // Bookmarks remember areas which correspond to downloads that we started or
224 : // files that were undergoing other compactions and which we need to check
225 : // again before completing the task.
226 : bookmarks []downloadBookmark
227 :
228 : // Testing hooks.
229 : testing struct {
230 : launchDownloadCompaction func(f *fileMetadata) (chan error, bool)
231 : }
232 : }
233 :
234 : // downloadBookmark represents an area that was swept by the task cursor which
235 : // corresponds to a file that was part of a running compaction or download.
236 : type downloadBookmark struct {
237 : start downloadCursor
238 : endBound base.UserKeyBoundary
239 : // downloadDoneCh is set if this bookmark corresponds to a download we
240 : // started; in this case the channel will report the status of that
241 : // compaction.
242 : downloadDoneCh chan error
243 : }
244 :
245 1 : func (d *DB) newDownloadSpanTask(vers *version, sp DownloadSpan) (_ *downloadSpanTask, ok bool) {
246 1 : bounds := base.UserKeyBoundsEndExclusive(sp.StartKey, sp.EndKey)
247 1 : // We are interested in all external sstables that *overlap* with
248 1 : // [sp.StartKey, sp.EndKey). Expand the bounds to the left so that we
249 1 : // include the start keys of any external sstables that overlap with
250 1 : // sp.StartKey.
251 1 : vers.IterAllLevelsAndSublevels(func(iter manifest.LevelIterator, level, sublevel int) {
252 1 : if f := iter.SeekGE(d.cmp, sp.StartKey); f != nil &&
253 1 : objstorage.IsExternalTable(d.objProvider, f.FileBacking.DiskFileNum) &&
254 1 : d.cmp(f.Smallest.UserKey, bounds.Start) < 0 {
255 0 : bounds.Start = f.Smallest.UserKey
256 0 : }
257 : })
258 1 : startCursor := downloadCursor{
259 1 : level: 0,
260 1 : key: bounds.Start,
261 1 : seqNum: 0,
262 1 : }
263 1 : f, level := startCursor.NextExternalFile(d.cmp, d.objProvider, bounds, vers)
264 1 : if f == nil {
265 1 : // No external files in the given span.
266 1 : return nil, false
267 1 : }
268 :
269 1 : return &downloadSpanTask{
270 1 : downloadSpan: sp,
271 1 : bounds: bounds,
272 1 : taskCompletedChan: make(chan error, 1),
273 1 : cursor: makeCursorAtFile(f, level),
274 1 : }, true
275 : }
276 :
277 : // downloadCursor represents a position in the download process, which does not
278 : // depend on a specific version.
279 : //
280 : // The Download process scans for external files level-by-level (starting with
281 : // L0), and left-to-right (in terms of Smallest.UserKey) within each level. In
282 : // L0, we break ties by the LargestSeqNum.
283 : //
284 : // A cursor can be thought of as a boundary between two files in a version
285 : // (ordered by level, then by Smallest.UserKey, then by LargestSeqNum). A file
286 : // is either "before" or "after" the cursor.
287 : type downloadCursor struct {
288 : // LSM level (0 to NumLevels). When level=NumLevels, the cursor is at the end.
289 : level int
290 : // Inclusive lower bound for Smallest.UserKey for tables on level.
291 : key []byte
292 : // Inclusive lower bound for sequence number for tables on level with
293 : // Smallest.UserKey equaling key. Used to break ties within L0, and also used
294 : // to position a cursor immediately after a given file.
295 : seqNum uint64
296 : }
297 :
298 : var endCursor = downloadCursor{level: manifest.NumLevels}
299 :
300 : // AtEnd returns true if the cursor is after all relevant files.
301 1 : func (c downloadCursor) AtEnd() bool {
302 1 : return c.level >= manifest.NumLevels
303 1 : }
304 :
305 1 : func (c downloadCursor) String() string {
306 1 : return fmt.Sprintf("level=%d key=%q seqNum=%d", c.level, c.key, c.seqNum)
307 1 : }
308 :
309 : // makeCursorAtFile returns a downloadCursor that is immediately before the
310 : // given file. Calling nextExternalFile on the resulting cursor (using the same
311 : // version) should return f.
312 1 : func makeCursorAtFile(f *fileMetadata, level int) downloadCursor {
313 1 : return downloadCursor{
314 1 : level: level,
315 1 : key: f.Smallest.UserKey,
316 1 : seqNum: f.LargestSeqNum,
317 1 : }
318 1 : }
319 :
320 : // makeCursorAfterFile returns a downloadCursor that is immediately
321 : // after the given file.
322 1 : func makeCursorAfterFile(f *fileMetadata, level int) downloadCursor {
323 1 : return downloadCursor{
324 1 : level: level,
325 1 : key: f.Smallest.UserKey,
326 1 : seqNum: f.LargestSeqNum + 1,
327 1 : }
328 1 : }
329 :
330 1 : func (c downloadCursor) FileIsAfterCursor(cmp base.Compare, f *fileMetadata, level int) bool {
331 1 : return c.Compare(cmp, makeCursorAfterFile(f, level)) < 0
332 1 : }
333 :
334 1 : func (c downloadCursor) Compare(keyCmp base.Compare, other downloadCursor) int {
335 1 : if c := cmp.Compare(c.level, other.level); c != 0 {
336 0 : return c
337 0 : }
338 1 : if c := keyCmp(c.key, other.key); c != 0 {
339 1 : return c
340 1 : }
341 1 : return cmp.Compare(c.seqNum, other.seqNum)
342 : }
343 :
344 : // NextExternalFile returns the first file after the cursor, returning the file
345 : // and the level. If no such file exists, returns nil fileMetadata.
346 : func (c downloadCursor) NextExternalFile(
347 : cmp base.Compare, objProvider objstorage.Provider, bounds base.UserKeyBounds, v *version,
348 1 : ) (_ *fileMetadata, level int) {
349 1 : for !c.AtEnd() {
350 1 : if f := c.NextExternalFileOnLevel(cmp, objProvider, bounds.End, v); f != nil {
351 1 : return f, c.level
352 1 : }
353 : // Go to the next level.
354 1 : c.key = bounds.Start
355 1 : c.seqNum = 0
356 1 : c.level++
357 : }
358 1 : return nil, manifest.NumLevels
359 : }
360 :
361 : // NextExternalFileOnLevel returns the first external file on c.level which is
362 : // after c and with Smallest.UserKey within the end bound.
363 : func (c downloadCursor) NextExternalFileOnLevel(
364 : cmp base.Compare, objProvider objstorage.Provider, endBound base.UserKeyBoundary, v *version,
365 1 : ) *fileMetadata {
366 1 : if c.level > 0 {
367 1 : it := v.Levels[c.level].Iter()
368 1 : return firstExternalFileInLevelIter(cmp, objProvider, c, it, endBound)
369 1 : }
370 : // For L0, we look at all sublevel iterators and take the first file.
371 1 : var first *fileMetadata
372 1 : var firstCursor downloadCursor
373 1 : for _, sublevel := range v.L0SublevelFiles {
374 1 : f := firstExternalFileInLevelIter(cmp, objProvider, c, sublevel.Iter(), endBound)
375 1 : if f != nil {
376 1 : c := makeCursorAtFile(f, c.level)
377 1 : if first == nil || c.Compare(cmp, firstCursor) < 0 {
378 1 : first = f
379 1 : firstCursor = c
380 1 : }
381 : // Trim the end bound as an optimization.
382 1 : endBound = base.UserKeyInclusive(f.Smallest.UserKey)
383 : }
384 : }
385 1 : return first
386 : }
387 :
388 : // firstExternalFileInLevelIter finds the first external file after the cursor
389 : // but which starts before the endBound. It is assumed that the iterator
390 : // corresponds to cursor.level.
391 : func firstExternalFileInLevelIter(
392 : cmp base.Compare,
393 : objProvider objstorage.Provider,
394 : cursor downloadCursor,
395 : it manifest.LevelIterator,
396 : endBound base.UserKeyBoundary,
397 1 : ) *fileMetadata {
398 1 : f := it.SeekGE(cmp, cursor.key)
399 1 : // Skip the file if it starts before cursor.key or is at that same key with lower
400 1 : // sequence number.
401 1 : for f != nil && !cursor.FileIsAfterCursor(cmp, f, cursor.level) {
402 1 : f = it.Next()
403 1 : }
404 1 : for ; f != nil && endBound.IsUpperBoundFor(cmp, f.Smallest.UserKey); f = it.Next() {
405 1 : if f.Virtual && objstorage.IsExternalTable(objProvider, f.FileBacking.DiskFileNum) {
406 1 : return f
407 1 : }
408 : }
409 1 : return nil
410 : }
411 :
412 : // tryLaunchDownloadForFile attempt to launch a download compaction for the
413 : // given file. Returns true on success, or false if the file is already
414 : // involved in a compaction.
415 : func (d *DB) tryLaunchDownloadForFile(
416 : vers *version, env compactionEnv, download *downloadSpanTask, level int, f *fileMetadata,
417 1 : ) (doneCh chan error, ok bool) {
418 1 : if f.IsCompacting() {
419 1 : return nil, false
420 1 : }
421 1 : if download.testing.launchDownloadCompaction != nil {
422 1 : return download.testing.launchDownloadCompaction(f)
423 1 : }
424 1 : kind := compactionKindRewrite
425 1 : if download.downloadSpan.ViaBackingFileDownload {
426 1 : kind = compactionKindCopy
427 1 : }
428 1 : pc := pickDownloadCompaction(vers, d.opts, env, d.mu.versions.picker.getBaseLevel(), kind, level, f)
429 1 : if pc == nil {
430 0 : // We are not able to run this download compaction at this time.
431 0 : return nil, false
432 0 : }
433 :
434 1 : download.numLaunchedDownloads++
435 1 : doneCh = make(chan error, 1)
436 1 : c := newCompaction(pc, d.opts, d.timeNow(), d.objProvider)
437 1 : c.isDownload = true
438 1 : d.mu.compact.downloadingCount++
439 1 : d.addInProgressCompaction(c)
440 1 : go d.compact(c, doneCh)
441 1 : return doneCh, true
442 : }
443 :
444 : type launchDownloadResult int8
445 :
446 : const (
447 : launchedCompaction launchDownloadResult = iota
448 : didNotLaunchCompaction
449 : downloadTaskCompleted
450 : )
451 :
452 : func (d *DB) tryLaunchDownloadCompaction(
453 : download *downloadSpanTask, vers *manifest.Version, env compactionEnv, maxConcurrentDownloads int,
454 1 : ) launchDownloadResult {
455 1 : // First, check the bookmarks.
456 1 : for i := 0; i < len(download.bookmarks); i++ {
457 1 : b := &download.bookmarks[i]
458 1 : if b.downloadDoneCh != nil {
459 1 : // First check if the compaction we launched completed.
460 1 : select {
461 1 : case compactionErr := <-b.downloadDoneCh:
462 1 : if compactionErr != nil && !errors.Is(compactionErr, ErrCancelledCompaction) {
463 0 : download.taskCompletedChan <- compactionErr
464 0 : return downloadTaskCompleted
465 0 : }
466 1 : b.downloadDoneCh = nil
467 :
468 : // Even if the compaction finished without an error, we still want to
469 : // check the rest of the bookmark range for external files.
470 : //
471 : // For example, say that we encounter a file ["a", "f"] and start a
472 : // download (creating a bookmark). Then that file gets excised into new
473 : // files ["a", "b"] and ["e", "f"] and the excise causes the download
474 : // compaction to be cancelled. We will start another download compaction
475 : // for ["a", "c"]; once that is complete, we still need to look at the
476 : // rest of the bookmark range (i.e. up to "f") to discover the
477 : // ["e", "f"] file.
478 :
479 0 : default:
480 0 : // The compaction is still running, go to the next bookmark.
481 0 : continue
482 : }
483 : }
484 :
485 : // If downloadDoneCh was nil, we are waiting on a compaction that we did not
486 : // start. We are effectively polling the status by checking the external
487 : // files within the bookmark. This is ok because this method is called (for
488 : // this download task) at most once every time a compaction completes.
489 :
490 1 : f := b.start.NextExternalFileOnLevel(d.cmp, d.objProvider, b.endBound, vers)
491 1 : if f == nil {
492 1 : // No more external files for this bookmark, remove it.
493 1 : download.bookmarks = slices.Delete(download.bookmarks, i, i+1)
494 1 : i--
495 1 : continue
496 : }
497 :
498 : // Move up the bookmark position to point at this file.
499 1 : b.start = makeCursorAtFile(f, b.start.level)
500 1 : doneCh, ok := d.tryLaunchDownloadForFile(vers, env, download, b.start.level, f)
501 1 : if ok {
502 1 : b.downloadDoneCh = doneCh
503 1 : return launchedCompaction
504 1 : }
505 : // We could not launch a download, which means the file is part of another
506 : // compaction. We leave the bookmark in place and will poll the status in
507 : // the code above.
508 : }
509 :
510 : // Try to advance the cursor and launch more downloads.
511 1 : for len(download.bookmarks) < maxConcurrentDownloads {
512 1 : f, level := download.cursor.NextExternalFile(d.cmp, d.objProvider, download.bounds, vers)
513 1 : if f == nil {
514 1 : download.cursor = endCursor
515 1 : if len(download.bookmarks) == 0 {
516 1 : download.taskCompletedChan <- nil
517 1 : return downloadTaskCompleted
518 1 : }
519 0 : return didNotLaunchCompaction
520 : }
521 1 : download.cursor = makeCursorAfterFile(f, level)
522 1 :
523 1 : download.bookmarks = append(download.bookmarks, downloadBookmark{
524 1 : start: makeCursorAtFile(f, level),
525 1 : endBound: base.UserKeyInclusive(f.Largest.UserKey),
526 1 : })
527 1 : doneCh, ok := d.tryLaunchDownloadForFile(vers, env, download, level, f)
528 1 : if ok {
529 1 : // We launched a download for this file.
530 1 : download.bookmarks[len(download.bookmarks)-1].downloadDoneCh = doneCh
531 1 : return launchedCompaction
532 1 : }
533 : }
534 :
535 1 : return didNotLaunchCompaction
536 : }
|