LCOV - code coverage report
Current view: top level - pebble - download.go (source / functions) Coverage Total Hit
Test: 2025-01-28 08:16Z 11576157 - tests only.lcov Lines: 85.2 % 244 208
Test Date: 2025-01-28 08:17:11 Functions: - 0 0

            Line data    Source code
       1              : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2              : // of this source code is governed by a BSD-style license that can be found in
       3              : // the LICENSE file.
       4              : 
       5              : package pebble
       6              : 
       7              : import (
       8              :         "cmp"
       9              :         "context"
      10              :         "fmt"
      11              :         "slices"
      12              : 
      13              :         "github.com/cockroachdb/errors"
      14              :         "github.com/cockroachdb/pebble/internal/base"
      15              :         "github.com/cockroachdb/pebble/internal/manifest"
      16              :         "github.com/cockroachdb/pebble/objstorage"
      17              : )
      18              : 
      19              : // DownloadSpan is a key range passed to the Download method.
      20              : type DownloadSpan struct {
      21              :         StartKey []byte
      22              :         // EndKey is exclusive.
      23              :         EndKey []byte
      24              :         // ViaBackingFileDownload, if true, indicates the span should be downloaded by
      25              :         // downloading any remote backing files byte-for-byte and replacing them with
      26              :         // the downloaded local files, while otherwise leaving the virtual SSTables
      27              :         // as-is. If false, a "normal" rewriting compaction of the span, that iterates
      28              :         // the keys and produces a new SSTable, is used instead. Downloading raw files
      29              :         // can be faster when the whole file is being downloaded, as it avoids some
      30              :         // cpu-intensive steps involved in iteration and new file construction such as
      31              :         // compression, however it can also be wasteful when only a small portion of a
      32              :         // larger backing file is being used by a virtual file. Additionally, if the
      33              :         // virtual file has expensive read-time transformations, such as prefix
      34              :         // replacement, rewriting once can persist the result of these for future use
      35              :         // while copying only the backing file will obligate future reads to continue
      36              :         // to compute such transforms.
      37              :         ViaBackingFileDownload bool
      38              : }
      39              : 
      40              : // Download ensures that the LSM does not use any external sstables for the
      41              : // given key ranges. It does so by performing appropriate compactions so that
      42              : // all external data becomes available locally.
      43              : //
      44              : // Note that calling this method does not imply that all other compactions stop;
      45              : // it simply informs Pebble of a list of spans for which external data should be
      46              : // downloaded with high priority.
      47              : //
      48              : // The method returns once no external sstables overlap the given spans, the
      49              : // context is canceled, the db is closed, or an error is hit.
      50              : //
      51              : // Note that despite the best effort of this method, if external ingestions
      52              : // happen in parallel, a new external file can always appear right as we're
      53              : // returning.
      54              : //
      55              : // TODO(radu): consider passing a priority/impact knob to express how important
      56              : // the download is (versus live traffic performance, LSM health).
      57            1 : func (d *DB) Download(ctx context.Context, spans []DownloadSpan) error {
      58            1 :         ctx, cancel := context.WithCancel(ctx)
      59            1 :         defer cancel()
      60            1 :         if err := d.closed.Load(); err != nil {
      61            0 :                 panic(err)
      62              :         }
      63            1 :         if d.opts.ReadOnly {
      64            0 :                 return ErrReadOnly
      65            0 :         }
      66            1 :         info := DownloadInfo{
      67            1 :                 JobID: int(d.newJobID()),
      68            1 :                 Spans: spans,
      69            1 :         }
      70            1 :         startTime := d.timeNow()
      71            1 :         d.opts.EventListener.DownloadBegin(info)
      72            1 : 
      73            1 :         for info.RestartCount = 0; ; info.RestartCount++ {
      74            1 :                 tasks := d.createDownloadTasks(spans)
      75            1 :                 info.Duration = d.timeNow().Sub(startTime)
      76            1 :                 if len(tasks) == 0 {
      77            1 :                         // We are done.
      78            1 :                         info.Done = true
      79            1 :                         d.opts.EventListener.DownloadEnd(info)
      80            1 :                         return nil
      81            1 :                 }
      82            1 :                 if info.RestartCount > 0 {
      83            0 :                         d.opts.EventListener.DownloadBegin(info)
      84            0 :                 }
      85              : 
      86              :                 // Install the tasks.
      87            1 :                 d.mu.Lock()
      88            1 :                 d.mu.compact.downloads = append(d.mu.compact.downloads, tasks...)
      89            1 :                 d.maybeScheduleCompaction()
      90            1 :                 d.mu.Unlock()
      91            1 : 
      92            1 :                 err := d.waitForDownloadTasks(ctx, tasks)
      93            1 :                 for _, t := range tasks {
      94            1 :                         info.DownloadCompactionsLaunched += t.numLaunchedDownloads
      95            1 :                 }
      96              : 
      97            1 :                 if err != nil {
      98            0 :                         info.Err = err
      99            0 :                         info.Duration = d.timeNow().Sub(startTime)
     100            0 :                         d.opts.EventListener.DownloadEnd(info)
     101            0 :                         return err
     102            0 :                 }
     103              :         }
     104              : }
     105              : 
     106              : // createDownloadTasks creates downloadSpanTasks for the download spans that
     107              : // overlap external files in the current version.
     108            1 : func (d *DB) createDownloadTasks(spans []DownloadSpan) []*downloadSpanTask {
     109            1 :         d.mu.Lock()
     110            1 :         vers := d.mu.versions.currentVersion()
     111            1 :         d.mu.Unlock()
     112            1 : 
     113            1 :         tasks := make([]*downloadSpanTask, 0, len(spans))
     114            1 :         for i := range spans {
     115            1 :                 task, ok := d.newDownloadSpanTask(vers, spans[i])
     116            1 :                 // If !ok, there are no external files in this span.
     117            1 :                 if ok {
     118            1 :                         tasks = append(tasks, task)
     119            1 :                 }
     120              :         }
     121            1 :         return tasks
     122              : }
     123              : 
     124              : // waitForDownloadTasks waits until all download tasks complete.
     125            1 : func (d *DB) waitForDownloadTasks(ctx context.Context, tasks []*downloadSpanTask) error {
     126            1 :         for i := range tasks {
     127            1 :                 select {
     128            0 :                 case <-ctx.Done():
     129            0 :                         d.removeDownloadTasks(tasks)
     130            0 :                         return ctx.Err()
     131              : 
     132            1 :                 case err := <-tasks[i].taskCompletedChan:
     133            1 :                         if err != nil {
     134            0 :                                 d.removeDownloadTasks(tasks)
     135            0 :                                 return err
     136            0 :                         }
     137              :                 }
     138              :         }
     139            1 :         return nil
     140              : }
     141              : 
     142              : // removeDownloadTasks removes all tasks in the given slice from
     143              : // d.mu.compact.downloads.
     144            0 : func (d *DB) removeDownloadTasks(tasks []*downloadSpanTask) {
     145            0 :         d.mu.Lock()
     146            0 :         defer d.mu.Unlock()
     147            0 :         d.mu.compact.downloads = slices.DeleteFunc(d.mu.compact.downloads, func(t *downloadSpanTask) bool {
     148            0 :                 return slices.Contains(tasks, t)
     149            0 :         })
     150              : }
     151              : 
     152              : // downloadSpanTask tracks the task of downloading external files that overlap
     153              : // with a DownloadSpan.
     154              : //
     155              : // A downloadSpanTask is spawned only if at least one overlapping external file
     156              : // is found in the current version.
     157              : //
     158              : // When a downloadSpanTask completes (i.e. taskCompletedChan is signaled)
     159              : // without an error, it is guaranteed that all external files that were
     160              : // overlapping the download span at the beginning of the task are downloaded.
     161              : //
     162              : // == Implementation ==
     163              : //
     164              : // A download span task moves through the LSM within the given bounds in
     165              : // top-down level order (L0, L1, etc.), and in Smallest.UserKey order within
     166              : // each level (and breaking ties in L0 according to LargestSeqNum). We introduce
     167              : // the concept of a "download cursor" to keep track of where we are in this
     168              : // process, in a way that is independent of any one version. A cursor stores the
     169              : // level, a user key which is a lower bound for Smallest.UserKey within that
     170              : // level, and a sequence number which is a lower bound for the LargestSeqNum for
     171              : // files on that level starting at exactly that key.
     172              : //
     173              : // While a download task is running, tables with external backings can disappear
     174              : // due to excises or compactions; tables can move *down* (to a lower LSM level);
     175              : // or tables can have their bounds shrink due to excises (and these will appear
     176              : // as new tables, even though they have the same backing). The top-down,
     177              : // left-to-right-start-key ordering ensures that we don't miss any table
     178              : // (instead, we may examine it multiple times).
     179              : //
     180              : // We use a cursor that advances as our download task makes progress. Each time
     181              : // we encounter a file that needs downloading, we create a "bookmark". A
     182              : // bookmark conceptually represents a key range within a level and it
     183              : // corresponds to the bounds of the file that we discovered. It is represented
     184              : // as a cursor position (corresponding to the start) and an end boundary key. We
     185              : // need to remember the bookmark because the download compaction can fail (e.g.
     186              : // it can get canceled by an excise) and the file might get excised so we need
     187              : // to look again at all files within the original key range.
     188              : //
     189              : // It is also possible that we encounter files that are already part of a
     190              : // compaction. These can be move compactions, or they can get canceled, so we
     191              : // can't just ignore these files; we create bookmarks for such files as well.
     192              : //
     193              : // We maintain no more than maxConcurrentDownloads bookmarks - the idea being
     194              : // that files that are part of compactions are getting downloaded anyway and we
     195              : // can effectively count them toward the limit. When we cannot create any more
     196              : // bookmarks, we stop advancing the task cursor. Note that it is not this code's
     197              : // job to enforce the maximum concurrency, this is simply a reasonable limit - we
     198              : // don't want to accumulate arbitrarily many bookmarks, since we check each one
     199              : // whenever tryLaunchDownloadCompaction is called (after every compaction
     200              : // completing).
     201              : //
     202              : // This implementation achieves O(maxConcurrentDownloads * N) level iterator
     203              : // operations across the entire task, where N is the (average) number of files
     204              : // within the bounds.
     205              : type downloadSpanTask struct {
     206              :         downloadSpan DownloadSpan
     207              : 
     208              :         // The download task pertains to sstables which *start* (as per
     209              :         // Smallest.UserKey) within these bounds.
     210              :         bounds base.UserKeyBounds
     211              : 
     212              :         // taskCompletedChan is signaled when we have finished download compactions
     213              :         // for all external files encountered within the bounds, or when one of these
     214              :         // compactions reports an error (other than ErrCancelledCompaction).
     215              :         taskCompletedChan chan error
     216              : 
     217              :         numLaunchedDownloads int
     218              : 
     219              :         // Keeps track of the current position; all files up to this position were
     220              :         // examined and either were downloaded or have bookmarks.
     221              :         cursor downloadCursor
     222              : 
     223              :         // Bookmarks remember areas which correspond to downloads that we started or
     224              :         // files that were undergoing other compactions and which we need to check
     225              :         // again before completing the task.
     226              :         bookmarks []downloadBookmark
     227              : 
     228              :         // Testing hooks.
     229              :         testing struct {
     230              :                 launchDownloadCompaction func(f *fileMetadata) (chan error, bool)
     231              :         }
     232              : }
     233              : 
     234              : // downloadBookmark represents an area that was swept by the task cursor which
     235              : // corresponds to a file that was part of a running compaction or download.
     236              : type downloadBookmark struct {
     237              :         start    downloadCursor
     238              :         endBound base.UserKeyBoundary
     239              :         // downloadDoneCh is set if this bookmark corresponds to a download we
     240              :         // started; in this case the channel will report the status of that
     241              :         // compaction.
     242              :         downloadDoneCh chan error
     243              : }
     244              : 
     245            1 : func (d *DB) newDownloadSpanTask(vers *version, sp DownloadSpan) (_ *downloadSpanTask, ok bool) {
     246            1 :         bounds := base.UserKeyBoundsEndExclusive(sp.StartKey, sp.EndKey)
     247            1 :         // We are interested in all external sstables that *overlap* with
     248            1 :         // [sp.StartKey, sp.EndKey). Expand the bounds to the left so that we
     249            1 :         // include the start keys of any external sstables that overlap with
     250            1 :         // sp.StartKey.
     251            1 :         vers.IterAllLevelsAndSublevels(func(iter manifest.LevelIterator, level manifest.Layer) {
     252            1 :                 if f := iter.SeekGE(d.cmp, sp.StartKey); f != nil &&
     253            1 :                         objstorage.IsExternalTable(d.objProvider, f.FileBacking.DiskFileNum) &&
     254            1 :                         d.cmp(f.Smallest.UserKey, bounds.Start) < 0 {
     255            0 :                         bounds.Start = f.Smallest.UserKey
     256            0 :                 }
     257              :         })
     258            1 :         startCursor := downloadCursor{
     259            1 :                 level:  0,
     260            1 :                 key:    bounds.Start,
     261            1 :                 seqNum: 0,
     262            1 :         }
     263            1 :         f, level := startCursor.NextExternalFile(d.cmp, d.objProvider, bounds, vers)
     264            1 :         if f == nil {
     265            1 :                 // No external files in the given span.
     266            1 :                 return nil, false
     267            1 :         }
     268              : 
     269            1 :         return &downloadSpanTask{
     270            1 :                 downloadSpan:      sp,
     271            1 :                 bounds:            bounds,
     272            1 :                 taskCompletedChan: make(chan error, 1),
     273            1 :                 cursor:            makeCursorAtFile(f, level),
     274            1 :         }, true
     275              : }
     276              : 
     277              : // downloadCursor represents a position in the download process, which does not
     278              : // depend on a specific version.
     279              : //
     280              : // The Download process scans for external files level-by-level (starting with
     281              : // L0), and left-to-right (in terms of Smallest.UserKey) within each level. In
     282              : // L0, we break ties by the LargestSeqNum.
     283              : //
     284              : // A cursor can be thought of as a boundary between two files in a version
     285              : // (ordered by level, then by Smallest.UserKey, then by LargestSeqNum). A file
     286              : // is either "before" or "after" the cursor.
     287              : type downloadCursor struct {
     288              :         // LSM level (0 to NumLevels). When level=NumLevels, the cursor is at the end.
     289              :         level int
     290              :         // Inclusive lower bound for Smallest.UserKey for tables on level.
     291              :         key []byte
     292              :         // Inclusive lower bound for sequence number for tables on level with
     293              :         // Smallest.UserKey equaling key. Used to break ties within L0, and also used
     294              :         // to position a cursor immediately after a given file.
     295              :         seqNum base.SeqNum
     296              : }
     297              : 
     298              : var endCursor = downloadCursor{level: manifest.NumLevels}
     299              : 
     300              : // AtEnd returns true if the cursor is after all relevant files.
     301            1 : func (c downloadCursor) AtEnd() bool {
     302            1 :         return c.level >= manifest.NumLevels
     303            1 : }
     304              : 
     305            1 : func (c downloadCursor) String() string {
     306            1 :         return fmt.Sprintf("level=%d key=%q seqNum=%d", c.level, c.key, c.seqNum)
     307            1 : }
     308              : 
     309              : // makeCursorAtFile returns a downloadCursor that is immediately before the
     310              : // given file. Calling NextExternalFile on the resulting cursor (using the same
     311              : // version) should return f.
     312            1 : func makeCursorAtFile(f *fileMetadata, level int) downloadCursor {
     313            1 :         return downloadCursor{
     314            1 :                 level:  level,
     315            1 :                 key:    f.Smallest.UserKey,
     316            1 :                 seqNum: f.LargestSeqNum,
     317            1 :         }
     318            1 : }
     319              : 
     320              : // makeCursorAfterFile returns a downloadCursor that is immediately
     321              : // after the given file.
     322            1 : func makeCursorAfterFile(f *fileMetadata, level int) downloadCursor {
     323            1 :         return downloadCursor{
     324            1 :                 level:  level,
     325            1 :                 key:    f.Smallest.UserKey,
     326            1 :                 seqNum: f.LargestSeqNum + 1,
     327            1 :         }
     328            1 : }
     329              : 
     330            1 : func (c downloadCursor) FileIsAfterCursor(cmp base.Compare, f *fileMetadata, level int) bool {
     331            1 :         return c.Compare(cmp, makeCursorAfterFile(f, level)) < 0
     332            1 : }
     333              : 
     334            1 : func (c downloadCursor) Compare(keyCmp base.Compare, other downloadCursor) int {
     335            1 :         if c := cmp.Compare(c.level, other.level); c != 0 {
     336            0 :                 return c
     337            0 :         }
     338            1 :         if c := keyCmp(c.key, other.key); c != 0 {
     339            1 :                 return c
     340            1 :         }
     341            1 :         return cmp.Compare(c.seqNum, other.seqNum)
     342              : }
     343              : 
     344              : // NextExternalFile returns the first external file after the cursor, along with
     345              : // its level. If no such file exists, returns a nil fileMetadata.
     346              : func (c downloadCursor) NextExternalFile(
     347              :         cmp base.Compare, objProvider objstorage.Provider, bounds base.UserKeyBounds, v *version,
     348            1 : ) (_ *fileMetadata, level int) {
     349            1 :         for !c.AtEnd() {
     350            1 :                 if f := c.NextExternalFileOnLevel(cmp, objProvider, bounds.End, v); f != nil {
     351            1 :                         return f, c.level
     352            1 :                 }
     353              :                 // Go to the next level.
     354            1 :                 c.key = bounds.Start
     355            1 :                 c.seqNum = 0
     356            1 :                 c.level++
     357              :         }
     358            1 :         return nil, manifest.NumLevels
     359              : }
     360              : 
     361              : // NextExternalFileOnLevel returns the first external file on c.level which is
     362              : // after c and with Smallest.UserKey within the end bound.
     363              : func (c downloadCursor) NextExternalFileOnLevel(
     364              :         cmp base.Compare, objProvider objstorage.Provider, endBound base.UserKeyBoundary, v *version,
     365            1 : ) *fileMetadata {
     366            1 :         if c.level > 0 {
     367            1 :                 it := v.Levels[c.level].Iter()
     368            1 :                 return firstExternalFileInLevelIter(cmp, objProvider, c, it, endBound)
     369            1 :         }
     370              :         // For L0, we look at all sublevel iterators and take the first file.
     371            1 :         var first *fileMetadata
     372            1 :         var firstCursor downloadCursor
     373            1 :         for _, sublevel := range v.L0SublevelFiles {
     374            1 :                 f := firstExternalFileInLevelIter(cmp, objProvider, c, sublevel.Iter(), endBound)
     375            1 :                 if f != nil {
     376            1 :                         c := makeCursorAtFile(f, c.level)
     377            1 :                         if first == nil || c.Compare(cmp, firstCursor) < 0 {
     378            1 :                                 first = f
     379            1 :                                 firstCursor = c
     380            1 :                         }
     381              :                         // Trim the end bound as an optimization.
     382            1 :                         endBound = base.UserKeyInclusive(f.Smallest.UserKey)
     383              :                 }
     384              :         }
     385            1 :         return first
     386              : }
     387              : 
     388              : // firstExternalFileInLevelIter finds the first external file that is after the
     389              : // cursor and starts within endBound. It is assumed that the iterator
     390              : // corresponds to cursor.level.
     391              : func firstExternalFileInLevelIter(
     392              :         cmp base.Compare,
     393              :         objProvider objstorage.Provider,
     394              :         cursor downloadCursor,
     395              :         it manifest.LevelIterator,
     396              :         endBound base.UserKeyBoundary,
     397            1 : ) *fileMetadata {
     398            1 :         f := it.SeekGE(cmp, cursor.key)
     399            1 :         // Skip the file if it starts before cursor.key or is at that same key with lower
     400            1 :         // sequence number.
     401            1 :         for f != nil && !cursor.FileIsAfterCursor(cmp, f, cursor.level) {
     402            1 :                 f = it.Next()
     403            1 :         }
     404            1 :         for ; f != nil && endBound.IsUpperBoundFor(cmp, f.Smallest.UserKey); f = it.Next() {
     405            1 :                 if f.Virtual && objstorage.IsExternalTable(objProvider, f.FileBacking.DiskFileNum) {
     406            1 :                         return f
     407            1 :                 }
     408              :         }
     409            1 :         return nil
     410              : }
     411              : 
     412              : // tryLaunchDownloadForFile attempts to launch a download compaction for the
     413              : // given file. Returns ok=true on success, or ok=false if the file is already
     414              : // involved in a compaction or no download compaction can be picked right now.
     415              : func (d *DB) tryLaunchDownloadForFile(
     416              :         vers *version, env compactionEnv, download *downloadSpanTask, level int, f *fileMetadata,
     417            1 : ) (doneCh chan error, ok bool) {
     418            1 :         if f.IsCompacting() {
     419            1 :                 return nil, false
     420            1 :         }
     421            1 :         if download.testing.launchDownloadCompaction != nil {
     422            1 :                 return download.testing.launchDownloadCompaction(f)
     423            1 :         }
     424            1 :         kind := compactionKindRewrite
     425            1 :         if download.downloadSpan.ViaBackingFileDownload {
     426            1 :                 kind = compactionKindCopy
     427            1 :         }
     428            1 :         pc := pickDownloadCompaction(vers, d.opts, env, d.mu.versions.picker.getBaseLevel(), kind, level, f)
     429            1 :         if pc == nil {
     430            0 :                 // We are not able to run this download compaction at this time.
     431            0 :                 return nil, false
     432            0 :         }
     433              : 
     434            1 :         download.numLaunchedDownloads++
     435            1 :         doneCh = make(chan error, 1)
     436            1 :         c := newCompaction(pc, d.opts, d.timeNow(), d.objProvider, nil /* slot */)
     437            1 :         c.isDownload = true
     438            1 :         d.mu.compact.downloadingCount++
     439            1 :         d.addInProgressCompaction(c)
     440            1 :         go d.compact(c, doneCh)
     441            1 :         return doneCh, true
     442              : }
     443              : 
     444              : type launchDownloadResult int8
     445              : 
     446              : const (
     447              :         launchedCompaction launchDownloadResult = iota
     448              :         didNotLaunchCompaction
     449              :         downloadTaskCompleted
     450              : )
     451              : // tryLaunchDownloadCompaction tries to launch at most one download compaction for the given task: bookmarks are rechecked first, then the cursor is advanced.
     452              : func (d *DB) tryLaunchDownloadCompaction(
     453              :         download *downloadSpanTask, vers *manifest.Version, env compactionEnv, maxConcurrentDownloads int,
     454            1 : ) launchDownloadResult {
     455            1 :         // Phase 1: revisit the existing bookmarks before advancing the cursor.
     456            1 :         for i := 0; i < len(download.bookmarks); i++ {
     457            1 :                 b := &download.bookmarks[i]
     458            1 :                 if b.downloadDoneCh != nil {
     459            1 :                         // Non-blocking check of whether the compaction we launched has completed.
     460            1 :                         select {
     461            1 :                         case compactionErr := <-b.downloadDoneCh:
     462            1 :                                 if compactionErr != nil && !errors.Is(compactionErr, ErrCancelledCompaction) {
     463            0 :                                         download.taskCompletedChan <- compactionErr // any error other than a cancellation ends the task
     464            0 :                                         return downloadTaskCompleted
     465            0 :                                 }
     466            1 :                                 b.downloadDoneCh = nil
     467              : 
     468              :                                 // Even if the compaction finished without an error, we still want to
     469              :                                 // check the rest of the bookmark range for external files.
     470              :                                 //
     471              :                                 // For example, say that we encounter a file ["a", "f"] and start a
     472              :                                 // download (creating a bookmark). Then that file gets excised into new
     473              :                                 // files ["a", "b"] and ["e", "f"] and the excise causes the download
     474              :                                 // compaction to be cancelled. We will start another download compaction
     475              :                                 // for ["a", "c"]; once that is complete, we still need to look at the
     476              :                                 // rest of the bookmark range (i.e. up to "f") to discover the
     477              :                                 // ["e", "f"] file.
     478              : 
     479            0 :                         default:
     480            0 :                                 // The compaction is still running, go to the next bookmark.
     481            0 :                                 continue
     482              :                         }
     483              :                 }
     484              : 
     485              :                 // We reach this point if downloadDoneCh was nil (we are waiting on a compaction
     486              :                 // that we did not start) or if the compaction we launched just finished. Either
     487              :                 // way we poll by rescanning the bookmark for external files. This is ok because
     488              :                 // this method is called (for this download task) at most once every time a compaction completes.
     489              : 
     490            1 :                 f := b.start.NextExternalFileOnLevel(d.cmp, d.objProvider, b.endBound, vers)
     491            1 :                 if f == nil {
     492            1 :                         // No more external files for this bookmark, remove it.
     493            1 :                         download.bookmarks = slices.Delete(download.bookmarks, i, i+1)
     494            1 :                         i-- // the next bookmark shifted into slot i; undo the loop's i++ so it is not skipped
     495            1 :                         continue
     496              :                 }
     497              : 
     498              :                 // Move up the bookmark position to point at this file.
     499            1 :                 b.start = makeCursorAtFile(f, b.start.level)
     500            1 :                 doneCh, ok := d.tryLaunchDownloadForFile(vers, env, download, b.start.level, f)
     501            1 :                 if ok {
     502            1 :                         b.downloadDoneCh = doneCh
     503            1 :                         return launchedCompaction
     504            1 :                 }
     505              :                 // We could not launch a download, which means the file is part of another
     506              :                 // compaction. We leave the bookmark in place and will poll the status in
     507              :                 // the code above.
     508              :         }
     509              : 
     510              :         // Phase 2: advance the cursor and launch more downloads, up to maxConcurrentDownloads bookmarks.
     511            1 :         for len(download.bookmarks) < maxConcurrentDownloads {
     512            1 :                 f, level := download.cursor.NextExternalFile(d.cmp, d.objProvider, download.bounds, vers)
     513            1 :                 if f == nil {
     514            1 :                         download.cursor = endCursor
     515            1 :                         if len(download.bookmarks) == 0 {
     516            1 :                                 download.taskCompletedChan <- nil // no files left and no bookmarks: report completion without error
     517            1 :                                 return downloadTaskCompleted
     518            1 :                         }
     519            0 :                         return didNotLaunchCompaction // bookmarks are still outstanding; nothing new to launch
     520              :                 }
     521            1 :                 download.cursor = makeCursorAfterFile(f, level)
     522            1 :                 // Bookmark the file before trying to launch, so it is rechecked later even if the launch fails.
     523            1 :                 download.bookmarks = append(download.bookmarks, downloadBookmark{
     524            1 :                         start:    makeCursorAtFile(f, level),
     525            1 :                         endBound: base.UserKeyInclusive(f.Largest.UserKey),
     526            1 :                 })
     527            1 :                 doneCh, ok := d.tryLaunchDownloadForFile(vers, env, download, level, f)
     528            1 :                 if ok {
     529            1 :                         // We launched a download for this file.
     530            1 :                         download.bookmarks[len(download.bookmarks)-1].downloadDoneCh = doneCh
     531            1 :                         return launchedCompaction
     532            1 :                 }
     533              :         }
     534              : 
     535            1 :         return didNotLaunchCompaction
     536              : }
        

Generated by: LCOV version 2.0-1