LCOV - code coverage report
Current view: top level - pebble - version_set.go (source / functions) Coverage Total Hit
Test: 2025-06-10 08:19Z df305e42 - tests + meta.lcov Lines: 90.5 % 817 739
Test Date: 2025-06-10 08:21:23 Functions: - 0 0

            Line data    Source code
       1              : // Copyright 2012 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2              : // of this source code is governed by a BSD-style license that can be found in
       3              : // the LICENSE file.
       4              : 
       5              : package pebble
       6              : 
       7              : import (
       8              :         "fmt"
       9              :         "io"
      10              :         "sync"
      11              :         "sync/atomic"
      12              : 
      13              :         "github.com/cockroachdb/errors"
      14              :         "github.com/cockroachdb/pebble/internal/base"
      15              :         "github.com/cockroachdb/pebble/internal/invariants"
      16              :         "github.com/cockroachdb/pebble/internal/manifest"
      17              :         "github.com/cockroachdb/pebble/objstorage"
      18              :         "github.com/cockroachdb/pebble/record"
      19              :         "github.com/cockroachdb/pebble/sstable/blob"
      20              :         "github.com/cockroachdb/pebble/vfs"
      21              :         "github.com/cockroachdb/pebble/vfs/atomicfs"
      22              : )
      23              : 
      24              : const numLevels = manifest.NumLevels
      25              : 
      26              : const manifestMarkerName = `manifest`
      27              : 
      28              : // versionSet manages a collection of immutable versions, and manages the
      29              : // creation of a new version from the most recent version. A new version is
      30              : // created from an existing version by applying a version edit which is just
      31              : // like it sounds: a delta from the previous version. Version edits are logged
      32              : // to the MANIFEST file, which is replayed at startup.
      33              : type versionSet struct {
      34              :         // Next seqNum to use for WAL writes.
      35              :         logSeqNum base.AtomicSeqNum
      36              : 
      37              :         // The upper bound on sequence numbers that have been assigned so far. A
      38              :         // suffix of these sequence numbers may not have been written to a WAL. Both
      39              :         // logSeqNum and visibleSeqNum are atomically updated by the commitPipeline.
      40              :         // visibleSeqNum is <= logSeqNum.
      41              :         visibleSeqNum base.AtomicSeqNum
      42              : 
      43              :         // Number of bytes present in sstables being written by in-progress
      44              :         // compactions. This value will be zero if there are no in-progress
      45              :         // compactions. Updated and read atomically.
      46              :         atomicInProgressBytes atomic.Int64
      47              : 
      48              :         // Immutable fields.
      49              :         dirname  string
      50              :         provider objstorage.Provider
      51              :         // Set to DB.mu.
      52              :         mu   *sync.Mutex
      53              :         opts *Options
      54              :         fs   vfs.FS
      55              :         cmp  *base.Comparer
      56              :         // Dynamic base level allows the dynamic base level computation to be
      57              :         // disabled. Used by tests which want to create specific LSM structures.
      58              :         dynamicBaseLevel bool
      59              : 
      60              :         // Mutable fields.
      61              :         versions    manifest.VersionList
      62              :         l0Organizer *manifest.L0Organizer
      63              :         // blobFiles is the set of blob files referenced by the current version.
      64              :         // blobFiles is protected by the manifest logLock (not vs.mu).
      65              :         blobFiles manifest.CurrentBlobFileSet
      66              :         picker    compactionPicker
      67              :         // curCompactionConcurrency is updated whenever picker is updated.
      68              :         // INVARIANT: >= 1.
      69              :         curCompactionConcurrency atomic.Int32
      70              : 
      71              :         // Not all metrics are kept here. See DB.Metrics().
      72              :         metrics Metrics
      73              : 
      74              :         // A pointer to versionSet.addObsoleteLocked. Avoids allocating a new closure
      75              :         // on the creation of every version.
      76              :         obsoleteFn func(manifest.ObsoleteFiles)
      77              :         // obsolete{Tables,Blobs,Manifests,Options} are sorted by file number ascending.
      78              :         obsoleteTables    []obsoleteFile
      79              :         obsoleteBlobs     []obsoleteFile
      80              :         obsoleteManifests []obsoleteFile
      81              :         obsoleteOptions   []obsoleteFile
      82              : 
      83              :         // Zombie tables which have been removed from the current version but are
      84              :         // still referenced by an inuse iterator.
      85              :         zombieTables zombieObjects
      86              :         // Zombie blobs which have been removed from the current version but are
      87              :         // still referenced by an inuse iterator.
      88              :         zombieBlobs zombieObjects
      89              :         // virtualBackings contains information about the FileBackings which support
      90              :         // virtual sstables in the latest version. It is mainly used to determine when
      91              :         // a backing is no longer in use by the tables in the latest version; this is
      92              :         // not a trivial problem because compactions and other operations can happen
      93              :         // in parallel (and they can finish in unpredictable order).
      94              :         //
      95              :         // This mechanism is complementary to the backing Ref/Unref mechanism, which
      96              :         // determines when a backing is no longer used by *any* live version and can
      97              :         // be removed.
      98              :         //
      99              :         // In principle this should have been a copy-on-write structure, with each
     100              :         // Version having its own record of its virtual backings (similar to the
     101              :         // B-tree that holds the tables). However, in practice we only need it for the
     102              :         // latest version, so we use a simpler structure and store it in the
     103              :         // versionSet instead.
     104              :         //
     105              :         // virtualBackings is modified under DB.mu and the log lock. If it is accessed
     106              :         // under DB.mu and a version update is in progress, it reflects the state of
     107              :         // the next version.
     108              :         virtualBackings manifest.VirtualBackings
     109              : 
     110              :         // minUnflushedLogNum is the smallest WAL log file number corresponding to
     111              :         // mutations that have not been flushed to an sstable.
     112              :         minUnflushedLogNum base.DiskFileNum
     113              : 
     114              :         // The next file number. A single counter is used to assign file
     115              :         // numbers for the WAL, MANIFEST, sstable, and OPTIONS files.
     116              :         nextFileNum atomic.Uint64
     117              : 
     118              :         // The current manifest file number.
     119              :         manifestFileNum base.DiskFileNum
     120              :         manifestMarker  *atomicfs.Marker
     121              : 
     122              :         manifestFile          vfs.File
     123              :         manifest              *record.Writer
     124              :         getFormatMajorVersion func() FormatMajorVersion
     125              : 
     126              :         writing    bool
     127              :         writerCond sync.Cond
     128              :         // State for deciding when to write a snapshot. Protected by mu.
     129              :         rotationHelper record.RotationHelper
     130              : 
     131              :         pickedCompactionCache pickedCompactionCache
     132              : }
     133              : 
     134              : func (vs *versionSet) init(
     135              :         dirname string,
     136              :         provider objstorage.Provider,
     137              :         opts *Options,
     138              :         marker *atomicfs.Marker,
     139              :         getFMV func() FormatMajorVersion,
     140              :         mu *sync.Mutex,
     141            2 : ) {
     142            2 :         vs.dirname = dirname
     143            2 :         vs.provider = provider
     144            2 :         vs.mu = mu
     145            2 :         vs.writerCond.L = mu
     146            2 :         vs.opts = opts
     147            2 :         vs.fs = opts.FS
     148            2 :         vs.cmp = opts.Comparer
     149            2 :         vs.dynamicBaseLevel = true
     150            2 :         vs.versions.Init(mu)
     151            2 :         vs.l0Organizer = manifest.NewL0Organizer(opts.Comparer, opts.FlushSplitBytes)
     152            2 :         vs.obsoleteFn = vs.addObsoleteLocked
     153            2 :         vs.zombieTables = makeZombieObjects()
     154            2 :         vs.zombieBlobs = makeZombieObjects()
     155            2 :         vs.virtualBackings = manifest.MakeVirtualBackings()
     156            2 :         vs.nextFileNum.Store(1)
     157            2 :         vs.manifestMarker = marker
     158            2 :         vs.getFormatMajorVersion = getFMV
     159            2 : }
     160              : 
     161              : // create creates a version set for a fresh DB.
     162              : func (vs *versionSet) create(
     163              :         jobID JobID,
     164              :         dirname string,
     165              :         provider objstorage.Provider,
     166              :         opts *Options,
     167              :         marker *atomicfs.Marker,
     168              :         getFormatMajorVersion func() FormatMajorVersion,
     169              :         mu *sync.Mutex,
     170            2 : ) error {
     171            2 :         vs.init(dirname, provider, opts, marker, getFormatMajorVersion, mu)
     172            2 :         emptyVersion := manifest.NewInitialVersion(opts.Comparer)
     173            2 :         vs.append(emptyVersion)
     174            2 :         vs.blobFiles.Init(nil)
     175            2 : 
     176            2 :         vs.setCompactionPicker(
     177            2 :                 newCompactionPickerByScore(emptyVersion, vs.l0Organizer, &vs.virtualBackings, vs.opts, nil))
     178            2 :         // Note that a "snapshot" version edit is written to the manifest when it is
     179            2 :         // created.
     180            2 :         vs.manifestFileNum = vs.getNextDiskFileNum()
     181            2 :         err := vs.createManifest(vs.dirname, vs.manifestFileNum, vs.minUnflushedLogNum, vs.nextFileNum.Load(), nil /* virtualBackings */)
     182            2 :         if err == nil {
     183            2 :                 if err = vs.manifest.Flush(); err != nil {
     184            1 :                         vs.opts.Logger.Fatalf("MANIFEST flush failed: %v", err)
     185            1 :                 }
     186              :         }
     187            2 :         if err == nil {
     188            2 :                 if err = vs.manifestFile.Sync(); err != nil {
     189            1 :                         vs.opts.Logger.Fatalf("MANIFEST sync failed: %v", err)
     190            1 :                 }
     191              :         }
     192            2 :         if err == nil {
     193            2 :                 // NB: Move() is responsible for syncing the data directory.
     194            2 :                 if err = vs.manifestMarker.Move(base.MakeFilename(base.FileTypeManifest, vs.manifestFileNum)); err != nil {
     195            1 :                         vs.opts.Logger.Fatalf("MANIFEST set current failed: %v", err)
     196            1 :                 }
     197              :         }
     198              : 
     199            2 :         vs.opts.EventListener.ManifestCreated(ManifestCreateInfo{
     200            2 :                 JobID:   int(jobID),
     201            2 :                 Path:    base.MakeFilepath(vs.fs, vs.dirname, base.FileTypeManifest, vs.manifestFileNum),
     202            2 :                 FileNum: vs.manifestFileNum,
     203            2 :                 Err:     err,
     204            2 :         })
     205            2 :         if err != nil {
     206            1 :                 return err
     207            1 :         }
     208            2 :         return nil
     209              : }
     210              : 
     211              : // load loads the version set from the manifest file.
     212              : func (vs *versionSet) load(
     213              :         dirname string,
     214              :         provider objstorage.Provider,
     215              :         opts *Options,
     216              :         manifestFileNum base.DiskFileNum,
     217              :         marker *atomicfs.Marker,
     218              :         getFormatMajorVersion func() FormatMajorVersion,
     219              :         mu *sync.Mutex,
     220            2 : ) error {
     221            2 :         vs.init(dirname, provider, opts, marker, getFormatMajorVersion, mu)
     222            2 : 
     223            2 :         vs.manifestFileNum = manifestFileNum
     224            2 :         manifestPath := base.MakeFilepath(opts.FS, dirname, base.FileTypeManifest, vs.manifestFileNum)
     225            2 :         manifestFilename := opts.FS.PathBase(manifestPath)
     226            2 : 
     227            2 :         // Read the versionEdits in the manifest file.
     228            2 :         var bve manifest.BulkVersionEdit
     229            2 :         bve.AllAddedTables = make(map[base.TableNum]*manifest.TableMetadata)
     230            2 :         manifestFile, err := vs.fs.Open(manifestPath)
     231            2 :         if err != nil {
     232            0 :                 return errors.Wrapf(err, "pebble: could not open manifest file %q for DB %q",
     233            0 :                         errors.Safe(manifestFilename), dirname)
     234            0 :         }
     235            2 :         defer manifestFile.Close()
     236            2 :         rr := record.NewReader(manifestFile, 0 /* logNum */)
     237            2 :         for {
     238            2 :                 r, err := rr.Next()
     239            2 :                 if err == io.EOF || record.IsInvalidRecord(err) {
     240            2 :                         break
     241              :                 }
     242            2 :                 if err != nil {
     243            0 :                         return errors.Wrapf(err, "pebble: error when loading manifest file %q",
     244            0 :                                 errors.Safe(manifestFilename))
     245            0 :                 }
     246            2 :                 var ve manifest.VersionEdit
     247            2 :                 err = ve.Decode(r)
     248            2 :                 if err != nil {
     249            1 :                         // Break instead of returning an error if the record is corrupted
     250            1 :                         // or invalid.
     251            1 :                         if err == io.EOF || record.IsInvalidRecord(err) {
     252            1 :                                 break
     253              :                         }
     254            0 :                         return err
     255              :                 }
     256            2 :                 if ve.ComparerName != "" {
     257            2 :                         if ve.ComparerName != vs.cmp.Name {
     258            1 :                                 return errors.Errorf("pebble: manifest file %q for DB %q: "+
     259            1 :                                         "comparer name from file %q != comparer name from Options %q",
     260            1 :                                         errors.Safe(manifestFilename), dirname, errors.Safe(ve.ComparerName), errors.Safe(vs.cmp.Name))
     261            1 :                         }
     262              :                 }
     263            2 :                 if err := bve.Accumulate(&ve); err != nil {
     264            0 :                         return err
     265            0 :                 }
     266            2 :                 if ve.MinUnflushedLogNum != 0 {
     267            2 :                         vs.minUnflushedLogNum = ve.MinUnflushedLogNum
     268            2 :                 }
     269            2 :                 if ve.NextFileNum != 0 {
     270            2 :                         vs.nextFileNum.Store(ve.NextFileNum)
     271            2 :                 }
     272            2 :                 if ve.LastSeqNum != 0 {
     273            2 :                         // logSeqNum is the _next_ sequence number that will be assigned,
     274            2 :                         // while LastSeqNum is the last assigned sequence number. Note that
     275            2 :                         // this behaviour mimics that in RocksDB; the first sequence number
     276            2 :                         // assigned is one greater than the one present in the manifest
     277            2 :                         // (assuming no WALs contain higher sequence numbers than the
     278            2 :                         // manifest's LastSeqNum). Increment LastSeqNum by 1 to get the
     279            2 :                         // next sequence number that will be assigned.
     280            2 :                         //
     281            2 :                         // If LastSeqNum is less than SeqNumStart, increase it to at least
     282            2 :                         // SeqNumStart to leave ample room for reserved sequence numbers.
     283            2 :                         if ve.LastSeqNum+1 < base.SeqNumStart {
     284            0 :                                 vs.logSeqNum.Store(base.SeqNumStart)
     285            2 :                         } else {
     286            2 :                                 vs.logSeqNum.Store(ve.LastSeqNum + 1)
     287            2 :                         }
     288              :                 }
     289              :         }
     290              :         // We have already set vs.nextFileNum = 2 at the beginning of the
     291              :         // function and could have only updated it to some other non-zero value,
     292              :         // so it cannot be 0 here.
     293            2 :         if vs.minUnflushedLogNum == 0 {
     294            2 :                 if vs.nextFileNum.Load() >= 2 {
     295            2 :                         // We either have a freshly created DB, or a DB created by RocksDB
     296            2 :                         // that has not had a single flushed SSTable yet. This is because
     297            2 :                         // RocksDB bumps up nextFileNum in this case without bumping up
     298            2 :                         // minUnflushedLogNum, even if WALs with non-zero file numbers are
     299            2 :                         // present in the directory.
     300            2 :                 } else {
     301            0 :                         return base.CorruptionErrorf("pebble: malformed manifest file %q for DB %q",
     302            0 :                                 errors.Safe(manifestFilename), dirname)
     303            0 :                 }
     304              :         }
     305            2 :         vs.markFileNumUsed(vs.minUnflushedLogNum)
     306            2 : 
     307            2 :         // Populate the virtual backings for virtual sstables since we have finished
     308            2 :         // version edit accumulation.
     309            2 :         for _, b := range bve.AddedFileBacking {
     310            2 :                 vs.virtualBackings.AddAndRef(b)
     311            2 :         }
     312              : 
     313            2 :         for _, addedLevel := range bve.AddedTables {
     314            2 :                 for _, m := range addedLevel {
     315            2 :                         if m.Virtual {
     316            2 :                                 vs.virtualBackings.AddTable(m)
     317            2 :                         }
     318              :                 }
     319              :         }
     320              : 
     321            2 :         if invariants.Enabled {
     322            2 :                 // There should be no deleted tables or backings, since we're starting from
     323            2 :                 // an empty state.
     324            2 :                 for _, deletedLevel := range bve.DeletedTables {
     325            2 :                         if len(deletedLevel) != 0 {
     326            0 :                                 panic("deleted files after manifest replay")
     327              :                         }
     328              :                 }
     329            2 :                 if len(bve.RemovedFileBacking) > 0 {
     330            0 :                         panic("deleted backings after manifest replay")
     331              :                 }
     332              :         }
     333              : 
     334            2 :         emptyVersion := manifest.NewInitialVersion(opts.Comparer)
     335            2 :         newVersion, err := bve.Apply(emptyVersion, opts.Experimental.ReadCompactionRate)
     336            2 :         if err != nil {
     337            0 :                 return err
     338            0 :         }
     339            2 :         vs.l0Organizer.PerformUpdate(vs.l0Organizer.PrepareUpdate(&bve, newVersion), newVersion)
     340            2 :         vs.l0Organizer.InitCompactingFileInfo(nil /* in-progress compactions */)
     341            2 :         vs.blobFiles.Init(&bve)
     342            2 :         vs.append(newVersion)
     343            2 : 
     344            2 :         for i := range vs.metrics.Levels {
     345            2 :                 l := &vs.metrics.Levels[i]
     346            2 :                 l.TablesCount = int64(newVersion.Levels[i].Len())
     347            2 :                 files := newVersion.Levels[i].Slice()
     348            2 :                 l.TablesSize = int64(files.TableSizeSum())
     349            2 :         }
     350            2 :         for _, l := range newVersion.Levels {
     351            2 :                 for f := range l.All() {
     352            2 :                         if !f.Virtual {
     353            2 :                                 isLocal, localSize := sizeIfLocal(f.TableBacking, vs.provider)
     354            2 :                                 vs.metrics.Table.Local.LiveSize = uint64(int64(vs.metrics.Table.Local.LiveSize) + localSize)
     355            2 :                                 if isLocal {
     356            2 :                                         vs.metrics.Table.Local.LiveCount++
     357            2 :                                 }
     358              :                         }
     359              :                 }
     360              :         }
     361            2 :         vs.virtualBackings.ForEach(func(backing *manifest.TableBacking) {
     362            2 :                 isLocal, localSize := sizeIfLocal(backing, vs.provider)
     363            2 :                 vs.metrics.Table.Local.LiveSize = uint64(int64(vs.metrics.Table.Local.LiveSize) + localSize)
     364            2 :                 if isLocal {
     365            2 :                         vs.metrics.Table.Local.LiveCount++
     366            2 :                 }
     367              :         })
     368              : 
     369            2 :         vs.setCompactionPicker(
     370            2 :                 newCompactionPickerByScore(newVersion, vs.l0Organizer, &vs.virtualBackings, vs.opts, nil))
     371            2 :         return nil
     372              : }
     373              : 
     374            2 : func (vs *versionSet) close() error {
     375            2 :         if vs.manifestFile != nil {
     376            2 :                 if err := vs.manifestFile.Close(); err != nil {
     377            0 :                         return err
     378            0 :                 }
     379              :         }
     380            2 :         if vs.manifestMarker != nil {
     381            2 :                 if err := vs.manifestMarker.Close(); err != nil {
     382            0 :                         return err
     383            0 :                 }
     384              :         }
     385            2 :         return nil
     386              : }
     387              : 
     388              : // logLock locks the manifest for writing. The lock must be released by
     389              : // a call to logUnlock.
     390              : //
     391              : // DB.mu must be held when calling this method, but the mutex may be dropped and
     392              : // re-acquired during the course of this method.
     393            2 : func (vs *versionSet) logLock() {
     394            2 :         // Wait for any existing writing to the manifest to complete, then mark the
     395            2 :         // manifest as busy.
     396            2 :         for vs.writing {
     397            2 :                 // Note: writerCond.L is DB.mu, so we unlock it while we wait.
     398            2 :                 vs.writerCond.Wait()
     399            2 :         }
     400            2 :         vs.writing = true
     401              : }
     402              : 
     403              : // logUnlock releases the lock for manifest writing.
     404              : //
     405              : // DB.mu must be held when calling this method.
     406            2 : func (vs *versionSet) logUnlock() {
     407            2 :         if !vs.writing {
     408            0 :                 vs.opts.Logger.Fatalf("MANIFEST not locked for writing")
     409            0 :         }
     410            2 :         vs.writing = false
     411            2 :         vs.writerCond.Signal()
     412              : }
     413              : 
     414            2 : func (vs *versionSet) logUnlockAndInvalidatePickedCompactionCache() {
     415            2 :         vs.pickedCompactionCache.invalidate()
     416            2 :         vs.logUnlock()
     417            2 : }
     418              : 
     419              : // versionUpdate is returned by the function passed to UpdateVersionLocked.
     420              : //
     421              : // If VE is nil, there is no update to apply (but it is not an error).
     422              : type versionUpdate struct {
     423              :         VE      *manifest.VersionEdit
     424              :         JobID   JobID
     425              :         Metrics levelMetricsDelta
     426              :         // InProgressCompactionFn is called while DB.mu is held after the I/O part of
     427              :         // the update was performed. It should return any compactions that are
     428              :         // in-progress (excluding than the one that is being applied).
     429              :         InProgressCompactionsFn func() []compactionInfo
     430              :         ForceManifestRotation   bool
     431              : }
     432              : 
     433              : // UpdateVersionLocked is used to update the current version.
     434              : //
     435              : // DB.mu must be held. UpdateVersionLocked first waits for any other version
     436              : // update to complete, releasing and reacquiring DB.mu.
     437              : //
     438              : // UpdateVersionLocked then calls updateFn which builds a versionUpdate, while
     439              : // holding DB.mu. The updateFn can release and reacquire DB.mu (it should
     440              : // attempt to do as much work as possible outside of the lock).
     441              : //
     442              : // UpdateVersionLocked fills in the following fields of the VersionEdit:
     443              : // NextFileNum, LastSeqNum, RemovedBackingTables. The removed backing tables are
     444              : // those backings that are no longer used (in the new version) after applying
     445              : // the edit (as per vs.virtualBackings). Other than these fields, the
     446              : // VersionEdit must be complete.
     447              : //
     448              : // New table backing references (TableBacking.Ref) are taken as part of applying
     449              : // the version edit. The state of the virtual backings (vs.virtualBackings) is
     450              : // updated before logging to the manifest and installing the latest version;
     451              : // this is ok because any failure in those steps is fatal.
     452              : //
     453              : // If updateFn returns an error, no update is applied and that same error is returned.
     454              : // If versionUpdate.VE is nil, the no update is applied (and no error is returned).
func (vs *versionSet) UpdateVersionLocked(updateFn func() (versionUpdate, error)) error {
	// Hold the manifest log lock for the whole update. The deferred call
	// releases it and, per its name, invalidates the picked-compaction cache.
	vs.logLock()
	defer vs.logUnlockAndInvalidatePickedCompactionCache()

	vu, err := updateFn()
	if err != nil || vu.VE == nil {
		// Either updateFn failed (its error is returned unchanged) or there is
		// nothing to apply (err is nil).
		return err
	}

	if !vs.writing {
		vs.opts.Logger.Fatalf("MANIFEST not locked for writing")
	}

	ve := vu.VE
	if ve.MinUnflushedLogNum != 0 {
		// MinUnflushedLogNum must not move backwards relative to
		// vs.minUnflushedLogNum, and must be strictly less than
		// vs.nextFileNum; anything else is an inconsistency.
		if ve.MinUnflushedLogNum < vs.minUnflushedLogNum ||
			vs.nextFileNum.Load() <= uint64(ve.MinUnflushedLogNum) {
			panic(fmt.Sprintf("pebble: inconsistent versionEdit minUnflushedLogNum %d",
				ve.MinUnflushedLogNum))
		}
	}

	// This is the next manifest filenum, but if the current file is too big we
	// will write this ve to the next file which means what ve encodes is the
	// current filenum and not the next one.
	//
	// TODO(sbhola): figure out why this is correct and update comment.
	ve.NextFileNum = vs.nextFileNum.Load()

	// LastSeqNum is set to the current upper bound on the assigned sequence
	// numbers. Note that this is exactly the behavior of RocksDB. LastSeqNum is
	// used to initialize versionSet.logSeqNum and versionSet.visibleSeqNum on
	// replay. It must be higher than or equal to any sequence number written
	// to an sstable, including sequence numbers in ingested files.
	// Note that LastSeqNum is not (and cannot be) the minimum unflushed sequence
	// number. This is fallout from ingestion which allows a sequence number X to
	// be assigned to an ingested sstable even though sequence number X-1 resides
	// in an unflushed memtable. logSeqNum is the _next_ sequence number that
	// will be assigned, so subtract that by 1 to get the upper bound on the
	// last assigned sequence number.
	logSeqNum := vs.logSeqNum.Load()
	ve.LastSeqNum = logSeqNum - 1
	if logSeqNum == 0 {
		// logSeqNum is initialized to 1 in Open() if there are no previous WAL
		// or manifest records, so this case should never happen.
		// NB(review): the subtraction above has already run by this point.
		vs.opts.Logger.Fatalf("logSeqNum must be a positive integer: %d", logSeqNum)
	}

	currentVersion := vs.currentVersion()
	var newVersion *manifest.Version

	// Generate a new manifest if we don't currently have one, or forceRotation
	// is true, or the current one is too large.
	//
	// For largeness, we do not exclusively use MaxManifestFileSize size
	// threshold since we have had incidents where due to either large keys or
	// large numbers of files, each edit results in a snapshot + write of the
	// edit. This slows the system down since each flush or compaction is
	// writing a new manifest snapshot. The primary goal of the size-based
	// rollover logic is to ensure that when reopening a DB, the number of edits
	// that need to be replayed on top of the snapshot is "sane". Rolling over
	// to a new manifest after each edit is not relevant to that goal.
	//
	// Consider the following cases:
	// - The number of live files F in the DB is roughly stable: after writing
	//   the snapshot (with F files), say we require that there be enough edits
	//   such that the cumulative number of files in those edits, E, be greater
	//   than F. This will ensure that the total amount of time in
	//   UpdateVersionLocked that is spent in snapshot writing is ~50%.
	//
	// - The number of live files F in the DB is shrinking drastically, say from
	//   F to F/10: This can happen for various reasons, like wide range
	//   tombstones, or large numbers of smaller than usual files that are being
	//   merged together into larger files. And say the new files generated
	//   during this shrinkage is insignificant compared to F/10, and so for
	//   this example we will assume it is effectively 0. After this shrinking,
	//   E = 0.9F, and so if we used the previous snapshot file count, F, as the
	//   threshold that needs to be exceeded, we will further delay the snapshot
	//   writing. Which means on DB reopen we will need to replay 0.9F edits to
	//   get to a version with 0.1F files. It would be better to create a new
	//   snapshot when E exceeds the number of files in the current version.
	//
	// - The number of live files F in the DB is growing via perfect ingests
	//   into L6: Say we wrote the snapshot when there were F files and now we
	//   have 10F files, so E = 9F. We will further delay writing a new
	//   snapshot. This case can be critiqued as contrived, but we consider it
	//   nonetheless.
	//
	// The logic below uses the min of the last snapshot file count and the file
	// count in the current version.
	vs.rotationHelper.AddRecord(int64(len(ve.DeletedTables) + len(ve.NewTables)))
	// NB(review): vs.manifest.Size() is called here before the nil check on the
	// next line (and again below when rotating with a nil manifest); this
	// presumably relies on Size() tolerating a nil receiver — TODO confirm.
	sizeExceeded := vs.manifest.Size() >= vs.opts.MaxManifestFileSize
	requireRotation := vu.ForceManifestRotation || vs.manifest == nil

	// Total table count across all levels per the current metrics; this is the
	// file count the next snapshot would contain.
	var nextSnapshotFilecount int64
	for i := range vs.metrics.Levels {
		nextSnapshotFilecount += vs.metrics.Levels[i].TablesCount
	}
	if sizeExceeded && !requireRotation {
		requireRotation = vs.rotationHelper.ShouldRotate(nextSnapshotFilecount)
	}
	// newManifestFileNum stays 0 unless we rotate; later code uses
	// newManifestFileNum != 0 as the "rotating" signal.
	var newManifestFileNum base.DiskFileNum
	var prevManifestFileSize uint64
	var newManifestVirtualBackings []*manifest.TableBacking
	if requireRotation {
		newManifestFileNum = vs.getNextDiskFileNum()
		prevManifestFileSize = uint64(vs.manifest.Size())

		// We want the virtual backings *before* applying the version edit, because
		// the new manifest will contain the pre-apply version plus the last version
		// edit.
		newManifestVirtualBackings = vs.virtualBackings.Backings()
	}

	// Grab certain values before releasing vs.mu, in case createManifest() needs
	// to be called.
	minUnflushedLogNum := vs.minUnflushedLogNum
	nextFileNum := vs.nextFileNum.Load()

	// Note: this call populates ve.RemovedBackingTables.
	zombieBackings, removedVirtualBackings, localTablesLiveDelta :=
		getZombieTablesAndUpdateVirtualBackings(ve, &vs.virtualBackings, vs.provider)

	var l0Update manifest.L0PreparedUpdate
	if err := func() error {
		// vs.mu is released for the manifest I/O below and re-acquired by the
		// deferred Lock before this closure returns (on every path).
		vs.mu.Unlock()
		defer vs.mu.Lock()

		if vs.getFormatMajorVersion() < FormatVirtualSSTables && len(ve.CreatedBackingTables) > 0 {
			return base.AssertionFailedf("MANIFEST cannot contain virtual sstable records due to format major version")
		}

		// Rotate the manifest if necessary. Rotating the manifest involves
		// creating a new file and writing an initial version edit containing a
		// snapshot of the current version. This initial version edit will
		// reflect the Version prior to the pending version edit (`ve`). Once
		// we've created the new manifest with the previous version state, we'll
		// append the version edit `ve` to the tail of the new manifest.
		if newManifestFileNum != 0 {
			if err := vs.createManifest(vs.dirname, newManifestFileNum, minUnflushedLogNum, nextFileNum, newManifestVirtualBackings); err != nil {
				vs.opts.EventListener.ManifestCreated(ManifestCreateInfo{
					JobID:   int(vu.JobID),
					Path:    base.MakeFilepath(vs.fs, vs.dirname, base.FileTypeManifest, newManifestFileNum),
					FileNum: newManifestFileNum,
					Err:     err,
				})
				return errors.Wrap(err, "MANIFEST create failed")
			}
		}

		// Call ApplyAndUpdateVersionEdit before accumulating the version edit.
		// If any blob files are no longer referenced, the version edit will be
		// updated to explicitly record the deletion of the blob files. This can
		// happen here because vs.blobFiles is protected by the manifest logLock
		// (NOT vs.mu). We only read or write vs.blobFiles while holding the
		// manifest lock.
		if err := vs.blobFiles.ApplyAndUpdateVersionEdit(ve); err != nil {
			return errors.Wrap(err, "MANIFEST blob files apply and update failed")
		}

		var bulkEdit manifest.BulkVersionEdit
		err := bulkEdit.Accumulate(ve)
		if err != nil {
			return errors.Wrap(err, "MANIFEST accumulate failed")
		}
		// Assigns the outer newVersion (not a new local) so it is visible after
		// the closure returns.
		newVersion, err = bulkEdit.Apply(currentVersion, vs.opts.Experimental.ReadCompactionRate)
		if err != nil {
			return errors.Wrap(err, "MANIFEST apply failed")
		}
		// Only prepared here; the update is performed after vs.mu is re-acquired.
		l0Update = vs.l0Organizer.PrepareUpdate(&bulkEdit, newVersion)

		w, err := vs.manifest.Next()
		if err != nil {
			return errors.Wrap(err, "MANIFEST next record write failed")
		}

		// NB: Any error from this point on is considered fatal as we don't know if
		// the MANIFEST write occurred or not. Trying to determine that is
		// fraught. Instead we rely on the standard recovery mechanism run when a
		// database is open. In particular, that mechanism generates a new MANIFEST
		// and ensures it is synced.
		if err := ve.Encode(w); err != nil {
			return errors.Wrap(err, "MANIFEST write failed")
		}
		if err := vs.manifest.Flush(); err != nil {
			return errors.Wrap(err, "MANIFEST flush failed")
		}
		if err := vs.manifestFile.Sync(); err != nil {
			return errors.Wrap(err, "MANIFEST sync failed")
		}
		if newManifestFileNum != 0 {
			// NB: Move() is responsible for syncing the data directory.
			if err := vs.manifestMarker.Move(base.MakeFilename(base.FileTypeManifest, newManifestFileNum)); err != nil {
				return errors.Wrap(err, "MANIFEST set current failed")
			}
			vs.opts.EventListener.ManifestCreated(ManifestCreateInfo{
				JobID:   int(vu.JobID),
				Path:    base.MakeFilepath(vs.fs, vs.dirname, base.FileTypeManifest, newManifestFileNum),
				FileNum: newManifestFileNum,
			})
		}
		return nil
	}(); err != nil {
		// Any error encountered during any of the operations in the previous
		// closure are considered fatal. Treating such errors as fatal is preferred
		// to attempting to unwind various file and b-tree reference counts, and
		// re-generating L0 sublevel metadata. This may change in the future, if
		// certain manifest / WAL operations become retryable. For more context, see
		// #1159 and #1792.
		vs.opts.Logger.Fatalf("%s", err)
		return err
	}

	if requireRotation {
		// Successfully rotated.
		vs.rotationHelper.Rotate(nextSnapshotFilecount)
	}
	// Now that DB.mu is held again, initialize compacting file info in
	// L0Sublevels.
	inProgress := vu.InProgressCompactionsFn()

	// NB: this must run after the closure above, because
	// vs.blobFiles.ApplyAndUpdateVersionEdit may have recorded blob file
	// deletions in ve, and this reads ve.DeletedBlobFiles.
	zombieBlobs, localBlobLiveDelta := getZombieBlobFilesAndComputeLocalMetrics(ve, vs.provider)
	vs.l0Organizer.PerformUpdate(l0Update, newVersion)
	vs.l0Organizer.InitCompactingFileInfo(inProgressL0Compactions(inProgress))

	// Update the zombie objects sets first, as installation of the new version
	// will unref the previous version which could result in addObsoleteLocked
	// being called.
	for _, b := range zombieBackings {
		vs.zombieTables.Add(objectInfo{
			fileInfo: fileInfo{
				FileNum:  b.backing.DiskFileNum,
				FileSize: b.backing.Size,
			},
			isLocal: b.isLocal,
		})
	}
	for _, zb := range zombieBlobs {
		vs.zombieBlobs.Add(zb)
	}
	// Unref the removed backings and report those that already became obsolete.
	// Note that the only case where we report obsolete tables here is when
	// VirtualBackings.Protect/Unprotect was used to keep a backing alive without
	// it being used in the current version.
	var obsoleteVirtualBackings manifest.ObsoleteFiles
	for _, b := range removedVirtualBackings {
		if b.backing.Unref() == 0 {
			obsoleteVirtualBackings.TableBackings = append(obsoleteVirtualBackings.TableBackings, b.backing)
		}
	}
	vs.addObsoleteLocked(obsoleteVirtualBackings)

	// Install the new version.
	vs.append(newVersion)

	if ve.MinUnflushedLogNum != 0 {
		vs.minUnflushedLogNum = ve.MinUnflushedLogNum
	}
	if newManifestFileNum != 0 {
		// On rotation, the previous manifest (if there was one) is queued as
		// obsolete, using the size captured before rotating.
		if vs.manifestFileNum != 0 {
			vs.obsoleteManifests = append(vs.obsoleteManifests, obsoleteFile{
				fileType: base.FileTypeManifest,
				fs:       vs.fs,
				path:     base.MakeFilepath(vs.fs, vs.dirname, base.FileTypeManifest, vs.manifestFileNum),
				fileNum:  vs.manifestFileNum,
				fileSize: prevManifestFileSize,
				isLocal:  true,
			})
		}
		vs.manifestFileNum = newManifestFileNum
	}

	// Refresh per-level metrics from the newly installed version.
	vs.metrics.updateLevelMetrics(vu.Metrics)
	for i := range vs.metrics.Levels {
		l := &vs.metrics.Levels[i]
		l.TablesCount = int64(newVersion.Levels[i].Len())
		l.VirtualTablesCount = newVersion.Levels[i].NumVirtual
		l.VirtualTablesSize = newVersion.Levels[i].VirtualTableSize
		l.TablesSize = int64(newVersion.Levels[i].TableSize())
		l.EstimatedReferencesSize = newVersion.Levels[i].EstimatedReferenceSize()
		// A non-empty level counts as one sublevel; L0's value is overwritten
		// after this loop from newVersion.L0SublevelFiles.
		l.Sublevels = 0
		if l.TablesCount > 0 {
			l.Sublevels = 1
		}
		if invariants.Enabled {
			// Cross-check the incrementally maintained values against a full
			// recomputation over the level's files.
			levelFiles := newVersion.Levels[i].Slice()
			if size := int64(levelFiles.TableSizeSum()); l.TablesSize != size {
				vs.opts.Logger.Fatalf("versionSet metrics L%d Size = %d, actual size = %d", i, l.TablesSize, size)
			}
			refSize := uint64(0)
			for f := range levelFiles.All() {
				refSize += f.EstimatedReferenceSize()
			}
			if refSize != l.EstimatedReferencesSize {
				vs.opts.Logger.Fatalf("versionSet metrics L%d EstimatedReferencesSize = %d, recomputed size = %d", i, l.EstimatedReferencesSize, refSize)
			}

			if nVirtual := levelFiles.NumVirtual(); nVirtual != l.VirtualTablesCount {
				vs.opts.Logger.Fatalf(
					"versionSet metrics L%d NumVirtual = %d, actual NumVirtual = %d",
					i, l.VirtualTablesCount, nVirtual,
				)
			}
			if vSize := levelFiles.VirtualTableSizeSum(); vSize != l.VirtualTablesSize {
				vs.opts.Logger.Fatalf(
					"versionSet metrics L%d Virtual size = %d, actual size = %d",
					i, l.VirtualTablesSize, vSize,
				)
			}
		}
	}
	vs.metrics.Levels[0].Sublevels = int32(len(newVersion.L0SublevelFiles))
	// Apply the signed local live deltas; the int64 round-trip lets the
	// deltas be negative.
	vs.metrics.Table.Local.LiveSize = uint64(int64(vs.metrics.Table.Local.LiveSize) + localTablesLiveDelta.size)
	vs.metrics.Table.Local.LiveCount = uint64(int64(vs.metrics.Table.Local.LiveCount) + localTablesLiveDelta.count)
	vs.metrics.BlobFiles.Local.LiveSize = uint64(int64(vs.metrics.BlobFiles.Local.LiveSize) + localBlobLiveDelta.size)
	vs.metrics.BlobFiles.Local.LiveCount = uint64(int64(vs.metrics.BlobFiles.Local.LiveCount) + localBlobLiveDelta.count)

	vs.setCompactionPicker(
		newCompactionPickerByScore(newVersion, vs.l0Organizer, &vs.virtualBackings, vs.opts, inProgress))
	if !vs.dynamicBaseLevel {
		vs.picker.forceBaseLevel1()
	}
	return nil
}
     779              : 
     780            2 : func (vs *versionSet) setCompactionPicker(picker *compactionPickerByScore) {
     781            2 :         vs.picker = picker
     782            2 :         vs.curCompactionConcurrency.Store(int32(picker.getCompactionConcurrency()))
     783            2 : }
     784              : 
// tableBackingInfo pairs a table backing with a flag recording whether the
// backing is local, as reported by sizeIfLocal when the backing is collected
// in getZombieTablesAndUpdateVirtualBackings.
type tableBackingInfo struct {
	backing *manifest.TableBacking
	// isLocal is propagated into objectInfo.isLocal when the backing is added
	// to the zombie tables set.
	isLocal bool
}
     789              : 
// fileMetricDelta is a signed change to a live-file metric pair: the number
// of files and their total size in bytes. Callers apply it to unsigned
// metrics through an int64 round-trip, so either field may be negative.
type fileMetricDelta struct {
	count int64
	size  int64
}
     794              : 
     795              : // getZombieTablesAndUpdateVirtualBackings updates the virtual backings with the
     796              : // changes in the versionEdit and populates ve.RemovedBackingTables.
     797              : // Returns:
     798              : //   - zombieBackings: all backings (physical and virtual) that will no longer be
     799              : //     needed when we apply ve.
     800              : //   - removedVirtualBackings: the virtual backings that will be removed by the
     801              : //     VersionEdit and which must be Unref()ed by the caller. These backings
     802              : //     match ve.RemovedBackingTables.
     803              : //   - localLiveSizeDelta: the delta in local live bytes.
     804              : func getZombieTablesAndUpdateVirtualBackings(
     805              :         ve *manifest.VersionEdit, virtualBackings *manifest.VirtualBackings, provider objstorage.Provider,
     806            2 : ) (zombieBackings, removedVirtualBackings []tableBackingInfo, localLiveDelta fileMetricDelta) {
     807            2 :         // First, deal with the physical tables.
     808            2 :         //
     809            2 :         // A physical backing has become unused if it is in DeletedFiles but not in
     810            2 :         // NewFiles or CreatedBackingTables.
     811            2 :         //
     812            2 :         // Note that for the common case where there are very few elements, the map
     813            2 :         // will stay on the stack.
     814            2 :         stillUsed := make(map[base.DiskFileNum]struct{})
     815            2 :         for _, nf := range ve.NewTables {
     816            2 :                 if !nf.Meta.Virtual {
     817            2 :                         stillUsed[nf.Meta.TableBacking.DiskFileNum] = struct{}{}
     818            2 :                         isLocal, localFileDelta := sizeIfLocal(nf.Meta.TableBacking, provider)
     819            2 :                         localLiveDelta.size += localFileDelta
     820            2 :                         if isLocal {
     821            2 :                                 localLiveDelta.count++
     822            2 :                         }
     823              :                 }
     824              :         }
     825            2 :         for _, b := range ve.CreatedBackingTables {
     826            2 :                 stillUsed[b.DiskFileNum] = struct{}{}
     827            2 :         }
     828            2 :         for _, m := range ve.DeletedTables {
     829            2 :                 if !m.Virtual {
     830            2 :                         // NB: this deleted file may also be in NewFiles or
     831            2 :                         // CreatedBackingTables, due to a file moving between levels, or
     832            2 :                         // becoming virtualized. In which case there is no change due to this
     833            2 :                         // file in the localLiveSizeDelta -- the subtraction below compensates
     834            2 :                         // for the addition.
     835            2 :                         isLocal, localFileDelta := sizeIfLocal(m.TableBacking, provider)
     836            2 :                         localLiveDelta.size -= localFileDelta
     837            2 :                         if isLocal {
     838            2 :                                 localLiveDelta.count--
     839            2 :                         }
     840            2 :                         if _, ok := stillUsed[m.TableBacking.DiskFileNum]; !ok {
     841            2 :                                 zombieBackings = append(zombieBackings, tableBackingInfo{
     842            2 :                                         backing: m.TableBacking,
     843            2 :                                         isLocal: isLocal,
     844            2 :                                 })
     845            2 :                         }
     846              :                 }
     847              :         }
     848              : 
     849              :         // Now deal with virtual tables.
     850              :         //
     851              :         // When a virtual table moves between levels we AddTable() then RemoveTable(),
     852              :         // which works out.
     853            2 :         for _, b := range ve.CreatedBackingTables {
     854            2 :                 virtualBackings.AddAndRef(b)
     855            2 :                 isLocal, localFileDelta := sizeIfLocal(b, provider)
     856            2 :                 localLiveDelta.size += localFileDelta
     857            2 :                 if isLocal {
     858            2 :                         localLiveDelta.count++
     859            2 :                 }
     860              :         }
     861            2 :         for _, nf := range ve.NewTables {
     862            2 :                 if nf.Meta.Virtual {
     863            2 :                         virtualBackings.AddTable(nf.Meta)
     864            2 :                 }
     865              :         }
     866            2 :         for _, m := range ve.DeletedTables {
     867            2 :                 if m.Virtual {
     868            2 :                         virtualBackings.RemoveTable(m)
     869            2 :                 }
     870              :         }
     871              : 
     872            2 :         if unused := virtualBackings.Unused(); len(unused) > 0 {
     873            2 :                 // Virtual backings that are no longer used are zombies and are also added
     874            2 :                 // to RemovedBackingTables (before the version edit is written to disk).
     875            2 :                 ve.RemovedBackingTables = make([]base.DiskFileNum, len(unused))
     876            2 :                 for i, b := range unused {
     877            2 :                         isLocal, localFileDelta := sizeIfLocal(b, provider)
     878            2 :                         localLiveDelta.size -= localFileDelta
     879            2 :                         if isLocal {
     880            2 :                                 localLiveDelta.count--
     881            2 :                         }
     882            2 :                         ve.RemovedBackingTables[i] = b.DiskFileNum
     883            2 :                         zombieBackings = append(zombieBackings, tableBackingInfo{
     884            2 :                                 backing: b,
     885            2 :                                 isLocal: isLocal,
     886            2 :                         })
     887            2 :                         virtualBackings.Remove(b.DiskFileNum)
     888              :                 }
     889            2 :                 removedVirtualBackings = zombieBackings[len(zombieBackings)-len(unused):]
     890              :         }
     891            2 :         return zombieBackings, removedVirtualBackings, localLiveDelta
     892              : }
     893              : 
     894              : // getZombieBlobFilesAndComputeLocalMetrics constructs objectInfos for all
     895              : // zombie blob files, and computes the metric deltas for live files overall and
     896              : // locally.
     897              : func getZombieBlobFilesAndComputeLocalMetrics(
     898              :         ve *manifest.VersionEdit, provider objstorage.Provider,
     899            2 : ) (zombieBlobFiles []objectInfo, localLiveDelta fileMetricDelta) {
     900            2 :         for _, b := range ve.NewBlobFiles {
     901            2 :                 if objstorage.IsLocalBlobFile(provider, b.Physical.FileNum) {
     902            2 :                         localLiveDelta.count++
     903            2 :                         localLiveDelta.size += int64(b.Physical.Size)
     904            2 :                 }
     905              :         }
     906            2 :         zombieBlobFiles = make([]objectInfo, 0, len(ve.DeletedBlobFiles))
     907            2 :         for _, physical := range ve.DeletedBlobFiles {
     908            2 :                 isLocal := objstorage.IsLocalBlobFile(provider, physical.FileNum)
     909            2 :                 if isLocal {
     910            2 :                         localLiveDelta.count--
     911            2 :                         localLiveDelta.size -= int64(physical.Size)
     912            2 :                 }
     913            2 :                 zombieBlobFiles = append(zombieBlobFiles, objectInfo{
     914            2 :                         fileInfo: fileInfo{
     915            2 :                                 FileNum:  physical.FileNum,
     916            2 :                                 FileSize: physical.Size,
     917            2 :                         },
     918            2 :                         isLocal: isLocal,
     919            2 :                 })
     920              :         }
     921            2 :         return zombieBlobFiles, localLiveDelta
     922              : }
     923              : 
     924              : // sizeIfLocal returns backing.Size if the backing is a local file, else 0.
     925              : func sizeIfLocal(
     926              :         backing *manifest.TableBacking, provider objstorage.Provider,
     927            2 : ) (isLocal bool, localSize int64) {
     928            2 :         isLocal = objstorage.IsLocalTable(provider, backing.DiskFileNum)
     929            2 :         if isLocal {
     930            2 :                 return true, int64(backing.Size)
     931            2 :         }
     932            2 :         return false, 0
     933              : }
     934              : 
     935              : func (vs *versionSet) incrementCompactions(
     936              :         kind compactionKind,
     937              :         extraLevels []*compactionLevel,
     938              :         pickerMetrics pickedCompactionMetrics,
     939              :         bytesWritten int64,
     940              :         compactionErr error,
     941            2 : ) {
     942            2 :         if kind == compactionKindFlush || kind == compactionKindIngestedFlushable {
     943            2 :                 vs.metrics.Flush.Count++
     944            2 :         } else {
     945            2 :                 vs.metrics.Compact.Count++
     946            2 :                 if compactionErr != nil {
     947            2 :                         if errors.Is(compactionErr, ErrCancelledCompaction) {
     948            2 :                                 vs.metrics.Compact.CancelledCount++
     949            2 :                                 vs.metrics.Compact.CancelledBytes += bytesWritten
     950            2 :                         } else {
     951            1 :                                 vs.metrics.Compact.FailedCount++
     952            1 :                         }
     953              :                 }
     954              :         }
     955              : 
     956            2 :         switch kind {
     957            2 :         case compactionKindDefault:
     958            2 :                 vs.metrics.Compact.DefaultCount++
     959              : 
     960            2 :         case compactionKindFlush, compactionKindIngestedFlushable:
     961              : 
     962            2 :         case compactionKindMove:
     963            2 :                 vs.metrics.Compact.MoveCount++
     964              : 
     965            2 :         case compactionKindDeleteOnly:
     966            2 :                 vs.metrics.Compact.DeleteOnlyCount++
     967              : 
     968            2 :         case compactionKindElisionOnly:
     969            2 :                 vs.metrics.Compact.ElisionOnlyCount++
     970              : 
     971            1 :         case compactionKindRead:
     972            1 :                 vs.metrics.Compact.ReadCount++
     973              : 
     974            2 :         case compactionKindTombstoneDensity:
     975            2 :                 vs.metrics.Compact.TombstoneDensityCount++
     976              : 
     977            2 :         case compactionKindRewrite:
     978            2 :                 vs.metrics.Compact.RewriteCount++
     979              : 
     980            2 :         case compactionKindCopy:
     981            2 :                 vs.metrics.Compact.CopyCount++
     982              : 
     983            0 :         default:
     984            0 :                 if invariants.Enabled {
     985            0 :                         panic("unhandled compaction kind")
     986              :                 }
     987              :         }
     988            2 :         if len(extraLevels) > 0 {
     989            2 :                 vs.metrics.Compact.MultiLevelCount++
     990            2 :         }
     991              : }
     992              : 
     993            2 : func (vs *versionSet) incrementCompactionBytes(numBytes int64) {
     994            2 :         vs.atomicInProgressBytes.Add(numBytes)
     995            2 : }
     996              : 
     997              : // createManifest creates a manifest file that contains a snapshot of vs.
     998              : func (vs *versionSet) createManifest(
     999              :         dirname string,
    1000              :         fileNum, minUnflushedLogNum base.DiskFileNum,
    1001              :         nextFileNum uint64,
    1002              :         virtualBackings []*manifest.TableBacking,
    1003            2 : ) (err error) {
    1004            2 :         var (
    1005            2 :                 filename       = base.MakeFilepath(vs.fs, dirname, base.FileTypeManifest, fileNum)
    1006            2 :                 manifestFile   vfs.File
    1007            2 :                 manifestWriter *record.Writer
    1008            2 :         )
    1009            2 :         defer func() {
    1010            2 :                 if manifestWriter != nil {
    1011            0 :                         _ = manifestWriter.Close()
    1012            0 :                 }
    1013            2 :                 if manifestFile != nil {
    1014            0 :                         _ = manifestFile.Close()
    1015            0 :                 }
    1016            2 :                 if err != nil {
    1017            1 :                         _ = vs.fs.Remove(filename)
    1018            1 :                 }
    1019              :         }()
    1020            2 :         manifestFile, err = vs.fs.Create(filename, "pebble-manifest")
    1021            2 :         if err != nil {
    1022            1 :                 return err
    1023            1 :         }
    1024            2 :         manifestWriter = record.NewWriter(manifestFile)
    1025            2 : 
    1026            2 :         snapshot := manifest.VersionEdit{
    1027            2 :                 ComparerName: vs.cmp.Name,
    1028            2 :                 // When creating a version snapshot for an existing DB, this snapshot
    1029            2 :                 // VersionEdit will be immediately followed by another VersionEdit
    1030            2 :                 // (being written in UpdateVersionLocked()). That VersionEdit always
    1031            2 :                 // contains a LastSeqNum, so we don't need to include that in the
    1032            2 :                 // snapshot.  But it does not necessarily include MinUnflushedLogNum,
    1033            2 :                 // NextFileNum, so we initialize those using the corresponding fields in
    1034            2 :                 // the versionSet (which came from the latest preceding VersionEdit that
    1035            2 :                 // had those fields).
    1036            2 :                 MinUnflushedLogNum:   minUnflushedLogNum,
    1037            2 :                 NextFileNum:          nextFileNum,
    1038            2 :                 CreatedBackingTables: virtualBackings,
    1039            2 :                 NewBlobFiles:         vs.blobFiles.Metadatas(),
    1040            2 :         }
    1041            2 : 
    1042            2 :         // Add all extant sstables in the current version.
    1043            2 :         for level, levelMetadata := range vs.currentVersion().Levels {
    1044            2 :                 for meta := range levelMetadata.All() {
    1045            2 :                         snapshot.NewTables = append(snapshot.NewTables, manifest.NewTableEntry{
    1046            2 :                                 Level: level,
    1047            2 :                                 Meta:  meta,
    1048            2 :                         })
    1049            2 :                 }
    1050              :         }
    1051              : 
    1052            2 :         w, err1 := manifestWriter.Next()
    1053            2 :         if err1 != nil {
    1054            0 :                 return err1
    1055            0 :         }
    1056            2 :         if err := snapshot.Encode(w); err != nil {
    1057            0 :                 return err
    1058            0 :         }
    1059              : 
    1060            2 :         if vs.manifest != nil {
    1061            2 :                 if err := vs.manifest.Close(); err != nil {
    1062            0 :                         return err
    1063            0 :                 }
    1064            2 :                 vs.manifest = nil
    1065              :         }
    1066            2 :         if vs.manifestFile != nil {
    1067            2 :                 if err := vs.manifestFile.Close(); err != nil {
    1068            0 :                         return err
    1069            0 :                 }
    1070            2 :                 vs.manifestFile = nil
    1071              :         }
    1072              : 
    1073            2 :         vs.manifest, manifestWriter = manifestWriter, nil
    1074            2 :         vs.manifestFile, manifestFile = manifestFile, nil
    1075            2 :         return nil
    1076              : }
    1077              : 
    1078              : // NB: This method is not safe for concurrent use. It is only safe
    1079              : // to be called when concurrent changes to nextFileNum are not expected.
    1080            2 : func (vs *versionSet) markFileNumUsed(fileNum base.DiskFileNum) {
    1081            2 :         if vs.nextFileNum.Load() <= uint64(fileNum) {
    1082            2 :                 vs.nextFileNum.Store(uint64(fileNum + 1))
    1083            2 :         }
    1084              : }
    1085              : 
    1086              : // getNextTableNum returns a new table number.
    1087              : //
    1088              : // Can be called without the versionSet's mutex being held.
    1089            2 : func (vs *versionSet) getNextTableNum() base.TableNum {
    1090            2 :         x := vs.nextFileNum.Add(1) - 1
    1091            2 :         return base.TableNum(x)
    1092            2 : }
    1093              : 
    1094              : // Can be called without the versionSet's mutex being held.
    1095            2 : func (vs *versionSet) getNextDiskFileNum() base.DiskFileNum {
    1096            2 :         x := vs.nextFileNum.Add(1) - 1
    1097            2 :         return base.DiskFileNum(x)
    1098            2 : }
    1099              : 
    1100            2 : func (vs *versionSet) append(v *manifest.Version) {
    1101            2 :         if v.Refs() != 0 {
    1102            0 :                 panic("pebble: version should be unreferenced")
    1103              :         }
    1104            2 :         if !vs.versions.Empty() {
    1105            2 :                 vs.versions.Back().UnrefLocked()
    1106            2 :         }
    1107            2 :         v.Deleted = vs.obsoleteFn
    1108            2 :         v.Ref()
    1109            2 :         vs.versions.PushBack(v)
    1110            2 :         if invariants.Enabled {
    1111            2 :                 // Verify that the virtualBackings contains all the backings referenced by
    1112            2 :                 // the version.
    1113            2 :                 for _, l := range v.Levels {
    1114            2 :                         for f := range l.All() {
    1115            2 :                                 if f.Virtual {
    1116            2 :                                         if _, ok := vs.virtualBackings.Get(f.TableBacking.DiskFileNum); !ok {
    1117            0 :                                                 panic(fmt.Sprintf("%s is not in virtualBackings", f.TableBacking.DiskFileNum))
    1118              :                                         }
    1119              :                                 }
    1120              :                         }
    1121              :                 }
    1122              :         }
    1123              : }
    1124              : 
    1125            2 : func (vs *versionSet) currentVersion() *manifest.Version {
    1126            2 :         return vs.versions.Back()
    1127            2 : }
    1128              : 
    1129            2 : func (vs *versionSet) addLiveFileNums(m map[base.DiskFileNum]struct{}) {
    1130            2 :         current := vs.currentVersion()
    1131            2 :         for v := vs.versions.Front(); true; v = v.Next() {
    1132            2 :                 for _, lm := range v.Levels {
    1133            2 :                         for f := range lm.All() {
    1134            2 :                                 m[f.TableBacking.DiskFileNum] = struct{}{}
    1135            2 :                                 for _, ref := range f.BlobReferences {
    1136            2 :                                         // TODO(jackson): Once we support blob file replacement, we
    1137            2 :                                         // need to look up the new blob file's number here.
    1138            2 :                                         diskFileNum := blob.DiskFileNumTODO(ref.FileID)
    1139            2 :                                         m[diskFileNum] = struct{}{}
    1140            2 :                                 }
    1141              :                         }
    1142              :                 }
    1143            2 :                 if v == current {
    1144            2 :                         break
    1145              :                 }
    1146              :         }
    1147              :         // virtualBackings contains backings that are referenced by some virtual
    1148              :         // tables in the latest version (which are handled above), and backings that
    1149              :         // are not but are still alive because of the protection mechanism (see
    1150              :         // manifset.VirtualBackings). This loop ensures the latter get added to the
    1151              :         // map.
    1152            2 :         vs.virtualBackings.ForEach(func(b *manifest.TableBacking) {
    1153            2 :                 m[b.DiskFileNum] = struct{}{}
    1154            2 :         })
    1155              : }
    1156              : 
    1157              : // addObsoleteLocked will add the fileInfo associated with obsolete backing
    1158              : // sstables to the obsolete tables list.
    1159              : //
    1160              : // The file backings in the obsolete list must not appear more than once.
    1161              : //
    1162              : // DB.mu must be held when addObsoleteLocked is called.
    1163            2 : func (vs *versionSet) addObsoleteLocked(obsolete manifest.ObsoleteFiles) {
    1164            2 :         if obsolete.Count() == 0 {
    1165            2 :                 return
    1166            2 :         }
    1167              : 
    1168              :         // Note that the zombie objects transition from zombie *to* obsolete, and
    1169              :         // will no longer be considered zombie.
    1170              : 
    1171            2 :         newlyObsoleteTables := make([]obsoleteFile, len(obsolete.TableBackings))
    1172            2 :         for i, bs := range obsolete.TableBackings {
    1173            2 :                 newlyObsoleteTables[i] = vs.zombieTables.Extract(bs.DiskFileNum).
    1174            2 :                         asObsoleteFile(vs.fs, base.FileTypeTable, vs.dirname)
    1175            2 :         }
    1176            2 :         vs.obsoleteTables = mergeObsoleteFiles(vs.obsoleteTables, newlyObsoleteTables)
    1177            2 : 
    1178            2 :         newlyObsoleteBlobFiles := make([]obsoleteFile, len(obsolete.BlobFiles))
    1179            2 :         for i, bf := range obsolete.BlobFiles {
    1180            2 :                 newlyObsoleteBlobFiles[i] = vs.zombieBlobs.Extract(bf.FileNum).
    1181            2 :                         asObsoleteFile(vs.fs, base.FileTypeBlob, vs.dirname)
    1182            2 :         }
    1183            2 :         vs.obsoleteBlobs = mergeObsoleteFiles(vs.obsoleteBlobs, newlyObsoleteBlobFiles)
    1184            2 :         vs.updateObsoleteObjectMetricsLocked()
    1185              : }
    1186              : 
    1187              : // addObsolete will acquire DB.mu, so DB.mu must not be held when this is
    1188              : // called.
    1189            2 : func (vs *versionSet) addObsolete(obsolete manifest.ObsoleteFiles) {
    1190            2 :         vs.mu.Lock()
    1191            2 :         defer vs.mu.Unlock()
    1192            2 :         vs.addObsoleteLocked(obsolete)
    1193            2 : }
    1194              : 
    1195            2 : func (vs *versionSet) updateObsoleteObjectMetricsLocked() {
    1196            2 :         // TODO(jackson): Ideally we would update vs.fileDeletions.queuedStats to
    1197            2 :         // include the files on vs.obsolete{Tables,Blobs}, but there's subtlety in
    1198            2 :         // deduplicating the files before computing the stats. It might also be
    1199            2 :         // possible to refactor to remove the vs.obsolete{Tables,Blobs} intermediary
    1200            2 :         // step. Revisit this.
    1201            2 :         vs.metrics.Table.ObsoleteCount = int64(len(vs.obsoleteTables))
    1202            2 :         vs.metrics.Table.ObsoleteSize = 0
    1203            2 :         vs.metrics.Table.Local.ObsoleteSize = 0
    1204            2 :         vs.metrics.Table.Local.ObsoleteCount = 0
    1205            2 :         for _, fi := range vs.obsoleteTables {
    1206            2 :                 vs.metrics.Table.ObsoleteSize += fi.fileSize
    1207            2 :                 if fi.isLocal {
    1208            2 :                         vs.metrics.Table.Local.ObsoleteSize += fi.fileSize
    1209            2 :                         vs.metrics.Table.Local.ObsoleteCount++
    1210            2 :                 }
    1211              :         }
    1212            2 :         vs.metrics.BlobFiles.ObsoleteCount = uint64(len(vs.obsoleteBlobs))
    1213            2 :         vs.metrics.BlobFiles.ObsoleteSize = 0
    1214            2 :         vs.metrics.BlobFiles.Local.ObsoleteSize = 0
    1215            2 :         vs.metrics.BlobFiles.Local.ObsoleteCount = 0
    1216            2 :         for _, fi := range vs.obsoleteBlobs {
    1217            2 :                 vs.metrics.BlobFiles.ObsoleteSize += fi.fileSize
    1218            2 :                 if fi.isLocal {
    1219            2 :                         vs.metrics.BlobFiles.Local.ObsoleteSize += fi.fileSize
    1220            2 :                         vs.metrics.BlobFiles.Local.ObsoleteCount++
    1221            2 :                 }
    1222              :         }
    1223              : }
    1224              : 
    1225              : func findCurrentManifest(
    1226              :         fs vfs.FS, dirname string, ls []string,
    1227            2 : ) (marker *atomicfs.Marker, manifestNum base.DiskFileNum, exists bool, err error) {
    1228            2 :         // Locating a marker should succeed even if the marker has never been placed.
    1229            2 :         var filename string
    1230            2 :         marker, filename, err = atomicfs.LocateMarkerInListing(fs, dirname, manifestMarkerName, ls)
    1231            2 :         if err != nil {
    1232            1 :                 return nil, 0, false, err
    1233            1 :         }
    1234              : 
    1235            2 :         if filename == "" {
    1236            2 :                 // The marker hasn't been set yet. This database doesn't exist.
    1237            2 :                 return marker, 0, false, nil
    1238            2 :         }
    1239              : 
    1240            2 :         var ok bool
    1241            2 :         _, manifestNum, ok = base.ParseFilename(fs, filename)
    1242            2 :         if !ok {
    1243            0 :                 return marker, 0, false, base.CorruptionErrorf("pebble: MANIFEST name %q is malformed", errors.Safe(filename))
    1244            0 :         }
    1245            2 :         return marker, manifestNum, true, nil
    1246              : }
    1247              : 
    1248            1 : func newFileMetrics(newFiles []manifest.NewTableEntry) levelMetricsDelta {
    1249            1 :         var m levelMetricsDelta
    1250            1 :         for _, nf := range newFiles {
    1251            1 :                 lm := m[nf.Level]
    1252            1 :                 if lm == nil {
    1253            1 :                         lm = &LevelMetrics{}
    1254            1 :                         m[nf.Level] = lm
    1255            1 :                 }
    1256            1 :                 lm.TablesCount++
    1257            1 :                 lm.TablesSize += int64(nf.Meta.Size)
    1258            1 :                 lm.EstimatedReferencesSize += nf.Meta.EstimatedReferenceSize()
    1259              :         }
    1260            1 :         return m
    1261              : }
        

Generated by: LCOV version 2.0-1