LCOV - code coverage report
Current view: top level - pebble - download.go (source / functions) Hit Total Coverage
Test: 2024-06-08 08:15Z 47da75f0 - tests only.lcov Lines: 214 244 87.7 %
Date: 2024-06-08 08:16:00 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package pebble
       6             : 
       7             : import (
       8             :         "cmp"
       9             :         "context"
      10             :         "fmt"
      11             :         "slices"
      12             : 
      13             :         "github.com/cockroachdb/errors"
      14             :         "github.com/cockroachdb/pebble/internal/base"
      15             :         "github.com/cockroachdb/pebble/internal/manifest"
      16             :         "github.com/cockroachdb/pebble/objstorage"
      17             : )
      18             : 
      19             : // DownloadSpan is a key range passed to the Download method.
      20             : type DownloadSpan struct {
      21             :         StartKey []byte
      22             :         // EndKey is exclusive.
      23             :         EndKey []byte
      24             :         // ViaBackingFileDownload, if true, indicates the span should be downloaded by
      25             :         // downloading any remote backing files byte-for-byte and replacing them with
      26             :         // the downloaded local files, while otherwise leaving the virtual SSTables
      27             :         // as-is. If false, a "normal" rewriting compaction of the span, that iterates
      28             :         // the keys and produces a new SSTable, is used instead. Downloading raw files
      29             :         // can be faster when the whole file is being downloaded, as it avoids some
      30             :         // cpu-intensive steps involved in iteration and new file construction such as
      31             :         // compression, however it can also be wasteful when only a small portion of a
      32             :         // larger backing file is being used by a virtual file. Additionally, if the
      33             :         // virtual file has expensive read-time transformations, such as prefix
      34             :         // replacement, rewriting once can persist the result of these for future use
      35             :         // while copying only the backing file will obligate future reads to continue
      36             :         // to compute such transforms.
      37             :         ViaBackingFileDownload bool
      38             : }
      39             : 
      40             : // Download ensures that the LSM does not use any external sstables for the
      41             : // given key ranges. It does so by performing appropriate compactions so that
      42             : // all external data becomes available locally.
      43             : //
      44             : // Note that calling this method does not imply that all other compactions stop;
      45             : // it simply informs Pebble of a list of spans for which external data should be
      46             : // downloaded with high priority.
      47             : //
      48             : // The method returns once no external sstables overlap the given spans, the
      49             : // context is canceled, the db is closed, or an error is hit.
      50             : //
      51             : // Note that despite the best effort of this method, if external ingestions
      52             : // happen in parallel, a new external file can always appear right as we're
      53             : // returning.
      54             : //
      55             : // TODO(radu): consider passing a priority/impact knob to express how important
      56             : // the download is (versus live traffic performance, LSM health).
      57           1 : func (d *DB) Download(ctx context.Context, spans []DownloadSpan) error {
      58           1 :         ctx, cancel := context.WithCancel(ctx)
      59           1 :         defer cancel()
      60           1 :         if err := d.closed.Load(); err != nil {
      61           0 :                 panic(err)
      62             :         }
      63           1 :         if d.opts.ReadOnly {
      64           0 :                 return ErrReadOnly
      65           0 :         }
      66           1 :         info := DownloadInfo{
      67           1 :                 JobID: int(d.newJobID()),
      68           1 :                 Spans: spans,
      69           1 :         }
      70           1 :         startTime := d.timeNow()
      71           1 :         d.opts.EventListener.DownloadBegin(info)
      72           1 : 
      73           1 :         for info.RestartCount = 0; ; info.RestartCount++ {
      74           1 :                 tasks := d.createDownloadTasks(spans)
      75           1 :                 info.Duration = d.timeNow().Sub(startTime)
      76           1 :                 if len(tasks) == 0 {
      77           1 :                         // We are done.
      78           1 :                         info.Done = true
      79           1 :                         d.opts.EventListener.DownloadEnd(info)
      80           1 :                         return nil
      81           1 :                 }
      82           1 :                 if info.RestartCount > 0 {
      83           0 :                         d.opts.EventListener.DownloadBegin(info)
      84           0 :                 }
      85             : 
      86             :                 // Install the tasks.
      87           1 :                 d.mu.Lock()
      88           1 :                 d.mu.compact.downloads = append(d.mu.compact.downloads, tasks...)
      89           1 :                 d.maybeScheduleCompaction()
      90           1 :                 d.mu.Unlock()
      91           1 : 
      92           1 :                 err := d.waitForDownloadTasks(ctx, tasks)
      93           1 :                 for _, t := range tasks {
      94           1 :                         info.DownloadCompactionsLaunched += t.numLaunchedDownloads
      95           1 :                 }
      96             : 
      97           1 :                 if err != nil {
      98           0 :                         info.Err = err
      99           0 :                         info.Duration = d.timeNow().Sub(startTime)
     100           0 :                         d.opts.EventListener.DownloadEnd(info)
     101           0 :                         return err
     102           0 :                 }
     103             :         }
     104             : }
     105             : 
     106             : // createDownloadTasks creates downloadSpanTasks for the download spans that
     107             : // overlap external files in the given version.
     108           1 : func (d *DB) createDownloadTasks(spans []DownloadSpan) []*downloadSpanTask {
     109           1 :         d.mu.Lock()
     110           1 :         vers := d.mu.versions.currentVersion()
     111           1 :         d.mu.Unlock()
     112           1 : 
     113           1 :         tasks := make([]*downloadSpanTask, 0, len(spans))
     114           1 :         for i := range spans {
     115           1 :                 task, ok := d.newDownloadSpanTask(vers, spans[i])
     116           1 :                 // If !ok, there are no external files in this span.
     117           1 :                 if ok {
     118           1 :                         tasks = append(tasks, task)
     119           1 :                 }
     120             :         }
     121           1 :         return tasks
     122             : }
     123             : 
     124             : // waitForDownloadTasks waits until all download tasks complete.
     125           1 : func (d *DB) waitForDownloadTasks(ctx context.Context, tasks []*downloadSpanTask) error {
     126           1 :         for i := range tasks {
     127           1 :                 select {
     128           0 :                 case <-ctx.Done():
     129           0 :                         d.removeDownloadTasks(tasks)
     130           0 :                         return ctx.Err()
     131             : 
     132           1 :                 case err := <-tasks[i].taskCompletedChan:
     133           1 :                         if err != nil {
     134           0 :                                 d.removeDownloadTasks(tasks)
     135           0 :                                 return err
     136           0 :                         }
     137             :                 }
     138             :         }
     139           1 :         return nil
     140             : }
     141             : 
     142             : // removeDownloadTasks removes all tasks in the given slice from
     143             : // d.mu.compact.downloads.
     144           0 : func (d *DB) removeDownloadTasks(tasks []*downloadSpanTask) {
     145           0 :         d.mu.Lock()
     146           0 :         defer d.mu.Unlock()
     147           0 :         d.mu.compact.downloads = slices.DeleteFunc(d.mu.compact.downloads, func(t *downloadSpanTask) bool {
     148           0 :                 return slices.Contains(tasks, t)
     149           0 :         })
     150             : }
     151             : 
     152             : // downloadSpanTask tracks the task of downloading external files that overlap
     153             : // with a DownloadSpan.
     154             : //
     155             : // A downloadSpanTask is spawned only if at least one overlapping external file
     156             : // is found in the current version.
     157             : //
     158             : // When a downloadSpanTask completes (i.e. taskCompletedChan is signaled)
     159             : // without an error, it is guaranteed that all external files that were
     160             : // overlapping the download span at the beginning of the task are downloaded.
     161             : //
     162             : // == Implementation ==
     163             : //
     164             : // A download span task moves through the LSM within the given bounds in
     165             : // top-down level order (L0, L1, etc.), and in Smallest.UserKey order within
     166             : // each level (and breaking ties in L0 according to LargestSeqNum). We introduce
     167             : // the concept of a "download cursor" to keep track of where we are in this
     168             : // process, in a way that is independent of any one version. A cursor stores the
     169             : // level, a user key which is a lower bound for Smallest.UserKey within that
     170             : // level, and a sequence number which is a lower bound for the LargestSeqNum for
     171             : // files on that level starting at exactly that key.
     172             : //
     173             : // While a download task is running, tables with external backings can disappear
     174             : // due to excises or compactions; tables can move *down* (to a lower LSM level);
     175             : // or tables can have their bounds shrink due to excises (and these will appear
     176             : // as new tables, even though they have the same backing). The top-down,
     177             : // left-to-right-start-key ordering ensures that we don't miss any table
     178             : // (instead, we may examine it multiple times).
     179             : //
     180             : // We use a cursor that advances as our download task makes progress. Each time
     181             : // we encounter a file that needs downloading, we create a "bookmark". A
     182             : // bookmark conceptually represents a key range within a level and it
     183             : // corresponds to the bounds of the file that we discovered. It is represented
     184             : // as a cursor position (corresponding to the start) and an end boundary key. We
     185             : // need to remember the bookmark because the download compaction can fail (e.g.
     186             : // it can get canceled by an excise) and the file might get excised so we need
     187             : // to look again at all files within the original key range.
     188             : //
     189             : // It is also possible that we encounter files that are already part of a
     190             : // compaction. These can be move compaction, or can get canceled, so we can't
     191             : // just ignore these files; we create bookmarks for such files as well.
     192             : //
     193             : // We maintain no more than maxConcurrentDownloads bookmarks - the idea being
     194             : // that files that are part of compactions are getting downloaded anyway and we
     195             : // can effectively count them toward the limit. When we cannot create any more
     196             : // bookmarks, we stop advancing the task cursor. Note that it is not this code's
     197             : // job to enforce the maximum concurrency, this is simply a reasonable limit - we
     198             : // don't want to accumulate arbitrarily many bookmarks, since we check each one
     199             : // whenever tryLaunchDownloadCompaction is called (after every compaction
     200             : // completing).
     201             : //
     202             : // This implementation achieves O(maxConcurrentDownloads * N) level iterator
     203             : // operations across the entire task, where N is the (average) number of files
     204             : // within the bounds.
     205             : type downloadSpanTask struct {
     206             :         downloadSpan DownloadSpan
     207             : 
     208             :         // The download task pertains to sstables which *start* (as per
     209             :         // Smallest.UserKey) within these bounds.
     210             :         bounds base.UserKeyBounds
     211             : 
     212             :         // taskCompletedChan is signaled when we have finished download compactions
     213             :         // for all external files encountered within the bounds, or when one of these
     214             :         // compactions reports an error (other than ErrCancelledCompaction).
     215             :         taskCompletedChan chan error
     216             : 
     217             :         numLaunchedDownloads int
     218             : 
     219             :         // Keeps track of the current position; all files up to these position were
     220             :         // examined and were either downloaded or we have bookmarks for them.
     221             :         cursor downloadCursor
     222             : 
     223             :         // Bookmarks remember areas which correspond to downloads that we started or
     224             :         // files that were undergoing other compactions and which we need to check
     225             :         // again before completing the task.
     226             :         bookmarks []downloadBookmark
     227             : 
     228             :         // Testing hooks.
     229             :         testing struct {
     230             :                 launchDownloadCompaction func(f *fileMetadata) (chan error, bool)
     231             :         }
     232             : }
     233             : 
     234             : // downloadBookmark represents an area that was swept by the task cursor which
     235             : // corresponds to a file that was part of a running compaction or download.
     236             : type downloadBookmark struct {
     237             :         start    downloadCursor
     238             :         endBound base.UserKeyBoundary
     239             :         // downloadDoneCh is set if this bookmark corresponds to a download we
     240             :         // started; in this case the channel will report the status of that
     241             :         // compaction.
     242             :         downloadDoneCh chan error
     243             : }
     244             : 
     245           1 : func (d *DB) newDownloadSpanTask(vers *version, sp DownloadSpan) (_ *downloadSpanTask, ok bool) {
     246           1 :         bounds := base.UserKeyBoundsEndExclusive(sp.StartKey, sp.EndKey)
     247           1 :         // We are interested in all external sstables that *overlap* with
     248           1 :         // [sp.StartKey, sp.EndKey). Expand the bounds to the left so that we
     249           1 :         // include the start keys of any external sstables that overlap with
     250           1 :         // sp.StartKey.
     251           1 :         vers.IterAllLevelsAndSublevels(func(iter manifest.LevelIterator, level, sublevel int) {
     252           1 :                 if f := iter.SeekGE(d.cmp, sp.StartKey); f != nil &&
     253           1 :                         objstorage.IsExternalTable(d.objProvider, f.FileBacking.DiskFileNum) &&
     254           1 :                         d.cmp(f.Smallest.UserKey, bounds.Start) < 0 {
     255           1 :                         bounds.Start = f.Smallest.UserKey
     256           1 :                 }
     257             :         })
     258           1 :         startCursor := downloadCursor{
     259           1 :                 level:  0,
     260           1 :                 key:    bounds.Start,
     261           1 :                 seqNum: 0,
     262           1 :         }
     263           1 :         f, level := startCursor.NextExternalFile(d.cmp, d.objProvider, bounds, vers)
     264           1 :         if f == nil {
     265           1 :                 // No external files in the given span.
     266           1 :                 return nil, false
     267           1 :         }
     268             : 
     269           1 :         return &downloadSpanTask{
     270           1 :                 downloadSpan:      sp,
     271           1 :                 bounds:            bounds,
     272           1 :                 taskCompletedChan: make(chan error, 1),
     273           1 :                 cursor:            makeCursorAtFile(f, level),
     274           1 :         }, true
     275             : }
     276             : 
     277             : // downloadCursor represents a position in the download process, which does not
     278             : // depend on a specific version.
     279             : //
     280             : // The Download process scans for external files level-by-level (starting with
     281             : // L0), and left-to-right (in terms of Smallest.UserKey) within each level. In
     282             : // L0, we break ties by the LargestSeqNum.
     283             : //
     284             : // A cursor can be thought of as a boundary between two files in a version
     285             : // (ordered by level, then by Smallest.UserKey, then by LargestSeqNum). A file
     286             : // is either "before" or "after" the cursor.
     287             : type downloadCursor struct {
     288             :         // LSM level (0 to NumLevels). When level=NumLevels, the cursor is at the end.
     289             :         level int
     290             :         // Inclusive lower bound for Smallest.UserKey for tables on level.
     291             :         key []byte
     292             :         // Inclusive lower bound for sequence number for tables on level with
     293             :         // Smallest.UserKey equaling key. Used to break ties within L0, and also used
     294             :         // to position a cursor immediately after a given file.
     295             :         seqNum uint64
     296             : }
     297             : 
     298             : var endCursor = downloadCursor{level: manifest.NumLevels}
     299             : 
     300             : // AtEnd returns true if the cursor is after all relevant files.
     301           1 : func (c downloadCursor) AtEnd() bool {
     302           1 :         return c.level >= manifest.NumLevels
     303           1 : }
     304             : 
     305           1 : func (c downloadCursor) String() string {
     306           1 :         return fmt.Sprintf("level=%d key=%q seqNum=%d", c.level, c.key, c.seqNum)
     307           1 : }
     308             : 
     309             : // makeCursorAtFile returns a downloadCursor that is immediately before the
     310             : // given file. Calling nextExternalFile on the resulting cursor (using the same
     311             : // version) should return f.
     312           1 : func makeCursorAtFile(f *fileMetadata, level int) downloadCursor {
     313           1 :         return downloadCursor{
     314           1 :                 level:  level,
     315           1 :                 key:    f.Smallest.UserKey,
     316           1 :                 seqNum: f.LargestSeqNum,
     317           1 :         }
     318           1 : }
     319             : 
     320             : // makeCursorAfterFile returns a downloadCursor that is immediately
     321             : // after the given file.
     322           1 : func makeCursorAfterFile(f *fileMetadata, level int) downloadCursor {
     323           1 :         return downloadCursor{
     324           1 :                 level:  level,
     325           1 :                 key:    f.Smallest.UserKey,
     326           1 :                 seqNum: f.LargestSeqNum + 1,
     327           1 :         }
     328           1 : }
     329             : 
     330           1 : func (c downloadCursor) FileIsAfterCursor(cmp base.Compare, f *fileMetadata, level int) bool {
     331           1 :         return c.Compare(cmp, makeCursorAfterFile(f, level)) < 0
     332           1 : }
     333             : 
     334           1 : func (c downloadCursor) Compare(keyCmp base.Compare, other downloadCursor) int {
     335           1 :         if c := cmp.Compare(c.level, other.level); c != 0 {
     336           0 :                 return c
     337           0 :         }
     338           1 :         if c := keyCmp(c.key, other.key); c != 0 {
     339           1 :                 return c
     340           1 :         }
     341           1 :         return cmp.Compare(c.seqNum, other.seqNum)
     342             : }
     343             : 
     344             : // NextExternalFile returns the first file after the cursor, returning the file
     345             : // and the level. If no such file exists, returns nil fileMetadata.
     346             : func (c downloadCursor) NextExternalFile(
     347             :         cmp base.Compare, objProvider objstorage.Provider, bounds base.UserKeyBounds, v *version,
     348           1 : ) (_ *fileMetadata, level int) {
     349           1 :         for !c.AtEnd() {
     350           1 :                 if f := c.NextExternalFileOnLevel(cmp, objProvider, bounds.End, v); f != nil {
     351           1 :                         return f, c.level
     352           1 :                 }
     353             :                 // Go to the next level.
     354           1 :                 c.key = bounds.Start
     355           1 :                 c.seqNum = 0
     356           1 :                 c.level++
     357             :         }
     358           1 :         return nil, manifest.NumLevels
     359             : }
     360             : 
     361             : // NextExternalFileOnLevel returns the first external file on c.level which is
     362             : // after c and with Smallest.UserKey within the end bound.
     363             : func (c downloadCursor) NextExternalFileOnLevel(
     364             :         cmp base.Compare, objProvider objstorage.Provider, endBound base.UserKeyBoundary, v *version,
     365           1 : ) *fileMetadata {
     366           1 :         if c.level > 0 {
     367           1 :                 it := v.Levels[c.level].Iter()
     368           1 :                 return firstExternalFileInLevelIter(cmp, objProvider, c, it, endBound)
     369           1 :         }
     370             :         // For L0, we look at all sublevel iterators and take the first file.
     371           1 :         var first *fileMetadata
     372           1 :         var firstCursor downloadCursor
     373           1 :         for _, sublevel := range v.L0SublevelFiles {
     374           1 :                 f := firstExternalFileInLevelIter(cmp, objProvider, c, sublevel.Iter(), endBound)
     375           1 :                 if f != nil {
     376           1 :                         c := makeCursorAtFile(f, c.level)
     377           1 :                         if first == nil || c.Compare(cmp, firstCursor) < 0 {
     378           1 :                                 first = f
     379           1 :                                 firstCursor = c
     380           1 :                         }
     381             :                         // Trim the end bound as an optimization.
     382           1 :                         endBound = base.UserKeyInclusive(f.Smallest.UserKey)
     383             :                 }
     384             :         }
     385           1 :         return first
     386             : }
     387             : 
     388             : // firstExternalFileInLevelIter finds the first external file after the cursor
     389             : // but which starts before the endBound. It is assumed that the iterator
     390             : // corresponds to cursor.level.
     391             : func firstExternalFileInLevelIter(
     392             :         cmp base.Compare,
     393             :         objProvider objstorage.Provider,
     394             :         cursor downloadCursor,
     395             :         it manifest.LevelIterator,
     396             :         endBound base.UserKeyBoundary,
     397           1 : ) *fileMetadata {
     398           1 :         f := it.SeekGE(cmp, cursor.key)
     399           1 :         // Skip the file if it starts before cursor.key or is at that same key with lower
     400           1 :         // sequence number.
     401           1 :         for f != nil && !cursor.FileIsAfterCursor(cmp, f, cursor.level) {
     402           1 :                 f = it.Next()
     403           1 :         }
     404           1 :         for ; f != nil && endBound.IsUpperBoundFor(cmp, f.Smallest.UserKey); f = it.Next() {
     405           1 :                 if f.Virtual && objstorage.IsExternalTable(objProvider, f.FileBacking.DiskFileNum) {
     406           1 :                         return f
     407           1 :                 }
     408             :         }
     409           1 :         return nil
     410             : }
     411             : 
     412             : // tryLaunchDownloadForFile attempt to launch a download compaction for the
     413             : // given file. Returns true on success, or false if the file is already
     414             : // involved in a compaction.
     415             : func (d *DB) tryLaunchDownloadForFile(
     416             :         vers *version, env compactionEnv, download *downloadSpanTask, level int, f *fileMetadata,
     417           1 : ) (doneCh chan error, ok bool) {
     418           1 :         if f.IsCompacting() {
     419           1 :                 return nil, false
     420           1 :         }
     421           1 :         if download.testing.launchDownloadCompaction != nil {
     422           1 :                 return download.testing.launchDownloadCompaction(f)
     423           1 :         }
     424           1 :         kind := compactionKindRewrite
     425           1 :         if download.downloadSpan.ViaBackingFileDownload {
     426           1 :                 kind = compactionKindCopy
     427           1 :         }
     428           1 :         pc := pickDownloadCompaction(vers, d.opts, env, d.mu.versions.picker.getBaseLevel(), kind, level, f)
     429           1 :         if pc == nil {
     430           0 :                 // We are not able to run this download compaction at this time.
     431           0 :                 return nil, false
     432           0 :         }
     433             : 
     434           1 :         download.numLaunchedDownloads++
     435           1 :         doneCh = make(chan error, 1)
     436           1 :         c := newCompaction(pc, d.opts, d.timeNow(), d.objProvider)
     437           1 :         c.isDownload = true
     438           1 :         d.mu.compact.downloadingCount++
     439           1 :         d.addInProgressCompaction(c)
     440           1 :         go d.compact(c, doneCh)
     441           1 :         return doneCh, true
     442             : }
     443             : 
     444             : type launchDownloadResult int8
     445             : 
     446             : const (
     447             :         launchedCompaction launchDownloadResult = iota
     448             :         didNotLaunchCompaction
     449             :         downloadTaskCompleted
     450             : )
     451             : 
     452             : func (d *DB) tryLaunchDownloadCompaction(
     453             :         download *downloadSpanTask, vers *manifest.Version, env compactionEnv, maxConcurrentDownloads int,
     454           1 : ) launchDownloadResult {
     455           1 :         // First, check the bookmarks.
     456           1 :         for i := 0; i < len(download.bookmarks); i++ {
     457           1 :                 b := &download.bookmarks[i]
     458           1 :                 if b.downloadDoneCh != nil {
     459           1 :                         // First check if the compaction we launched completed.
     460           1 :                         select {
     461           1 :                         case compactionErr := <-b.downloadDoneCh:
     462           1 :                                 if compactionErr != nil && !errors.Is(compactionErr, ErrCancelledCompaction) {
     463           0 :                                         download.taskCompletedChan <- compactionErr
     464           0 :                                         return downloadTaskCompleted
     465           0 :                                 }
     466           1 :                                 b.downloadDoneCh = nil
     467             : 
     468             :                                 // Even if the compaction finished without an error, we still want to
     469             :                                 // check the rest of the bookmark range for external files.
     470             :                                 //
     471             :                                 // For example, say that we encounter a file ["a", "f"] and start a
     472             :                                 // download (creating a bookmark). Then that file gets excised into new
     473             :                                 // files ["a", "b"] and ["e", "f"] and the excise causes the download
     474             :                                 // compaction to be cancelled. We will start another download compaction
     475             :                                 // for ["a", "c"]; once that is complete, we still need to look at the
     476             :                                 // rest of the bookmark range (i.e. up to "f") to discover the
     477             :                                 // ["e", "f"] file.
     478             : 
     479           1 :                         default:
     480           1 :                                 // The compaction is still running, go to the next bookmark.
     481           1 :                                 continue
     482             :                         }
     483             :                 }
     484             : 
     485             :                 // If downloadDoneCh was nil, we are waiting on a compaction that we did not
     486             :                 // start. We are effectively polling the status by checking the external
     487             :                 // files within the bookmark. This is ok because this method is called (for
     488             :                 // this download task) at most once every time a compaction completes.
     489             : 
     490           1 :                 f := b.start.NextExternalFileOnLevel(d.cmp, d.objProvider, b.endBound, vers)
     491           1 :                 if f == nil {
     492           1 :                         // No more external files for this bookmark, remove it.
     493           1 :                         download.bookmarks = slices.Delete(download.bookmarks, i, i+1)
     494           1 :                         i--
     495           1 :                         continue
     496             :                 }
     497             : 
     498             :                 // Move up the bookmark position to point at this file.
     499           1 :                 b.start = makeCursorAtFile(f, b.start.level)
     500           1 :                 doneCh, ok := d.tryLaunchDownloadForFile(vers, env, download, b.start.level, f)
     501           1 :                 if ok {
     502           1 :                         b.downloadDoneCh = doneCh
     503           1 :                         return launchedCompaction
     504           1 :                 }
     505             :                 // We could not launch a download, which means the file is part of another
     506             :                 // compaction. We leave the bookmark in place and will poll the status in
     507             :                 // the code above.
     508             :         }
     509             : 
     510             :         // Try to advance the cursor and launch more downloads.
     511           1 :         for len(download.bookmarks) < maxConcurrentDownloads {
     512           1 :                 f, level := download.cursor.NextExternalFile(d.cmp, d.objProvider, download.bounds, vers)
     513           1 :                 if f == nil {
     514           1 :                         download.cursor = endCursor
     515           1 :                         if len(download.bookmarks) == 0 {
     516           1 :                                 download.taskCompletedChan <- nil
     517           1 :                                 return downloadTaskCompleted
     518           1 :                         }
     519           1 :                         return didNotLaunchCompaction
     520             :                 }
     521           1 :                 download.cursor = makeCursorAfterFile(f, level)
     522           1 : 
     523           1 :                 download.bookmarks = append(download.bookmarks, downloadBookmark{
     524           1 :                         start:    makeCursorAtFile(f, level),
     525           1 :                         endBound: base.UserKeyInclusive(f.Largest.UserKey),
     526           1 :                 })
     527           1 :                 doneCh, ok := d.tryLaunchDownloadForFile(vers, env, download, level, f)
     528           1 :                 if ok {
     529           1 :                         // We launched a download for this file.
     530           1 :                         download.bookmarks[len(download.bookmarks)-1].downloadDoneCh = doneCh
     531           1 :                         return launchedCompaction
     532           1 :                 }
     533             :         }
     534             : 
     535           1 :         return didNotLaunchCompaction
     536             : }

Generated by: LCOV version 1.14