Line data Source code
1 : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "cmp"
9 : "context"
10 : "fmt"
11 : "slices"
12 :
13 : "github.com/cockroachdb/errors"
14 : "github.com/cockroachdb/pebble/internal/base"
15 : "github.com/cockroachdb/pebble/internal/manifest"
16 : "github.com/cockroachdb/pebble/objstorage"
17 : )
18 :
// DownloadSpan is a key range passed to the Download method.
type DownloadSpan struct {
	// StartKey is the inclusive start of the span.
	StartKey []byte
	// EndKey is exclusive.
	EndKey []byte
	// ViaBackingFileDownload, if true, indicates the span should be downloaded by
	// downloading any remote backing files byte-for-byte and replacing them with
	// the downloaded local files, while otherwise leaving the virtual SSTables
	// as-is. If false, a "normal" rewriting compaction of the span, that iterates
	// the keys and produces a new SSTable, is used instead.
	//
	// Downloading raw files can be faster when the whole file is being
	// downloaded, as it avoids some CPU-intensive steps involved in iteration
	// and new file construction such as compression. However, it can also be
	// wasteful when only a small portion of a larger backing file is being used
	// by a virtual file. Additionally, if the virtual file has expensive
	// read-time transformations, such as prefix replacement, rewriting once can
	// persist the result of these for future use, while copying only the backing
	// file will obligate future reads to continue to compute such transforms.
	ViaBackingFileDownload bool
}
39 :
40 : // Download ensures that the LSM does not use any external sstables for the
41 : // given key ranges. It does so by performing appropriate compactions so that
42 : // all external data becomes available locally.
43 : //
44 : // Note that calling this method does not imply that all other compactions stop;
45 : // it simply informs Pebble of a list of spans for which external data should be
46 : // downloaded with high priority.
47 : //
48 : // The method returns once no external sstables overlap the given spans, the
49 : // context is canceled, the db is closed, or an error is hit.
50 : //
51 : // Note that despite the best effort of this method, if external ingestions
52 : // happen in parallel, a new external file can always appear right as we're
53 : // returning.
54 : //
55 : // TODO(radu): consider passing a priority/impact knob to express how important
56 : // the download is (versus live traffic performance, LSM health).
57 1 : func (d *DB) Download(ctx context.Context, spans []DownloadSpan) error {
58 1 : ctx, cancel := context.WithCancel(ctx)
59 1 : defer cancel()
60 1 : if err := d.closed.Load(); err != nil {
61 0 : panic(err)
62 : }
63 1 : if d.opts.ReadOnly {
64 0 : return ErrReadOnly
65 0 : }
66 1 : info := DownloadInfo{
67 1 : JobID: int(d.newJobID()),
68 1 : Spans: spans,
69 1 : }
70 1 : startTime := d.timeNow()
71 1 : d.opts.EventListener.DownloadBegin(info)
72 1 :
73 1 : for info.RestartCount = 0; ; info.RestartCount++ {
74 1 : tasks := d.createDownloadTasks(spans)
75 1 : info.Duration = d.timeNow().Sub(startTime)
76 1 : if len(tasks) == 0 {
77 1 : // We are done.
78 1 : info.Done = true
79 1 : d.opts.EventListener.DownloadEnd(info)
80 1 : return nil
81 1 : }
82 1 : if info.RestartCount > 0 {
83 0 : d.opts.EventListener.DownloadBegin(info)
84 0 : }
85 :
86 : // Install the tasks.
87 1 : d.mu.Lock()
88 1 : d.mu.compact.downloads = append(d.mu.compact.downloads, tasks...)
89 1 : d.maybeScheduleCompaction()
90 1 : d.mu.Unlock()
91 1 :
92 1 : err := d.waitForDownloadTasks(ctx, tasks)
93 1 : for _, t := range tasks {
94 1 : info.DownloadCompactionsLaunched += t.numLaunchedDownloads
95 1 : }
96 :
97 1 : if err != nil {
98 0 : info.Err = err
99 0 : info.Duration = d.timeNow().Sub(startTime)
100 0 : d.opts.EventListener.DownloadEnd(info)
101 0 : return err
102 0 : }
103 : }
104 : }
105 :
106 : // createDownloadTasks creates downloadSpanTasks for the download spans that
107 : // overlap external files in the given version.
108 1 : func (d *DB) createDownloadTasks(spans []DownloadSpan) []*downloadSpanTask {
109 1 : d.mu.Lock()
110 1 : vers := d.mu.versions.currentVersion()
111 1 : d.mu.Unlock()
112 1 :
113 1 : tasks := make([]*downloadSpanTask, 0, len(spans))
114 1 : for i := range spans {
115 1 : task, ok := d.newDownloadSpanTask(vers, spans[i])
116 1 : // If !ok, there are no external files in this span.
117 1 : if ok {
118 1 : tasks = append(tasks, task)
119 1 : }
120 : }
121 1 : return tasks
122 : }
123 :
124 : // waitForDownloadTasks waits until all download tasks complete.
125 1 : func (d *DB) waitForDownloadTasks(ctx context.Context, tasks []*downloadSpanTask) error {
126 1 : for i := range tasks {
127 1 : select {
128 0 : case <-ctx.Done():
129 0 : d.removeDownloadTasks(tasks)
130 0 : return ctx.Err()
131 :
132 1 : case err := <-tasks[i].taskCompletedChan:
133 1 : if err != nil {
134 0 : d.removeDownloadTasks(tasks)
135 0 : return err
136 0 : }
137 : }
138 : }
139 1 : return nil
140 : }
141 :
142 : // removeDownloadTasks removes all tasks in the given slice from
143 : // d.mu.compact.downloads.
144 0 : func (d *DB) removeDownloadTasks(tasks []*downloadSpanTask) {
145 0 : d.mu.Lock()
146 0 : defer d.mu.Unlock()
147 0 : d.mu.compact.downloads = slices.DeleteFunc(d.mu.compact.downloads, func(t *downloadSpanTask) bool {
148 0 : return slices.Contains(tasks, t)
149 0 : })
150 : }
151 :
// downloadSpanTask tracks the task of downloading external files that overlap
// with a DownloadSpan.
//
// A downloadSpanTask is spawned only if at least one overlapping external file
// is found in the current version.
//
// When a downloadSpanTask completes (i.e. taskCompletedChan is signaled)
// without an error, it is guaranteed that all external files that were
// overlapping the download span at the beginning of the task are downloaded.
//
// == Implementation ==
//
// A download span task moves through the LSM within the given bounds in
// top-down level order (L0, L1, etc.), and in Smallest.UserKey order within
// each level (and breaking ties in L0 according to LargestSeqNum). We introduce
// the concept of a "download cursor" to keep track of where we are in this
// process, in a way that is independent of any one version. A cursor stores the
// level, a user key which is a lower bound for Smallest.UserKey within that
// level, and a sequence number which is a lower bound for the LargestSeqNum for
// files on that level starting at exactly that key.
//
// While a download task is running, tables with external backings can disappear
// due to excises or compactions; tables can move *down* (to a lower LSM level);
// or tables can have their bounds shrink due to excises (and these will appear
// as new tables, even though they have the same backing). The top-down,
// left-to-right-start-key ordering ensures that we don't miss any table
// (instead, we may examine it multiple times).
//
// We use a cursor that advances as our download task makes progress. Each time
// we encounter a file that needs downloading, we create a "bookmark". A
// bookmark conceptually represents a key range within a level and it
// corresponds to the bounds of the file that we discovered. It is represented
// as a cursor position (corresponding to the start) and an end boundary key. We
// need to remember the bookmark because the download compaction can fail (e.g.
// it can get canceled by an excise) and the file might get excised so we need
// to look again at all files within the original key range.
//
// It is also possible that we encounter files that are already part of a
// compaction. These can be move compactions, or can get canceled, so we can't
// just ignore these files; we create bookmarks for such files as well.
//
// We maintain no more than maxConcurrentDownloads bookmarks - the idea being
// that files that are part of compactions are getting downloaded anyway and we
// can effectively count them toward the limit. When we cannot create any more
// bookmarks, we stop advancing the task cursor. Note that it is not this code's
// job to enforce the maximum concurrency, this is simply a reasonable limit - we
// don't want to accumulate arbitrarily many bookmarks, since we check each one
// whenever tryLaunchDownloadCompaction is called (after every compaction
// completing).
//
// This implementation achieves O(maxConcurrentDownloads * N) level iterator
// operations across the entire task, where N is the (average) number of files
// within the bounds.
type downloadSpanTask struct {
	// downloadSpan is the span this task was created for. Its
	// ViaBackingFileDownload flag selects the kind of download compaction.
	downloadSpan DownloadSpan

	// The download task pertains to sstables which *start* (as per
	// Smallest.UserKey) within these bounds.
	bounds base.UserKeyBounds

	// taskCompletedChan is signaled when we have finished download compactions
	// for all external files encountered within the bounds, or when one of these
	// compactions reports an error (other than ErrCancelledCompaction).
	taskCompletedChan chan error

	// numLaunchedDownloads counts the download compactions launched on behalf
	// of this task.
	numLaunchedDownloads int

	// Keeps track of the current position; all files up to this position were
	// examined and were either downloaded or we have bookmarks for them.
	cursor downloadCursor

	// Bookmarks remember areas which correspond to downloads that we started or
	// files that were undergoing other compactions and which we need to check
	// again before completing the task.
	bookmarks []downloadBookmark

	// Testing hooks.
	testing struct {
		// launchDownloadCompaction, if set, is called by
		// tryLaunchDownloadForFile instead of launching a real download
		// compaction, and its results are returned as-is.
		launchDownloadCompaction func(f *manifest.TableMetadata) (chan error, bool)
	}
}
233 :
// downloadBookmark represents an area that was swept by the task cursor which
// corresponds to a file that was part of a running compaction or download.
type downloadBookmark struct {
	// start is the cursor position at the beginning of the bookmarked area. It
	// is moved forward as external files inside the area are rediscovered.
	start downloadCursor
	// endBound is the upper boundary of the bookmarked area; it is created as
	// an inclusive bound at the largest user key of the discovered file.
	endBound base.UserKeyBoundary
	// downloadDoneCh is set if this bookmark corresponds to a download we
	// started; in this case the channel will report the status of that
	// compaction. It is reset to nil once that status has been received.
	downloadDoneCh chan error
}
244 :
245 : func (d *DB) newDownloadSpanTask(
246 : vers *manifest.Version, sp DownloadSpan,
247 1 : ) (_ *downloadSpanTask, ok bool) {
248 1 : bounds := base.UserKeyBoundsEndExclusive(sp.StartKey, sp.EndKey)
249 1 : // We are interested in all external sstables that *overlap* with
250 1 : // [sp.StartKey, sp.EndKey). Expand the bounds to the left so that we
251 1 : // include the start keys of any external sstables that overlap with
252 1 : // sp.StartKey.
253 1 : for _, ls := range vers.AllLevelsAndSublevels() {
254 1 : iter := ls.Iter()
255 1 : if f := iter.SeekGE(d.cmp, sp.StartKey); f != nil &&
256 1 : objstorage.IsExternalTable(d.objProvider, f.TableBacking.DiskFileNum) &&
257 1 : d.cmp(f.Smallest().UserKey, bounds.Start) < 0 {
258 1 : bounds.Start = f.Smallest().UserKey
259 1 : }
260 : }
261 1 : startCursor := downloadCursor{
262 1 : level: 0,
263 1 : key: bounds.Start,
264 1 : seqNum: 0,
265 1 : }
266 1 : f, level := startCursor.NextExternalFile(d.cmp, d.objProvider, bounds, vers)
267 1 : if f == nil {
268 1 : // No external files in the given span.
269 1 : return nil, false
270 1 : }
271 :
272 1 : return &downloadSpanTask{
273 1 : downloadSpan: sp,
274 1 : bounds: bounds,
275 1 : taskCompletedChan: make(chan error, 1),
276 1 : cursor: makeCursorAtFile(f, level),
277 1 : }, true
278 : }
279 :
// downloadCursor represents a position in the download process, which does not
// depend on a specific version.
//
// The Download process scans for external files level-by-level (starting with
// L0), and left-to-right (in terms of Smallest.UserKey) within each level. In
// L0, we break ties by the LargestSeqNum.
//
// A cursor can be thought of as a boundary between two files in a version
// (ordered by level, then by Smallest.UserKey, then by LargestSeqNum). A file
// is either "before" or "after" the cursor.
type downloadCursor struct {
	// LSM level (0 to NumLevels). When level=NumLevels, the cursor is at the end.
	level int
	// Inclusive lower bound for Smallest.UserKey for tables on level.
	key []byte
	// Inclusive lower bound for the LargestSeqNum of tables on level whose
	// Smallest.UserKey equals key. Used to break ties within L0, and also used
	// to position a cursor immediately after a given file.
	seqNum base.SeqNum
}
300 :
// endCursor is a cursor positioned past the last level (level=NumLevels), so
// AtEnd reports true for it.
var endCursor = downloadCursor{level: manifest.NumLevels}
302 :
303 : // AtEnd returns true if the cursor is after all relevant files.
304 1 : func (c downloadCursor) AtEnd() bool {
305 1 : return c.level >= manifest.NumLevels
306 1 : }
307 :
308 0 : func (c downloadCursor) String() string {
309 0 : return fmt.Sprintf("level=%d key=%q seqNum=%d", c.level, c.key, c.seqNum)
310 0 : }
311 :
312 : // makeCursorAtFile returns a downloadCursor that is immediately before the
313 : // given file. Calling nextExternalFile on the resulting cursor (using the same
314 : // version) should return f.
315 1 : func makeCursorAtFile(f *manifest.TableMetadata, level int) downloadCursor {
316 1 : return downloadCursor{
317 1 : level: level,
318 1 : key: f.Smallest().UserKey,
319 1 : seqNum: f.LargestSeqNum,
320 1 : }
321 1 : }
322 :
323 : // makeCursorAfterFile returns a downloadCursor that is immediately
324 : // after the given file.
325 1 : func makeCursorAfterFile(f *manifest.TableMetadata, level int) downloadCursor {
326 1 : return downloadCursor{
327 1 : level: level,
328 1 : key: f.Smallest().UserKey,
329 1 : seqNum: f.LargestSeqNum + 1,
330 1 : }
331 1 : }
332 :
333 : func (c downloadCursor) FileIsAfterCursor(
334 : cmp base.Compare, f *manifest.TableMetadata, level int,
335 1 : ) bool {
336 1 : return c.Compare(cmp, makeCursorAfterFile(f, level)) < 0
337 1 : }
338 :
339 1 : func (c downloadCursor) Compare(keyCmp base.Compare, other downloadCursor) int {
340 1 : if c := cmp.Compare(c.level, other.level); c != 0 {
341 0 : return c
342 0 : }
343 1 : if c := keyCmp(c.key, other.key); c != 0 {
344 1 : return c
345 1 : }
346 1 : return cmp.Compare(c.seqNum, other.seqNum)
347 : }
348 :
349 : // NextExternalFile returns the first file after the cursor, returning the file
350 : // and the level. If no such file exists, returns nil fileMetadata.
351 : func (c downloadCursor) NextExternalFile(
352 : cmp base.Compare, objProvider objstorage.Provider, bounds base.UserKeyBounds, v *manifest.Version,
353 1 : ) (_ *manifest.TableMetadata, level int) {
354 1 : for !c.AtEnd() {
355 1 : if f := c.NextExternalFileOnLevel(cmp, objProvider, bounds.End, v); f != nil {
356 1 : return f, c.level
357 1 : }
358 : // Go to the next level.
359 1 : c.key = bounds.Start
360 1 : c.seqNum = 0
361 1 : c.level++
362 : }
363 1 : return nil, manifest.NumLevels
364 : }
365 :
366 : // NextExternalFileOnLevel returns the first external file on c.level which is
367 : // after c and with Smallest.UserKey within the end bound.
368 : func (c downloadCursor) NextExternalFileOnLevel(
369 : cmp base.Compare,
370 : objProvider objstorage.Provider,
371 : endBound base.UserKeyBoundary,
372 : v *manifest.Version,
373 1 : ) *manifest.TableMetadata {
374 1 : if c.level > 0 {
375 1 : it := v.Levels[c.level].Iter()
376 1 : return firstExternalFileInLevelIter(cmp, objProvider, c, it, endBound)
377 1 : }
378 : // For L0, we look at all sublevel iterators and take the first file.
379 1 : var first *manifest.TableMetadata
380 1 : var firstCursor downloadCursor
381 1 : for _, sublevel := range v.L0SublevelFiles {
382 1 : f := firstExternalFileInLevelIter(cmp, objProvider, c, sublevel.Iter(), endBound)
383 1 : if f != nil {
384 1 : c := makeCursorAtFile(f, c.level)
385 1 : if first == nil || c.Compare(cmp, firstCursor) < 0 {
386 1 : first = f
387 1 : firstCursor = c
388 1 : }
389 : // Trim the end bound as an optimization.
390 1 : endBound = base.UserKeyInclusive(f.Smallest().UserKey)
391 : }
392 : }
393 1 : return first
394 : }
395 :
396 : // firstExternalFileInLevelIter finds the first external file after the cursor
397 : // but which starts before the endBound. It is assumed that the iterator
398 : // corresponds to cursor.level.
399 : func firstExternalFileInLevelIter(
400 : cmp base.Compare,
401 : objProvider objstorage.Provider,
402 : cursor downloadCursor,
403 : it manifest.LevelIterator,
404 : endBound base.UserKeyBoundary,
405 1 : ) *manifest.TableMetadata {
406 1 : f := it.SeekGE(cmp, cursor.key)
407 1 : // Skip the file if it starts before cursor.key or is at that same key with lower
408 1 : // sequence number.
409 1 : for f != nil && !cursor.FileIsAfterCursor(cmp, f, cursor.level) {
410 1 : f = it.Next()
411 1 : }
412 1 : for ; f != nil && endBound.IsUpperBoundFor(cmp, f.Smallest().UserKey); f = it.Next() {
413 1 : if f.Virtual && objstorage.IsExternalTable(objProvider, f.TableBacking.DiskFileNum) {
414 1 : return f
415 1 : }
416 : }
417 1 : return nil
418 : }
419 :
420 : // tryLaunchDownloadForFile attempt to launch a download compaction for the
421 : // given file. Returns true on success, or false if the file is already
422 : // involved in a compaction.
423 : func (d *DB) tryLaunchDownloadForFile(
424 : vers *manifest.Version,
425 : l0Organizer *manifest.L0Organizer,
426 : env compactionEnv,
427 : download *downloadSpanTask,
428 : level int,
429 : f *manifest.TableMetadata,
430 1 : ) (doneCh chan error, ok bool) {
431 1 : if f.IsCompacting() {
432 1 : return nil, false
433 1 : }
434 1 : if download.testing.launchDownloadCompaction != nil {
435 0 : return download.testing.launchDownloadCompaction(f)
436 0 : }
437 1 : kind := compactionKindRewrite
438 1 : if download.downloadSpan.ViaBackingFileDownload {
439 1 : kind = compactionKindCopy
440 1 : }
441 1 : pc := pickDownloadCompaction(vers, l0Organizer, d.opts, env, d.mu.versions.picker.getBaseLevel(), kind, level, f)
442 1 : if pc == nil {
443 0 : // We are not able to run this download compaction at this time.
444 0 : return nil, false
445 0 : }
446 :
447 1 : download.numLaunchedDownloads++
448 1 : doneCh = make(chan error, 1)
449 1 : c := newCompaction(pc, d.opts, d.timeNow(), d.objProvider, noopGrantHandle{}, d.TableFormat(), d.determineCompactionValueSeparation)
450 1 : c.isDownload = true
451 1 : d.mu.compact.downloadingCount++
452 1 : c.AddInProgressLocked(d)
453 1 : go d.compact(c, doneCh)
454 1 : return doneCh, true
455 : }
456 :
// launchDownloadResult is the outcome of a tryLaunchDownloadCompaction call.
type launchDownloadResult int8

const (
	// launchedCompaction indicates that a download compaction was launched.
	launchedCompaction launchDownloadResult = iota
	// didNotLaunchCompaction indicates that no download compaction was launched
	// and the task has not completed.
	didNotLaunchCompaction
	// downloadTaskCompleted indicates that the task's taskCompletedChan was
	// signaled, either with nil or with a compaction error.
	downloadTaskCompleted
)
464 :
// tryLaunchDownloadCompaction tries to make progress on the given download
// task against the version vers, launching at most one download compaction
// per call.
//
// It first revisits the task's bookmarks: bookmarks whose download is still
// running are skipped, bookmarks with no remaining external file are removed,
// and for the others a download is attempted for the next external file in the
// bookmark's range. If no download was launched that way, it advances the task
// cursor, creating a bookmark for each external file found, until a download
// is launched, the cursor reaches the end, or there are
// maxConcurrentDownloads bookmarks.
//
// It returns launchedCompaction if a download was launched,
// downloadTaskCompleted if the task was signaled on taskCompletedChan (with a
// compaction error other than ErrCancelledCompaction, or with nil once the
// cursor is at the end and no bookmarks remain), and didNotLaunchCompaction
// otherwise.
//
// NOTE(review): presumably must be called with d.mu held, since launching goes
// through AddInProgressLocked and updates d.mu.compact — confirm against
// callers.
func (d *DB) tryLaunchDownloadCompaction(
	download *downloadSpanTask,
	vers *manifest.Version,
	l0Organizer *manifest.L0Organizer,
	env compactionEnv,
	maxConcurrentDownloads int,
) launchDownloadResult {
	// First, check the bookmarks.
	for i := 0; i < len(download.bookmarks); i++ {
		b := &download.bookmarks[i]
		if b.downloadDoneCh != nil {
			// First check if the compaction we launched completed.
			select {
			case compactionErr := <-b.downloadDoneCh:
				if compactionErr != nil && !errors.Is(compactionErr, ErrCancelledCompaction) {
					download.taskCompletedChan <- compactionErr
					return downloadTaskCompleted
				}
				b.downloadDoneCh = nil

				// Even if the compaction finished without an error, we still want to
				// check the rest of the bookmark range for external files.
				//
				// For example, say that we encounter a file ["a", "f"] and start a
				// download (creating a bookmark). Then that file gets excised into new
				// files ["a", "b"] and ["e", "f"] and the excise causes the download
				// compaction to be cancelled. We will start another download compaction
				// for ["a", "b"]; once that is complete, we still need to look at the
				// rest of the bookmark range (i.e. up to "f") to discover the
				// ["e", "f"] file.

			default:
				// The compaction is still running, go to the next bookmark.
				continue
			}
		}

		// If downloadDoneCh was nil, we are waiting on a compaction that we did not
		// start. We are effectively polling the status by checking the external
		// files within the bookmark. This is ok because this method is called (for
		// this download task) at most once every time a compaction completes.

		f := b.start.NextExternalFileOnLevel(d.cmp, d.objProvider, b.endBound, vers)
		if f == nil {
			// No more external files for this bookmark, remove it. The index is
			// decremented so that the element shifted into slot i is visited next.
			download.bookmarks = slices.Delete(download.bookmarks, i, i+1)
			i--
			continue
		}

		// Move up the bookmark position to point at this file.
		b.start = makeCursorAtFile(f, b.start.level)
		doneCh, ok := d.tryLaunchDownloadForFile(vers, l0Organizer, env, download, b.start.level, f)
		if ok {
			b.downloadDoneCh = doneCh
			return launchedCompaction
		}
		// We could not launch a download, which means the file is part of another
		// compaction. We leave the bookmark in place and will poll the status in
		// the code above.
	}

	// Try to advance the cursor and launch more downloads.
	for len(download.bookmarks) < maxConcurrentDownloads {
		f, level := download.cursor.NextExternalFile(d.cmp, d.objProvider, download.bounds, vers)
		if f == nil {
			// The cursor swept everything; the task is complete only once all
			// bookmarks have been resolved as well.
			download.cursor = endCursor
			if len(download.bookmarks) == 0 {
				download.taskCompletedChan <- nil
				return downloadTaskCompleted
			}
			return didNotLaunchCompaction
		}
		download.cursor = makeCursorAfterFile(f, level)

		// Bookmark the file's range before attempting the launch, so the range
		// is re-examined later whether or not the launch succeeds.
		download.bookmarks = append(download.bookmarks, downloadBookmark{
			start:    makeCursorAtFile(f, level),
			endBound: base.UserKeyInclusive(f.Largest().UserKey),
		})
		doneCh, ok := d.tryLaunchDownloadForFile(vers, l0Organizer, env, download, level, f)
		if ok {
			// We launched a download for this file.
			download.bookmarks[len(download.bookmarks)-1].downloadDoneCh = doneCh
			return launchedCompaction
		}
	}

	return didNotLaunchCompaction
}
|