LCOV - code coverage report
Current view: top level - pebble - download.go (source / functions) Coverage Total Hit
Test: 2025-07-28 08:19Z bb252436 - meta test only.lcov Lines: 85.7 % 245 210
Test Date: 2025-07-28 08:20:54 Functions: - 0 0

            Line data    Source code
       1              : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2              : // of this source code is governed by a BSD-style license that can be found in
       3              : // the LICENSE file.
       4              : 
       5              : package pebble
       6              : 
       7              : import (
       8              :         "cmp"
       9              :         "context"
      10              :         "fmt"
      11              :         "slices"
      12              : 
      13              :         "github.com/cockroachdb/errors"
      14              :         "github.com/cockroachdb/pebble/internal/base"
      15              :         "github.com/cockroachdb/pebble/internal/manifest"
      16              :         "github.com/cockroachdb/pebble/objstorage"
      17              : )
      18              : 
// DownloadSpan is a key range passed to the Download method. It describes the
// half-open user key interval [StartKey, EndKey).
type DownloadSpan struct {
	// StartKey is inclusive.
	StartKey []byte
	// EndKey is exclusive.
	EndKey []byte
	// ViaBackingFileDownload, if true, indicates the span should be downloaded by
	// downloading any remote backing files byte-for-byte and replacing them with
	// the downloaded local files, while otherwise leaving the virtual SSTables
	// as-is. If false, a "normal" rewriting compaction of the span, that iterates
	// the keys and produces a new SSTable, is used instead. Downloading raw files
	// can be faster when the whole file is being downloaded, as it avoids some
	// cpu-intensive steps involved in iteration and new file construction such as
	// compression, however it can also be wasteful when only a small portion of a
	// larger backing file is being used by a virtual file. Additionally, if the
	// virtual file has expensive read-time transformations, such as prefix
	// replacement, rewriting once can persist the result of these for future use
	// while copying only the backing file will obligate future reads to continue
	// to compute such transforms.
	ViaBackingFileDownload bool
}
      39              : 
      40              : // Download ensures that the LSM does not use any external sstables for the
      41              : // given key ranges. It does so by performing appropriate compactions so that
      42              : // all external data becomes available locally.
      43              : //
      44              : // Note that calling this method does not imply that all other compactions stop;
      45              : // it simply informs Pebble of a list of spans for which external data should be
      46              : // downloaded with high priority.
      47              : //
      48              : // The method returns once no external sstables overlap the given spans, the
      49              : // context is canceled, the db is closed, or an error is hit.
      50              : //
      51              : // Note that despite the best effort of this method, if external ingestions
      52              : // happen in parallel, a new external file can always appear right as we're
      53              : // returning.
      54              : //
      55              : // TODO(radu): consider passing a priority/impact knob to express how important
      56              : // the download is (versus live traffic performance, LSM health).
      57            1 : func (d *DB) Download(ctx context.Context, spans []DownloadSpan) error {
      58            1 :         ctx, cancel := context.WithCancel(ctx)
      59            1 :         defer cancel()
      60            1 :         if err := d.closed.Load(); err != nil {
      61            0 :                 panic(err)
      62              :         }
      63            1 :         if d.opts.ReadOnly {
      64            0 :                 return ErrReadOnly
      65            0 :         }
      66            1 :         info := DownloadInfo{
      67            1 :                 JobID: int(d.newJobID()),
      68            1 :                 Spans: spans,
      69            1 :         }
      70            1 :         startTime := d.timeNow()
      71            1 :         d.opts.EventListener.DownloadBegin(info)
      72            1 : 
      73            1 :         for info.RestartCount = 0; ; info.RestartCount++ {
      74            1 :                 tasks := d.createDownloadTasks(spans)
      75            1 :                 info.Duration = d.timeNow().Sub(startTime)
      76            1 :                 if len(tasks) == 0 {
      77            1 :                         // We are done.
      78            1 :                         info.Done = true
      79            1 :                         d.opts.EventListener.DownloadEnd(info)
      80            1 :                         return nil
      81            1 :                 }
      82            1 :                 if info.RestartCount > 0 {
      83            0 :                         d.opts.EventListener.DownloadBegin(info)
      84            0 :                 }
      85              : 
      86              :                 // Install the tasks.
      87            1 :                 d.mu.Lock()
      88            1 :                 d.mu.compact.downloads = append(d.mu.compact.downloads, tasks...)
      89            1 :                 d.maybeScheduleCompaction()
      90            1 :                 d.mu.Unlock()
      91            1 : 
      92            1 :                 err := d.waitForDownloadTasks(ctx, tasks)
      93            1 :                 for _, t := range tasks {
      94            1 :                         info.DownloadCompactionsLaunched += t.numLaunchedDownloads
      95            1 :                 }
      96              : 
      97            1 :                 if err != nil {
      98            0 :                         info.Err = err
      99            0 :                         info.Duration = d.timeNow().Sub(startTime)
     100            0 :                         d.opts.EventListener.DownloadEnd(info)
     101            0 :                         return err
     102            0 :                 }
     103              :         }
     104              : }
     105              : 
     106              : // createDownloadTasks creates downloadSpanTasks for the download spans that
     107              : // overlap external files in the given version.
     108            1 : func (d *DB) createDownloadTasks(spans []DownloadSpan) []*downloadSpanTask {
     109            1 :         d.mu.Lock()
     110            1 :         vers := d.mu.versions.currentVersion()
     111            1 :         d.mu.Unlock()
     112            1 : 
     113            1 :         tasks := make([]*downloadSpanTask, 0, len(spans))
     114            1 :         for i := range spans {
     115            1 :                 task, ok := d.newDownloadSpanTask(vers, spans[i])
     116            1 :                 // If !ok, there are no external files in this span.
     117            1 :                 if ok {
     118            1 :                         tasks = append(tasks, task)
     119            1 :                 }
     120              :         }
     121            1 :         return tasks
     122              : }
     123              : 
     124              : // waitForDownloadTasks waits until all download tasks complete.
     125            1 : func (d *DB) waitForDownloadTasks(ctx context.Context, tasks []*downloadSpanTask) error {
     126            1 :         for i := range tasks {
     127            1 :                 select {
     128            0 :                 case <-ctx.Done():
     129            0 :                         d.removeDownloadTasks(tasks)
     130            0 :                         return ctx.Err()
     131              : 
     132            1 :                 case err := <-tasks[i].taskCompletedChan:
     133            1 :                         if err != nil {
     134            0 :                                 d.removeDownloadTasks(tasks)
     135            0 :                                 return err
     136            0 :                         }
     137              :                 }
     138              :         }
     139            1 :         return nil
     140              : }
     141              : 
     142              : // removeDownloadTasks removes all tasks in the given slice from
     143              : // d.mu.compact.downloads.
     144            0 : func (d *DB) removeDownloadTasks(tasks []*downloadSpanTask) {
     145            0 :         d.mu.Lock()
     146            0 :         defer d.mu.Unlock()
     147            0 :         d.mu.compact.downloads = slices.DeleteFunc(d.mu.compact.downloads, func(t *downloadSpanTask) bool {
     148            0 :                 return slices.Contains(tasks, t)
     149            0 :         })
     150              : }
     151              : 
// downloadSpanTask tracks the task of downloading external files that overlap
// with a DownloadSpan.
//
// A downloadSpanTask is spawned only if at least one overlapping external file
// is found in the current version.
//
// When a downloadSpanTask completes (i.e. taskCompletedChan is signaled)
// without an error, it is guaranteed that all external files that were
// overlapping the download span at the beginning of the task are downloaded.
//
// == Implementation ==
//
// A download span task moves through the LSM within the given bounds in
// top-down level order (L0, L1, etc.), and in Smallest.UserKey order within
// each level (and breaking ties in L0 according to LargestSeqNum). We introduce
// the concept of a "download cursor" to keep track of where we are in this
// process, in a way that is independent of any one version. A cursor stores the
// level, a user key which is a lower bound for Smallest.UserKey within that
// level, and a sequence number which is a lower bound for the LargestSeqNum for
// files on that level starting at exactly that key.
//
// While a download task is running, tables with external backings can disappear
// due to excises or compactions; tables can move *down* (to a lower LSM level);
// or tables can have their bounds shrink due to excises (and these will appear
// as new tables, even though they have the same backing). The top-down,
// left-to-right-start-key ordering ensures that we don't miss any table
// (instead, we may examine it multiple times).
//
// We use a cursor that advances as our download task makes progress. Each time
// we encounter a file that needs downloading, we create a "bookmark". A
// bookmark conceptually represents a key range within a level and it
// corresponds to the bounds of the file that we discovered. It is represented
// as a cursor position (corresponding to the start) and an end boundary key. We
// need to remember the bookmark because the download compaction can fail (e.g.
// it can get canceled by an excise) and the file might get excised so we need
// to look again at all files within the original key range.
//
// It is also possible that we encounter files that are already part of a
// compaction. These can be move compactions, or can get canceled, so we can't
// just ignore these files; we create bookmarks for such files as well.
//
// We maintain no more than maxConcurrentDownloads bookmarks - the idea being
// that files that are part of compactions are getting downloaded anyway and we
// can effectively count them toward the limit. When we cannot create any more
// bookmarks, we stop advancing the task cursor. Note that it is not this code's
// job to enforce the maximum concurrency, this is simply a reasonable limit - we
// don't want to accumulate arbitrarily many bookmarks, since we check each one
// whenever tryLaunchDownloadCompaction is called (after every compaction
// completing).
//
// This implementation achieves O(maxConcurrentDownloads * N) level iterator
// operations across the entire task, where N is the (average) number of files
// within the bounds.
type downloadSpanTask struct {
	// downloadSpan is the span this task was created for. Its
	// ViaBackingFileDownload flag selects the kind of download compaction that
	// tryLaunchDownloadForFile launches.
	downloadSpan DownloadSpan

	// The download task pertains to sstables which *start* (as per
	// Smallest.UserKey) within these bounds.
	bounds base.UserKeyBounds

	// taskCompletedChan is signaled when we have finished download compactions
	// for all external files encountered within the bounds, or when one of these
	// compactions reports an error (other than ErrCancelledCompaction).
	taskCompletedChan chan error

	// numLaunchedDownloads counts the download compactions launched on behalf
	// of this task; it is incremented by tryLaunchDownloadForFile and summed
	// into DownloadInfo.DownloadCompactionsLaunched by Download.
	numLaunchedDownloads int

	// Keeps track of the current position; all files up to this position were
	// examined and were either downloaded or we have bookmarks for them.
	cursor downloadCursor

	// Bookmarks remember areas which correspond to downloads that we started or
	// files that were undergoing other compactions and which we need to check
	// again before completing the task.
	bookmarks []downloadBookmark

	// Testing hooks.
	testing struct {
		// launchDownloadCompaction, if set, is called by
		// tryLaunchDownloadForFile instead of picking and launching a real
		// compaction (for files that are not already compacting).
		launchDownloadCompaction func(f *manifest.TableMetadata) (chan error, bool)
	}
}
     233              : 
// downloadBookmark represents an area that was swept by the task cursor which
// corresponds to a file that was part of a running compaction or download.
type downloadBookmark struct {
	// start is the cursor position at the beginning of the bookmarked area.
	start downloadCursor
	// endBound is the end boundary key of the bookmarked area.
	endBound base.UserKeyBoundary
	// downloadDoneCh is set if this bookmark corresponds to a download we
	// started; in this case the channel will report the status of that
	// compaction.
	downloadDoneCh chan error
}
     244              : 
     245              : func (d *DB) newDownloadSpanTask(
     246              :         vers *manifest.Version, sp DownloadSpan,
     247            1 : ) (_ *downloadSpanTask, ok bool) {
     248            1 :         bounds := base.UserKeyBoundsEndExclusive(sp.StartKey, sp.EndKey)
     249            1 :         // We are interested in all external sstables that *overlap* with
     250            1 :         // [sp.StartKey, sp.EndKey). Expand the bounds to the left so that we
     251            1 :         // include the start keys of any external sstables that overlap with
     252            1 :         // sp.StartKey.
     253            1 :         for _, ls := range vers.AllLevelsAndSublevels() {
     254            1 :                 iter := ls.Iter()
     255            1 :                 if f := iter.SeekGE(d.cmp, sp.StartKey); f != nil &&
     256            1 :                         objstorage.IsExternalTable(d.objProvider, f.TableBacking.DiskFileNum) &&
     257            1 :                         d.cmp(f.Smallest().UserKey, bounds.Start) < 0 {
     258            1 :                         bounds.Start = f.Smallest().UserKey
     259            1 :                 }
     260              :         }
     261            1 :         startCursor := downloadCursor{
     262            1 :                 level:  0,
     263            1 :                 key:    bounds.Start,
     264            1 :                 seqNum: 0,
     265            1 :         }
     266            1 :         f, level := startCursor.NextExternalFile(d.cmp, d.objProvider, bounds, vers)
     267            1 :         if f == nil {
     268            1 :                 // No external files in the given span.
     269            1 :                 return nil, false
     270            1 :         }
     271              : 
     272            1 :         return &downloadSpanTask{
     273            1 :                 downloadSpan:      sp,
     274            1 :                 bounds:            bounds,
     275            1 :                 taskCompletedChan: make(chan error, 1),
     276            1 :                 cursor:            makeCursorAtFile(f, level),
     277            1 :         }, true
     278              : }
     279              : 
// downloadCursor represents a position in the download process, which does not
// depend on a specific version.
//
// The Download process scans for external files level-by-level (starting with
// L0), and left-to-right (in terms of Smallest.UserKey) within each level. In
// L0, we break ties by the LargestSeqNum.
//
// A cursor can be thought of as a boundary between two files in a version
// (ordered by level, then by Smallest.UserKey, then by LargestSeqNum). A file
// is either "before" or "after" the cursor.
type downloadCursor struct {
	// LSM level (0 to NumLevels). When level=NumLevels, the cursor is at the end.
	level int
	// Inclusive lower bound for Smallest.UserKey for tables on level.
	key []byte
	// Inclusive lower bound for sequence number for tables on level with
	// Smallest.UserKey equaling key. Used to break ties within L0, and also used
	// to position a cursor immediately after a given file (by using the file's
	// LargestSeqNum plus one).
	seqNum base.SeqNum
}
     300              : 
// endCursor is a cursor positioned past the last level; AtEnd reports true for
// it.
var endCursor = downloadCursor{level: manifest.NumLevels}
     302              : 
     303              : // AtEnd returns true if the cursor is after all relevant files.
     304            1 : func (c downloadCursor) AtEnd() bool {
     305            1 :         return c.level >= manifest.NumLevels
     306            1 : }
     307              : 
     308            0 : func (c downloadCursor) String() string {
     309            0 :         return fmt.Sprintf("level=%d key=%q seqNum=%d", c.level, c.key, c.seqNum)
     310            0 : }
     311              : 
     312              : // makeCursorAtFile returns a downloadCursor that is immediately before the
     313              : // given file. Calling nextExternalFile on the resulting cursor (using the same
     314              : // version) should return f.
     315            1 : func makeCursorAtFile(f *manifest.TableMetadata, level int) downloadCursor {
     316            1 :         return downloadCursor{
     317            1 :                 level:  level,
     318            1 :                 key:    f.Smallest().UserKey,
     319            1 :                 seqNum: f.LargestSeqNum,
     320            1 :         }
     321            1 : }
     322              : 
     323              : // makeCursorAfterFile returns a downloadCursor that is immediately
     324              : // after the given file.
     325            1 : func makeCursorAfterFile(f *manifest.TableMetadata, level int) downloadCursor {
     326            1 :         return downloadCursor{
     327            1 :                 level:  level,
     328            1 :                 key:    f.Smallest().UserKey,
     329            1 :                 seqNum: f.LargestSeqNum + 1,
     330            1 :         }
     331            1 : }
     332              : 
     333              : func (c downloadCursor) FileIsAfterCursor(
     334              :         cmp base.Compare, f *manifest.TableMetadata, level int,
     335            1 : ) bool {
     336            1 :         return c.Compare(cmp, makeCursorAfterFile(f, level)) < 0
     337            1 : }
     338              : 
     339            1 : func (c downloadCursor) Compare(keyCmp base.Compare, other downloadCursor) int {
     340            1 :         if c := cmp.Compare(c.level, other.level); c != 0 {
     341            0 :                 return c
     342            0 :         }
     343            1 :         if c := keyCmp(c.key, other.key); c != 0 {
     344            1 :                 return c
     345            1 :         }
     346            1 :         return cmp.Compare(c.seqNum, other.seqNum)
     347              : }
     348              : 
     349              : // NextExternalFile returns the first file after the cursor, returning the file
     350              : // and the level. If no such file exists, returns nil fileMetadata.
     351              : func (c downloadCursor) NextExternalFile(
     352              :         cmp base.Compare, objProvider objstorage.Provider, bounds base.UserKeyBounds, v *manifest.Version,
     353            1 : ) (_ *manifest.TableMetadata, level int) {
     354            1 :         for !c.AtEnd() {
     355            1 :                 if f := c.NextExternalFileOnLevel(cmp, objProvider, bounds.End, v); f != nil {
     356            1 :                         return f, c.level
     357            1 :                 }
     358              :                 // Go to the next level.
     359            1 :                 c.key = bounds.Start
     360            1 :                 c.seqNum = 0
     361            1 :                 c.level++
     362              :         }
     363            1 :         return nil, manifest.NumLevels
     364              : }
     365              : 
     366              : // NextExternalFileOnLevel returns the first external file on c.level which is
     367              : // after c and with Smallest.UserKey within the end bound.
     368              : func (c downloadCursor) NextExternalFileOnLevel(
     369              :         cmp base.Compare,
     370              :         objProvider objstorage.Provider,
     371              :         endBound base.UserKeyBoundary,
     372              :         v *manifest.Version,
     373            1 : ) *manifest.TableMetadata {
     374            1 :         if c.level > 0 {
     375            1 :                 it := v.Levels[c.level].Iter()
     376            1 :                 return firstExternalFileInLevelIter(cmp, objProvider, c, it, endBound)
     377            1 :         }
     378              :         // For L0, we look at all sublevel iterators and take the first file.
     379            1 :         var first *manifest.TableMetadata
     380            1 :         var firstCursor downloadCursor
     381            1 :         for _, sublevel := range v.L0SublevelFiles {
     382            1 :                 f := firstExternalFileInLevelIter(cmp, objProvider, c, sublevel.Iter(), endBound)
     383            1 :                 if f != nil {
     384            1 :                         c := makeCursorAtFile(f, c.level)
     385            1 :                         if first == nil || c.Compare(cmp, firstCursor) < 0 {
     386            1 :                                 first = f
     387            1 :                                 firstCursor = c
     388            1 :                         }
     389              :                         // Trim the end bound as an optimization.
     390            1 :                         endBound = base.UserKeyInclusive(f.Smallest().UserKey)
     391              :                 }
     392              :         }
     393            1 :         return first
     394              : }
     395              : 
     396              : // firstExternalFileInLevelIter finds the first external file after the cursor
     397              : // but which starts before the endBound. It is assumed that the iterator
     398              : // corresponds to cursor.level.
     399              : func firstExternalFileInLevelIter(
     400              :         cmp base.Compare,
     401              :         objProvider objstorage.Provider,
     402              :         cursor downloadCursor,
     403              :         it manifest.LevelIterator,
     404              :         endBound base.UserKeyBoundary,
     405            1 : ) *manifest.TableMetadata {
     406            1 :         f := it.SeekGE(cmp, cursor.key)
     407            1 :         // Skip the file if it starts before cursor.key or is at that same key with lower
     408            1 :         // sequence number.
     409            1 :         for f != nil && !cursor.FileIsAfterCursor(cmp, f, cursor.level) {
     410            1 :                 f = it.Next()
     411            1 :         }
     412            1 :         for ; f != nil && endBound.IsUpperBoundFor(cmp, f.Smallest().UserKey); f = it.Next() {
     413            1 :                 if f.Virtual && objstorage.IsExternalTable(objProvider, f.TableBacking.DiskFileNum) {
     414            1 :                         return f
     415            1 :                 }
     416              :         }
     417            1 :         return nil
     418              : }
     419              : 
     420              : // tryLaunchDownloadForFile attempt to launch a download compaction for the
     421              : // given file. Returns true on success, or false if the file is already
     422              : // involved in a compaction.
     423              : func (d *DB) tryLaunchDownloadForFile(
     424              :         vers *manifest.Version,
     425              :         l0Organizer *manifest.L0Organizer,
     426              :         env compactionEnv,
     427              :         download *downloadSpanTask,
     428              :         level int,
     429              :         f *manifest.TableMetadata,
     430            1 : ) (doneCh chan error, ok bool) {
     431            1 :         if f.IsCompacting() {
     432            1 :                 return nil, false
     433            1 :         }
     434            1 :         if download.testing.launchDownloadCompaction != nil {
     435            0 :                 return download.testing.launchDownloadCompaction(f)
     436            0 :         }
     437            1 :         kind := compactionKindRewrite
     438            1 :         if download.downloadSpan.ViaBackingFileDownload {
     439            1 :                 kind = compactionKindCopy
     440            1 :         }
     441            1 :         pc := pickDownloadCompaction(vers, l0Organizer, d.opts, env, d.mu.versions.picker.getBaseLevel(), kind, level, f)
     442            1 :         if pc == nil {
     443            0 :                 // We are not able to run this download compaction at this time.
     444            0 :                 return nil, false
     445            0 :         }
     446              : 
     447            1 :         download.numLaunchedDownloads++
     448            1 :         doneCh = make(chan error, 1)
     449            1 :         c := newCompaction(pc, d.opts, d.timeNow(), d.objProvider, noopGrantHandle{}, d.TableFormat(), d.determineCompactionValueSeparation)
     450            1 :         c.isDownload = true
     451            1 :         d.mu.compact.downloadingCount++
     452            1 :         c.AddInProgressLocked(d)
     453            1 :         go d.compact(c, doneCh)
     454            1 :         return doneCh, true
     455              : }
     456              : 
// launchDownloadResult is the outcome reported by tryLaunchDownloadCompaction.
type launchDownloadResult int8

const (
	// launchedCompaction presumably means a download compaction was started
	// - confirm against tryLaunchDownloadCompaction.
	launchedCompaction launchDownloadResult = iota
	// didNotLaunchCompaction presumably means no compaction was started and
	// the task is still pending - confirm against tryLaunchDownloadCompaction.
	didNotLaunchCompaction
	// downloadTaskCompleted is returned after the task's taskCompletedChan has
	// been signaled (e.g. with the error of a failed download compaction).
	downloadTaskCompleted
)
     464              : // tryLaunchDownloadCompaction re-checks the task's bookmarks, then advances its cursor; it starts at most one download compaction per call.
     465              : func (d *DB) tryLaunchDownloadCompaction(
     466              :         download *downloadSpanTask, // task whose bookmarks and cursor are read and updated here
     467              :         vers *manifest.Version, // passed to the NextExternalFile* lookups and to tryLaunchDownloadForFile
     468              :         l0Organizer *manifest.L0Organizer, // forwarded to tryLaunchDownloadForFile
     469              :         env compactionEnv, // forwarded to tryLaunchDownloadForFile
     470              :         maxConcurrentDownloads int, // new bookmarks are added only while len(download.bookmarks) is below this
     471            1 : ) launchDownloadResult {
     472            1 :         // First, check the bookmarks.
     473            1 :         for i := 0; i < len(download.bookmarks); i++ {
     474            1 :                 b := &download.bookmarks[i] // pointer, so the updates below modify the slice element in place
     475            1 :                 if b.downloadDoneCh != nil {
     476            1 :                         // First check if the compaction we launched completed.
     477            1 :                         select {
     478            1 :                         case compactionErr := <-b.downloadDoneCh:
     479            1 :                                 if compactionErr != nil && !errors.Is(compactionErr, ErrCancelledCompaction) { // a cancelled compaction does not fail the task
     480            0 :                                         download.taskCompletedChan <- compactionErr // report the error as the task's result
     481            0 :                                         return downloadTaskCompleted
     482            0 :                                 }
     483            1 :                                 b.downloadDoneCh = nil // done or cancelled: fall through and re-scan this bookmark's range
     484              : 
     485              :                                 // Even if the compaction finished without an error, we still want to
     486              :                                 // check the rest of the bookmark range for external files.
     487              :                                 //
     488              :                                 // For example, say that we encounter a file ["a", "f"] and start a
     489              :                                 // download (creating a bookmark). Then that file gets excised into new
     490              :                                 // files ["a", "b"] and ["e", "f"] and the excise causes the download
     491              :                                 // compaction to be cancelled. We will start another download compaction
     492              :                                 // for ["a", "c"]; once that is complete, we still need to look at the
     493              :                                 // rest of the bookmark range (i.e. up to "f") to discover the
     494              :                                 // ["e", "f"] file.
     495              : 
     496            1 :                         default:
     497            1 :                                 // The compaction is still running, go to the next bookmark.
     498            1 :                                 continue
     499              :                         }
     500              :                 }
     501              : 
     502              :                 // If downloadDoneCh was nil, we are waiting on a compaction that we did not
     503              :                 // start. We are effectively polling the status by checking the external
     504              :                 // files within the bookmark. This is ok because this method is called (for
     505              :                 // this download task) at most once every time a compaction completes.
     506              : 
     507            1 :                 f := b.start.NextExternalFileOnLevel(d.cmp, d.objProvider, b.endBound, vers)
     508            1 :                 if f == nil {
     509            1 :                         // No more external files for this bookmark, remove it.
     510            1 :                         download.bookmarks = slices.Delete(download.bookmarks, i, i+1)
     511            1 :                         i-- // the following bookmark shifted into index i; revisit that index
     512            1 :                         continue
     513              :                 }
     514              : 
     515              :                 // Move up the bookmark position to point at this file.
     516            1 :                 b.start = makeCursorAtFile(f, b.start.level)
     517            1 :                 doneCh, ok := d.tryLaunchDownloadForFile(vers, l0Organizer, env, download, b.start.level, f)
     518            1 :                 if ok {
     519            1 :                         b.downloadDoneCh = doneCh // remember the channel so a later call can observe completion
     520            1 :                         return launchedCompaction
     521            1 :                 }
     522              :                 // We could not launch a download, which means the file is part of another
     523              :                 // compaction. We leave the bookmark in place and will poll the status in
     524              :                 // the code above.
     525              :         }
     526              : 
     527              :         // Try to advance the cursor and launch more downloads.
     528            1 :         for len(download.bookmarks) < maxConcurrentDownloads {
     529            1 :                 f, level := download.cursor.NextExternalFile(d.cmp, d.objProvider, download.bounds, vers)
     530            1 :                 if f == nil {
     531            1 :                         download.cursor = endCursor
     532            1 :                         if len(download.bookmarks) == 0 {
     533            1 :                                 download.taskCompletedChan <- nil // no external file found and no bookmarks outstanding: report success
     534            1 :                                 return downloadTaskCompleted
     535            1 :                         }
     536            1 :                         return didNotLaunchCompaction // bookmarks are still outstanding, so the task is not finished yet
     537              :                 }
     538            1 :                 download.cursor = makeCursorAfterFile(f, level)
     539            1 :                 // Add a bookmark for f before trying to launch; if the launch fails it stays (with a nil downloadDoneCh) for the first loop to poll on a later call.
     540            1 :                 download.bookmarks = append(download.bookmarks, downloadBookmark{
     541            1 :                         start:    makeCursorAtFile(f, level),
     542            1 :                         endBound: base.UserKeyInclusive(f.Largest().UserKey),
     543            1 :                 })
     544            1 :                 doneCh, ok := d.tryLaunchDownloadForFile(vers, l0Organizer, env, download, level, f)
     545            1 :                 if ok {
     546            1 :                         // We launched a download for this file.
     547            1 :                         download.bookmarks[len(download.bookmarks)-1].downloadDoneCh = doneCh
     548            1 :                         return launchedCompaction
     549            1 :                 }
     550              :         }
     551              : 
     552            1 :         return didNotLaunchCompaction // the bookmark count is at maxConcurrentDownloads and nothing was launched on this call
     553              : }
        

Generated by: LCOV version 2.0-1