LCOV - code coverage report
Current view: top level - pebble - version_set.go (source / functions) Hit Total Coverage
Test: 2024-05-02 08:15Z 902b6d09 - meta test only.lcov Lines: 594 722 82.3 %
Date: 2024-05-02 08:16:32 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2012 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package pebble
       6             : 
       7             : import (
       8             :         "fmt"
       9             :         "io"
      10             :         "sync"
      11             :         "sync/atomic"
      12             : 
      13             :         "github.com/cockroachdb/errors"
      14             :         "github.com/cockroachdb/pebble/internal/base"
      15             :         "github.com/cockroachdb/pebble/internal/invariants"
      16             :         "github.com/cockroachdb/pebble/internal/manifest"
      17             :         "github.com/cockroachdb/pebble/objstorage"
      18             :         "github.com/cockroachdb/pebble/record"
      19             :         "github.com/cockroachdb/pebble/vfs"
      20             :         "github.com/cockroachdb/pebble/vfs/atomicfs"
      21             : )
      22             : 
      23             : const numLevels = manifest.NumLevels // shorthand for manifest.NumLevels
      24             : 
      25             : const manifestMarkerName = `manifest` // not referenced in this part of the file; presumably the atomicfs marker name for the current MANIFEST — confirm
      26             : 
      27             : // Short unexported aliases (not new types) for the manifest package's structs.
      28             : type bulkVersionEdit = manifest.BulkVersionEdit
      29             : type deletedFileEntry = manifest.DeletedFileEntry
      30             : type fileMetadata = manifest.FileMetadata
      31             : type physicalMeta = manifest.PhysicalFileMeta
      32             : type virtualMeta = manifest.VirtualFileMeta
      33             : type fileBacking = manifest.FileBacking
      34             : type newFileEntry = manifest.NewFileEntry
      35             : type version = manifest.Version
      36             : type versionEdit = manifest.VersionEdit
      37             : type versionList = manifest.VersionList
      38             : 
      39             : // versionSet manages a collection of immutable versions, and manages the
      40             : // creation of a new version from the most recent version. A new version is
      41             : // created from an existing version by applying a version edit which is just
      42             : // like it sounds: a delta from the previous version. Version edits are logged
      43             : // to the MANIFEST file, which is replayed at startup.
      44             : type versionSet struct {
      45             :         // Next seqNum to use for WAL writes.
      46             :         logSeqNum atomic.Uint64
      47             : 
      48             :         // The upper bound on sequence numbers that have been assigned so far. A
      49             :         // suffix of these sequence numbers may not have been written to a WAL. Both
      50             :         // logSeqNum and visibleSeqNum are atomically updated by the commitPipeline.
      51             :         // visibleSeqNum is <= logSeqNum.
      52             :         visibleSeqNum atomic.Uint64
      53             : 
      54             :         // Number of bytes present in sstables being written by in-progress
      55             :         // compactions. This value will be zero if there are no in-progress
      56             :         // compactions. Updated and read atomically.
      57             :         atomicInProgressBytes atomic.Int64
      58             : 
      59             :         // Immutable fields. All of them are assigned once, in init.
      60             :         dirname  string // directory used to build the MANIFEST path (see create/load)
      61             :         provider objstorage.Provider
      62             :         // Set to DB.mu.
      63             :         mu   *sync.Mutex
      64             :         opts *Options
      65             :         fs   vfs.FS // opts.FS
      66             :         cmp  *base.Comparer // opts.Comparer
      67             :         // Dynamic base level allows the dynamic base level computation to be
      68             :         // disabled. Used by tests which want to create specific LSM structures.
      69             :         dynamicBaseLevel bool
      70             : 
      71             :         // Mutable fields.
      72             :         versions versionList // initialized with mu in init
      73             :         picker   compactionPicker // built by newCompactionPickerByScore in create/load
      74             : 
      75             :         // Not all metrics are kept here. See DB.Metrics().
      76             :         metrics Metrics
      77             : 
      78             :         // A pointer to versionSet.addObsoleteLocked. Avoids allocating a new closure
      79             :         // on the creation of every version.
      80             :         obsoleteFn        func(obsolete []*fileBacking)
      81             :         obsoleteTables    []tableInfo
      82             :         obsoleteManifests []fileInfo
      83             :         obsoleteOptions   []fileInfo
      84             : 
      85             :         // Zombie tables which have been removed from the current version but are
      86             :         // still referenced by an inuse iterator.
      87             :         zombieTables map[base.DiskFileNum]tableInfo
      88             : 
      89             :         // virtualBackings contains information about the FileBackings which support
      90             :         // virtual sstables in the latest version. It is mainly used to determine when
      91             :         // a backing is no longer in use by the tables in the latest version; this is
      92             :         // not a trivial problem because compactions and other operations can happen
      93             :         // in parallel (and they can finish in unpredictable order).
      94             :         //
      95             :         // This mechanism is complementary to the backing Ref/Unref mechanism, which
      96             :         // determines when a backing is no longer used by *any* live version and can
      97             :         // be removed.
      98             :         //
      99             :         // In principle this should have been a copy-on-write structure, with each
     100             :         // Version having its own record of its virtual backings (similar to the
     101             :         // B-tree that holds the tables). However, in practice we only need it for the
     102             :         // latest version, so we use a simpler structure and store it in the
     103             :         // versionSet instead.
     104             :         //
     105             :         // virtualBackings is modified under DB.mu and the log lock. If it is accessed
     106             :         // under DB.mu and a version update is in progress, it reflects the state of
     107             :         // the next version.
     108             :         virtualBackings manifest.VirtualBackings
     109             : 
     110             :         // minUnflushedLogNum is the smallest WAL log file number corresponding to
     111             :         // mutations that have not been flushed to an sstable.
     112             :         minUnflushedLogNum base.DiskFileNum
     113             : 
     114             :         // The next file number. A single counter is used to assign file
     115             :         // numbers for the WAL, MANIFEST, sstable, and OPTIONS files.
     116             :         nextFileNum uint64 // starts at 1 (init); load overwrites it from replayed NextFileNum records
     117             : 
     118             :         // The current manifest file number.
     119             :         manifestFileNum base.DiskFileNum
     120             :         manifestMarker  *atomicfs.Marker // moved to the new MANIFEST name in create; closed in close
     121             : 
     122             :         manifestFile          vfs.File // synced in create, closed in close
     123             :         manifest              *record.Writer // record writer for the MANIFEST; flushed in create
     124             :         getFormatMajorVersion func() FormatMajorVersion // supplied by the caller of create/load
     125             : 
     126             :         writing    bool // true while the manifest is locked for writing (see logLock/logUnlock)
     127             :         writerCond sync.Cond // L is set to mu in init; signaled by logUnlock
     128             :         // State for deciding when to write a snapshot. Protected by mu.
     129             :         rotationHelper record.RotationHelper
     130             : }
     131             : 
     132             : type tableInfo struct { // element type of versionSet.obsoleteTables and value type of versionSet.zombieTables
     133             :         fileInfo
     134             :         isLocal bool // presumably true when the table's backing is on local storage — confirm against where it is set
     135             : }
     136             : 
     137             : func (vs *versionSet) init( // init populates the fields that create and load share; both call it first.
     138             :         dirname string,
     139             :         provider objstorage.Provider,
     140             :         opts *Options,
     141             :         marker *atomicfs.Marker,
     142             :         getFMV func() FormatMajorVersion,
     143             :         mu *sync.Mutex,
     144           1 : ) {
     145           1 :         vs.dirname = dirname
     146           1 :         vs.provider = provider
     147           1 :         vs.mu = mu
     148           1 :         vs.writerCond.L = mu // logLock waits on the same mutex that guards vs.writing
     149           1 :         vs.opts = opts
     150           1 :         vs.fs = opts.FS
     151           1 :         vs.cmp = opts.Comparer
     152           1 :         vs.dynamicBaseLevel = true // on by default; the field comment says tests may disable it
     153           1 :         vs.versions.Init(mu)
     154           1 :         vs.obsoleteFn = vs.addObsoleteLocked // bound once so versions do not each allocate a closure
     155           1 :         vs.zombieTables = make(map[base.DiskFileNum]tableInfo)
     156           1 :         vs.virtualBackings = manifest.MakeVirtualBackings()
     157           1 :         vs.nextFileNum = 1 // load replaces this with the manifest's NextFileNum, if any was recorded
     158           1 :         vs.manifestMarker = marker
     159           1 :         vs.getFormatMajorVersion = getFMV
     160           1 : }
     161             : 
     162             : // create creates a version set for a fresh DB: it installs an empty version, writes, flushes and syncs a new MANIFEST, and moves the marker to it.
     163             : func (vs *versionSet) create(
     164             :         jobID JobID,
     165             :         dirname string,
     166             :         provider objstorage.Provider,
     167             :         opts *Options,
     168             :         marker *atomicfs.Marker,
     169             :         getFormatMajorVersion func() FormatMajorVersion,
     170             :         mu *sync.Mutex,
     171           1 : ) error {
     172           1 :         vs.init(dirname, provider, opts, marker, getFormatMajorVersion, mu)
     173           1 :         newVersion := &version{}
     174           1 :         vs.append(newVersion)
     175           1 :         var err error
     176           1 : 
     177           1 :         vs.picker = newCompactionPickerByScore(newVersion, &vs.virtualBackings, vs.opts, nil)
     178           1 :         // Note that a "snapshot" version edit is written to the manifest when it is
     179           1 :         // created.
     180           1 :         vs.manifestFileNum = vs.getNextDiskFileNum()
     181           1 :         err = vs.createManifest(vs.dirname, vs.manifestFileNum, vs.minUnflushedLogNum, vs.nextFileNum, nil /* virtualBackings */) // a failure here skips the steps below and is returned, without Fatalf
     182           1 :         if err == nil {
     183           1 :                 if err = vs.manifest.Flush(); err != nil {
     184           0 :                         vs.opts.Logger.Fatalf("MANIFEST flush failed: %v", err) // if Fatalf returns, err stays set and the later steps are skipped
     185           0 :                 }
     186             :         }
     187           1 :         if err == nil {
     188           1 :                 if err = vs.manifestFile.Sync(); err != nil {
     189           0 :                         vs.opts.Logger.Fatalf("MANIFEST sync failed: %v", err)
     190           0 :                 }
     191             :         }
     192           1 :         if err == nil {
     193           1 :                 // NB: Move() is responsible for syncing the data directory.
     194           1 :                 if err = vs.manifestMarker.Move(base.MakeFilename(fileTypeManifest, vs.manifestFileNum)); err != nil {
     195           0 :                         vs.opts.Logger.Fatalf("MANIFEST set current failed: %v", err)
     196           0 :                 }
     197             :         }
     198             : 
     199           1 :         vs.opts.EventListener.ManifestCreated(ManifestCreateInfo{ // the listener is notified on success and on failure (Err carries the outcome)
     200           1 :                 JobID:   int(jobID),
     201           1 :                 Path:    base.MakeFilepath(vs.fs, vs.dirname, fileTypeManifest, vs.manifestFileNum),
     202           1 :                 FileNum: vs.manifestFileNum,
     203           1 :                 Err:     err,
     204           1 :         })
     205           1 :         if err != nil {
     206           0 :                 return err
     207           0 :         }
     208           1 :         return nil
     209             : }
     210             : 
     211             : // load loads the version set from the manifest file: it replays every version edit, rebuilds the virtual backings and level metrics, and installs the resulting version.
     212             : func (vs *versionSet) load(
     213             :         dirname string,
     214             :         provider objstorage.Provider,
     215             :         opts *Options,
     216             :         manifestFileNum base.DiskFileNum,
     217             :         marker *atomicfs.Marker,
     218             :         getFormatMajorVersion func() FormatMajorVersion,
     219             :         mu *sync.Mutex,
     220           1 : ) error {
     221           1 :         vs.init(dirname, provider, opts, marker, getFormatMajorVersion, mu)
     222           1 : 
     223           1 :         vs.manifestFileNum = manifestFileNum
     224           1 :         manifestPath := base.MakeFilepath(opts.FS, dirname, fileTypeManifest, vs.manifestFileNum)
     225           1 :         manifestFilename := opts.FS.PathBase(manifestPath)
     226           1 : 
     227           1 :         // Read the versionEdits in the manifest file.
     228           1 :         var bve bulkVersionEdit
     229           1 :         bve.AddedByFileNum = make(map[base.FileNum]*fileMetadata)
     230           1 :         manifest, err := vs.fs.Open(manifestPath) // NB: this local shadows the imported manifest package for the rest of the function
     231           1 :         if err != nil {
     232           0 :                 return errors.Wrapf(err, "pebble: could not open manifest file %q for DB %q",
     233           0 :                         errors.Safe(manifestFilename), dirname)
     234           0 :         }
     235           1 :         defer manifest.Close() // the Close error is ignored; the file is only read here
     236           1 :         rr := record.NewReader(manifest, 0 /* logNum */)
     237           1 :         for {
     238           1 :                 r, err := rr.Next()
     239           1 :                 if err == io.EOF || record.IsInvalidRecord(err) { // stop replay at EOF or at a record the reader reports as invalid
     240           1 :                         break
     241             :                 }
     242           1 :                 if err != nil {
     243           0 :                         return errors.Wrapf(err, "pebble: error when loading manifest file %q",
     244           0 :                                 errors.Safe(manifestFilename))
     245           0 :                 }
     246           1 :                 var ve versionEdit
     247           1 :                 err = ve.Decode(r)
     248           1 :                 if err != nil {
     249           0 :                         // Break instead of returning an error if the record is corrupted
     250           0 :                         // or invalid.
     251           0 :                         if err == io.EOF || record.IsInvalidRecord(err) {
     252           0 :                                 break
     253             :                         }
     254           0 :                         return err // NOTE(review): unlike the other error paths, returned without manifest-file context
     255             :                 }
     256           1 :                 if ve.ComparerName != "" {
     257           1 :                         if ve.ComparerName != vs.cmp.Name {
     258           0 :                                 return errors.Errorf("pebble: manifest file %q for DB %q: "+
     259           0 :                                         "comparer name from file %q != comparer name from Options %q",
     260           0 :                                         errors.Safe(manifestFilename), dirname, errors.Safe(ve.ComparerName), errors.Safe(vs.cmp.Name))
     261           0 :                         }
     262             :                 }
     263           1 :                 if err := bve.Accumulate(&ve); err != nil {
     264           0 :                         return err
     265           0 :                 }
     266           1 :                 if ve.MinUnflushedLogNum != 0 { // zero means "not set in this edit"; the last non-zero value wins
     267           1 :                         vs.minUnflushedLogNum = ve.MinUnflushedLogNum
     268           1 :                 }
     269           1 :                 if ve.NextFileNum != 0 {
     270           1 :                         vs.nextFileNum = ve.NextFileNum
     271           1 :                 }
     272           1 :                 if ve.LastSeqNum != 0 {
     273           1 :                         // logSeqNum is the _next_ sequence number that will be assigned,
     274           1 :                         // while LastSeqNum is the last assigned sequence number. Note that
     275           1 :                         // this behaviour mimics that in RocksDB; the first sequence number
     276           1 :                         // assigned is one greater than the one present in the manifest
     277           1 :                         // (assuming no WALs contain higher sequence numbers than the
     278           1 :                         // manifest's LastSeqNum). Increment LastSeqNum by 1 to get the
     279           1 :                         // next sequence number that will be assigned.
     280           1 :                         //
     281           1 :                         // If LastSeqNum is less than SeqNumStart, increase it to at least
     282           1 :                         // SeqNumStart to leave ample room for reserved sequence numbers.
     283           1 :                         if ve.LastSeqNum+1 < base.SeqNumStart {
     284           0 :                                 vs.logSeqNum.Store(base.SeqNumStart)
     285           1 :                         } else {
     286           1 :                                 vs.logSeqNum.Store(ve.LastSeqNum + 1)
     287           1 :                         }
     288             :                 }
     289             :         }
     290             :         // vs.init set vs.nextFileNum = 1 at the beginning of the function, and
     291             :         // replay only overwrites it with a non-zero NextFileNum, so it cannot be 0
     292             :         // here. It is >= 2 only if the manifest recorded a NextFileNum >= 2.
     293           1 :         if vs.minUnflushedLogNum == 0 {
     294           0 :                 if vs.nextFileNum >= 2 {
     295           0 :                         // We either have a freshly created DB, or a DB created by RocksDB
     296           0 :                         // that has not had a single flushed SSTable yet. This is because
     297           0 :                         // RocksDB bumps up nextFileNum in this case without bumping up
     298           0 :                         // minUnflushedLogNum, even if WALs with non-zero file numbers are
     299           0 :                         // present in the directory.
     300           0 :                 } else {
     301           0 :                         return base.CorruptionErrorf("pebble: malformed manifest file %q for DB %q",
     302           0 :                                 errors.Safe(manifestFilename), dirname)
     303           0 :                 }
     304             :         }
     305           1 :         vs.markFileNumUsed(vs.minUnflushedLogNum)
     306           1 : 
     307           1 :         // Populate the fileBackingMap and the FileBacking for virtual sstables since
     308           1 :         // we have finished version edit accumulation.
     309           1 :         for _, b := range bve.AddedFileBacking {
     310           1 :                 vs.virtualBackings.AddAndRef(b)
     311           1 :         }
     312             : 
     313           1 :         for _, addedLevel := range bve.Added {
     314           1 :                 for _, m := range addedLevel {
     315           1 :                         if m.Virtual {
     316           1 :                                 vs.virtualBackings.AddTable(m)
     317           1 :                         }
     318             :                 }
     319             :         }
     320             : 
     321           1 :         if invariants.Enabled {
     322           1 :                 // There should be no deleted tables or backings, since we're starting from
     323           1 :                 // an empty state.
     324           1 :                 for _, deletedLevel := range bve.Deleted {
     325           1 :                         if len(deletedLevel) != 0 {
     326           0 :                                 panic("deleted files after manifest replay")
     327             :                         }
     328             :                 }
     329           1 :                 if len(bve.RemovedFileBacking) > 0 {
     330           0 :                         panic("deleted backings after manifest replay")
     331             :                 }
     332             :         }
     333             : 
     334           1 :         newVersion, err := bve.Apply(nil, opts.Comparer, opts.FlushSplitBytes, opts.Experimental.ReadCompactionRate) // nil first argument: presumably "no base version", matching the empty starting state — confirm
     335           1 :         if err != nil {
     336           0 :                 return err
     337           0 :         }
     338           1 :         newVersion.L0Sublevels.InitCompactingFileInfo(nil /* in-progress compactions */)
     339           1 :         vs.append(newVersion)
     340           1 : 
     341           1 :         for i := range vs.metrics.Levels { // seed per-level file counts and sizes from the replayed version
     342           1 :                 l := &vs.metrics.Levels[i]
     343           1 :                 l.NumFiles = int64(newVersion.Levels[i].Len())
     344           1 :                 files := newVersion.Levels[i].Slice()
     345           1 :                 l.Size = int64(files.SizeSum())
     346           1 :         }
     347           1 :         for _, l := range newVersion.Levels {
     348           1 :                 iter := l.Iter()
     349           1 :                 for f := iter.First(); f != nil; f = iter.Next() {
     350           1 :                         if !f.Virtual { // virtual tables are accounted once per backing in the ForEach below
     351           1 :                                 _, localSize := sizeIfLocal(f.FileBacking, vs.provider)
     352           1 :                                 vs.metrics.Table.Local.LiveSize = uint64(int64(vs.metrics.Table.Local.LiveSize) + localSize)
     353           1 :                         }
     354             :                 }
     355             :         }
     356           1 :         vs.virtualBackings.ForEach(func(backing *fileBacking) {
     357           1 :                 _, localSize := sizeIfLocal(backing, vs.provider)
     358           1 :                 vs.metrics.Table.Local.LiveSize = uint64(int64(vs.metrics.Table.Local.LiveSize) + localSize)
     359           1 :         })
     360             : 
     361           1 :         vs.picker = newCompactionPickerByScore(newVersion, &vs.virtualBackings, vs.opts, nil)
     362           1 :         return nil
     363             : }
     364             : 
     365           1 : func (vs *versionSet) close() error { // close closes the manifest file, then the manifest marker, returning the first error.
     366           1 :         if vs.manifestFile != nil {
     367           1 :                 if err := vs.manifestFile.Close(); err != nil {
     368           0 :                         return err // NOTE(review): this early return skips manifestMarker.Close(), leaving the marker open
     369           0 :                 }
     370             :         }
     371           1 :         if vs.manifestMarker != nil {
     372           1 :                 if err := vs.manifestMarker.Close(); err != nil {
     373           0 :                         return err
     374           0 :                 }
     375             :         }
     376           1 :         return nil
     377             : }
     378             : 
     379             : // logLock locks the manifest for writing. The lock must be released by either
     380             : // a call to logUnlock or logAndApply.
     381             : //
     382             : // DB.mu must be held when calling this method, but the mutex may be dropped and
     383             : // re-acquired during the course of this method.
     384           1 : func (vs *versionSet) logLock() {
     385           1 :         // Wait for any existing writing to the manifest to complete, then mark the
     386           1 :         // manifest as busy.
     387           1 :         for vs.writing { // re-checked after every wakeup, since another caller may have taken the lock first
     388           1 :                 // Note: writerCond.L is DB.mu, so we unlock it while we wait.
     389           1 :                 vs.writerCond.Wait()
     390           1 :         }
     391           1 :         vs.writing = true // DB.mu is held here: Wait re-locks writerCond.L before returning
     392             : }
     393             : 
     394             : // logUnlock releases the lock for manifest writing.
     395             : //
     396             : // DB.mu must be held when calling this method.
     397           1 : func (vs *versionSet) logUnlock() {
     398           1 :         if !vs.writing {
     399           0 :                 vs.opts.Logger.Fatalf("MANIFEST not locked for writing") // unlocking without a matching logLock is a caller bug
     400           0 :         }
     401           1 :         vs.writing = false
     402           1 :         vs.writerCond.Signal() // wakes at most one logLock waiter; only one can take the manifest lock at a time
     403             : }
     404             : 
     405             : // logAndApply logs the version edit to the manifest, applies the version edit
     406             : // to the current version, and installs the new version.
     407             : //
     408             : // logAndApply fills in the following fields of the VersionEdit: NextFileNum,
     409             : // LastSeqNum, RemovedBackingTables. The removed backing tables are those
     410             : // backings that are no longer used (in the new version) after applying the edit
     411             : // (as per vs.virtualBackings). Other than these fields, the VersionEdit must be
     412             : // complete.
     413             : //
     414             : // New table backing references (FileBacking.Ref) are taken as part of applying
     415             : // the version edit. The state of the virtual backings (vs.virtualBackings) is
     416             : // updated before logging to the manifest and installing the latest version;
     417             : // this is ok because any failure in those steps is fatal.
     418             : // TODO(radu): remove the error return.
     419             : //
     420             : // DB.mu must be held when calling this method and will be released temporarily
     421             : // while performing file I/O. Requires that the manifest is locked for writing
     422             : // (see logLock). Will unconditionally release the manifest lock (via
     423             : // logUnlock) even if an error occurs.
     424             : //
     425             : // inProgressCompactions is called while DB.mu is held, to get the list of
     426             : // in-progress compactions.
     427             : func (vs *versionSet) logAndApply(
     428             :         jobID JobID,
     429             :         ve *versionEdit,
     430             :         metrics map[int]*LevelMetrics,
     431             :         forceRotation bool,
     432             :         inProgressCompactions func() []compactionInfo,
     433           1 : ) error {
     434           1 :         if !vs.writing {
     435           0 :                 vs.opts.Logger.Fatalf("MANIFEST not locked for writing")
     436           0 :         }
     437           1 :         defer vs.logUnlock()
     438           1 : 
     439           1 :         if ve.MinUnflushedLogNum != 0 {
     440           1 :                 if ve.MinUnflushedLogNum < vs.minUnflushedLogNum ||
     441           1 :                         vs.nextFileNum <= uint64(ve.MinUnflushedLogNum) {
     442           0 :                         panic(fmt.Sprintf("pebble: inconsistent versionEdit minUnflushedLogNum %d",
     443           0 :                                 ve.MinUnflushedLogNum))
     444             :                 }
     445             :         }
     446             : 
     447             :         // This is the next manifest filenum, but if the current file is too big we
     448             :         // will write this ve to the next file which means what ve encodes is the
     449             :         // current filenum and not the next one.
     450             :         //
     451             :         // TODO(sbhola): figure out why this is correct and update comment.
     452           1 :         ve.NextFileNum = vs.nextFileNum
     453           1 : 
     454           1 :         // LastSeqNum is set to the current upper bound on the assigned sequence
     455           1 :         // numbers. Note that this is exactly the behavior of RocksDB. LastSeqNum is
     456           1 :         // used to initialize versionSet.logSeqNum and versionSet.visibleSeqNum on
     457           1 :         // replay. It must be higher than or equal to any sequence number
     458           1 :         // written to an sstable, including sequence numbers in ingested files.
     459           1 :         // Note that LastSeqNum is not (and cannot be) the minimum unflushed sequence
     460           1 :         // number. This is fallout from ingestion which allows a sequence number X to
     461           1 :         // be assigned to an ingested sstable even though sequence number X-1 resides
     462           1 :         // in an unflushed memtable. logSeqNum is the _next_ sequence number that
     463           1 :         // will be assigned, so subtract that by 1 to get the upper bound on the
     464           1 :         // last assigned sequence number.
     465           1 :         logSeqNum := vs.logSeqNum.Load()
     466           1 :         ve.LastSeqNum = logSeqNum - 1
     467           1 :         if logSeqNum == 0 {
     468           0 :                 // logSeqNum is initialized to 1 in Open() if there are no previous WAL
     469           0 :                 // or manifest records, so this case should never happen.
     470           0 :                 vs.opts.Logger.Fatalf("logSeqNum must be a positive integer: %d", logSeqNum)
     471           0 :         }
     472             : 
     473           1 :         currentVersion := vs.currentVersion()
     474           1 :         var newVersion *version
     475           1 : 
     476           1 :         // Generate a new manifest if we don't currently have one, or forceRotation
     477           1 :         // is true, or the current one is too large.
     478           1 :         //
     479           1 :         // For largeness, we do not exclusively use MaxManifestFileSize size
     480           1 :         // threshold since we have had incidents where due to either large keys or
     481           1 :         // large numbers of files, each edit results in a snapshot + write of the
     482           1 :         // edit. This slows the system down since each flush or compaction is
     483           1 :         // writing a new manifest snapshot. The primary goal of the size-based
     484           1 :         // rollover logic is to ensure that when reopening a DB, the number of edits
     485           1 :         // that need to be replayed on top of the snapshot is "sane". Rolling over
     486           1 :         // to a new manifest after each edit is not relevant to that goal.
     487           1 :         //
     488           1 :         // Consider the following cases:
     489           1 :         // - The number of live files F in the DB is roughly stable: after writing
     490           1 :         //   the snapshot (with F files), say we require that there be enough edits
     491           1 :         //   such that the cumulative number of files in those edits, E, be greater
     492           1 :         //   than F. This will ensure that the total amount of time in logAndApply
     493           1 :         //   that is spent in snapshot writing is ~50%.
     494           1 :         //
     495           1 :         // - The number of live files F in the DB is shrinking drastically, say from
     496           1 :         //   F to F/10: This can happen for various reasons, like wide range
     497           1 :         //   tombstones, or large numbers of smaller than usual files that are being
     498           1 :         //   merged together into larger files. And say the number of new files
     499           1 :         //   generated during this shrinkage is insignificant compared to F/10, and so for
     500           1 :         //   this example we will assume it is effectively 0. After this shrinking,
     501           1 :         //   E = 0.9F, and so if we used the previous snapshot file count, F, as the
     502           1 :         //   threshold that needs to be exceeded, we will further delay the snapshot
     503           1 :         //   writing. Which means on DB reopen we will need to replay 0.9F edits to
     504           1 :         //   get to a version with 0.1F files. It would be better to create a new
     505           1 :         //   snapshot when E exceeds the number of files in the current version.
     506           1 :         //
     507           1 :         // - The number of live files F in the DB is growing via perfect ingests
     508           1 :         //   into L6: Say we wrote the snapshot when there were F files and now we
     509           1 :         //   have 10F files, so E = 9F. We will further delay writing a new
     510           1 :         //   snapshot. This case can be critiqued as contrived, but we consider it
     511           1 :         //   nonetheless.
     512           1 :         //
     513           1 :         // The logic below uses the min of the last snapshot file count and the file
     514           1 :         // count in the current version.
     515           1 :         vs.rotationHelper.AddRecord(int64(len(ve.DeletedFiles) + len(ve.NewFiles)))
     516           1 :         sizeExceeded := vs.manifest.Size() >= vs.opts.MaxManifestFileSize
     517           1 :         requireRotation := forceRotation || vs.manifest == nil
     518           1 : 
     519           1 :         var nextSnapshotFilecount int64
     520           1 :         for i := range vs.metrics.Levels {
     521           1 :                 nextSnapshotFilecount += vs.metrics.Levels[i].NumFiles
     522           1 :         }
     523           1 :         if sizeExceeded && !requireRotation {
     524           1 :                 requireRotation = vs.rotationHelper.ShouldRotate(nextSnapshotFilecount)
     525           1 :         }
     526           1 :         var newManifestFileNum base.DiskFileNum
     527           1 :         var prevManifestFileSize uint64
     528           1 :         var newManifestVirtualBackings []*fileBacking
     529           1 :         if requireRotation {
     530           1 :                 newManifestFileNum = vs.getNextDiskFileNum()
     531           1 :                 prevManifestFileSize = uint64(vs.manifest.Size())
     532           1 : 
     533           1 :                 // We want the virtual backings *before* applying the version edit, because
     534           1 :                 // the new manifest will contain the pre-apply version plus the last version
     535           1 :                 // edit.
     536           1 :                 newManifestVirtualBackings = vs.virtualBackings.Backings()
     537           1 :         }
     538             : 
     539             :         // Grab certain values before releasing vs.mu, in case createManifest() needs
     540             :         // to be called.
     541           1 :         minUnflushedLogNum := vs.minUnflushedLogNum
     542           1 :         nextFileNum := vs.nextFileNum
     543           1 : 
     544           1 :         // Note: this call populates ve.RemovedBackingTables.
     545           1 :         zombieBackings, removedVirtualBackings, localLiveSizeDelta :=
     546           1 :                 getZombiesAndUpdateVirtualBackings(ve, &vs.virtualBackings, vs.provider)
     547           1 : 
     548           1 :         if err := func() error {
     549           1 :                 vs.mu.Unlock()
     550           1 :                 defer vs.mu.Lock()
     551           1 : 
     552           1 :                 if vs.getFormatMajorVersion() < FormatVirtualSSTables && len(ve.CreatedBackingTables) > 0 {
     553           0 :                         return base.AssertionFailedf("MANIFEST cannot contain virtual sstable records due to format major version")
     554           0 :                 }
     555           1 :                 var b bulkVersionEdit
     556           1 :                 err := b.Accumulate(ve)
     557           1 :                 if err != nil {
     558           0 :                         return errors.Wrap(err, "MANIFEST accumulate failed")
     559           0 :                 }
     560           1 :                 newVersion, err = b.Apply(
     561           1 :                         currentVersion, vs.cmp, vs.opts.FlushSplitBytes, vs.opts.Experimental.ReadCompactionRate,
     562           1 :                 )
     563           1 :                 if err != nil {
     564           0 :                         return errors.Wrap(err, "MANIFEST apply failed")
     565           0 :                 }
     566             : 
     567           1 :                 if newManifestFileNum != 0 {
     568           1 :                         if err := vs.createManifest(vs.dirname, newManifestFileNum, minUnflushedLogNum, nextFileNum, newManifestVirtualBackings); err != nil {
     569           0 :                                 vs.opts.EventListener.ManifestCreated(ManifestCreateInfo{
     570           0 :                                         JobID:   int(jobID),
     571           0 :                                         Path:    base.MakeFilepath(vs.fs, vs.dirname, fileTypeManifest, newManifestFileNum),
     572           0 :                                         FileNum: newManifestFileNum,
     573           0 :                                         Err:     err,
     574           0 :                                 })
     575           0 :                                 return errors.Wrap(err, "MANIFEST create failed")
     576           0 :                         }
     577             :                 }
     578             : 
     579           1 :                 w, err := vs.manifest.Next()
     580           1 :                 if err != nil {
     581           0 :                         return errors.Wrap(err, "MANIFEST next record write failed")
     582           0 :                 }
     583             : 
     584             :                 // NB: Any error from this point on is considered fatal as we don't know if
     585             :                 // the MANIFEST write occurred or not. Trying to determine that is
     586             :                 // fraught. Instead we rely on the standard recovery mechanism run when a
     587             :                 // database is open. In particular, that mechanism generates a new MANIFEST
     588             :                 // and ensures it is synced.
     589           1 :                 if err := ve.Encode(w); err != nil {
     590           0 :                         return errors.Wrap(err, "MANIFEST write failed")
     591           0 :                 }
     592           1 :                 if err := vs.manifest.Flush(); err != nil {
     593           0 :                         return errors.Wrap(err, "MANIFEST flush failed")
     594           0 :                 }
     595           1 :                 if err := vs.manifestFile.Sync(); err != nil {
     596           0 :                         return errors.Wrap(err, "MANIFEST sync failed")
     597           0 :                 }
     598           1 :                 if newManifestFileNum != 0 {
     599           1 :                         // NB: Move() is responsible for syncing the data directory.
     600           1 :                         if err := vs.manifestMarker.Move(base.MakeFilename(fileTypeManifest, newManifestFileNum)); err != nil {
     601           0 :                                 return errors.Wrap(err, "MANIFEST set current failed")
     602           0 :                         }
     603           1 :                         vs.opts.EventListener.ManifestCreated(ManifestCreateInfo{
     604           1 :                                 JobID:   int(jobID),
     605           1 :                                 Path:    base.MakeFilepath(vs.fs, vs.dirname, fileTypeManifest, newManifestFileNum),
     606           1 :                                 FileNum: newManifestFileNum,
     607           1 :                         })
     608             :                 }
     609           1 :                 return nil
     610           0 :         }(); err != nil {
     611           0 :                 // Any error encountered during any of the operations in the previous
     612           0 :                 // closure is considered fatal. Treating such errors as fatal is preferred
     613           0 :                 // to attempting to unwind various file and b-tree reference counts, and
     614           0 :                 // re-generating L0 sublevel metadata. This may change in the future, if
     615           0 :                 // certain manifest / WAL operations become retryable. For more context, see
     616           0 :                 // #1159 and #1792.
     617           0 :                 vs.opts.Logger.Fatalf("%s", err)
     618           0 :                 return err
     619           0 :         }
     620             : 
     621           1 :         if requireRotation {
     622           1 :                 // Successfully rotated.
     623           1 :                 vs.rotationHelper.Rotate(nextSnapshotFilecount)
     624           1 :         }
     625             :         // Now that DB.mu is held again, initialize compacting file info in
     626             :         // L0Sublevels.
     627           1 :         inProgress := inProgressCompactions()
     628           1 : 
     629           1 :         newVersion.L0Sublevels.InitCompactingFileInfo(inProgressL0Compactions(inProgress))
     630           1 : 
     631           1 :         // Update the zombie tables set first, as installation of the new version
     632           1 :         // will unref the previous version which could result in addObsoleteLocked
     633           1 :         // being called.
     634           1 :         for _, b := range zombieBackings {
     635           1 :                 vs.zombieTables[b.backing.DiskFileNum] = tableInfo{
     636           1 :                         fileInfo: fileInfo{
     637           1 :                                 FileNum:  b.backing.DiskFileNum,
     638           1 :                                 FileSize: b.backing.Size,
     639           1 :                         },
     640           1 :                         isLocal: b.isLocal,
     641           1 :                 }
     642           1 :         }
     643             : 
     644             :         // Unref the removed backings and report those that already became obsolete.
     645             :         // Note that the only case where we report obsolete tables here is when
     646             :         // VirtualBackings.Protect/Unprotect was used to keep a backing alive without
     647             :         // it being used in the current version.
     648           1 :         var obsoleteVirtualBackings []*fileBacking
     649           1 :         for _, b := range removedVirtualBackings {
     650           1 :                 if b.backing.Unref() == 0 {
     651           1 :                         obsoleteVirtualBackings = append(obsoleteVirtualBackings, b.backing)
     652           1 :                 }
     653             :         }
     654           1 :         vs.addObsoleteLocked(obsoleteVirtualBackings)
     655           1 : 
     656           1 :         // Install the new version.
     657           1 :         vs.append(newVersion)
     658           1 : 
     659           1 :         if ve.MinUnflushedLogNum != 0 {
     660           1 :                 vs.minUnflushedLogNum = ve.MinUnflushedLogNum
     661           1 :         }
     662           1 :         if newManifestFileNum != 0 {
     663           1 :                 if vs.manifestFileNum != 0 {
     664           1 :                         vs.obsoleteManifests = append(vs.obsoleteManifests, fileInfo{
     665           1 :                                 FileNum:  vs.manifestFileNum,
     666           1 :                                 FileSize: prevManifestFileSize,
     667           1 :                         })
     668           1 :                 }
     669           1 :                 vs.manifestFileNum = newManifestFileNum
     670             :         }
     671             : 
     672           1 :         for level, update := range metrics {
     673           1 :                 vs.metrics.Levels[level].Add(update)
     674           1 :         }
     675           1 :         for i := range vs.metrics.Levels {
     676           1 :                 l := &vs.metrics.Levels[i]
     677           1 :                 l.NumFiles = int64(newVersion.Levels[i].Len())
     678           1 :                 l.NumVirtualFiles = newVersion.Levels[i].NumVirtual
     679           1 :                 l.VirtualSize = newVersion.Levels[i].VirtualSize
     680           1 :                 l.Size = int64(newVersion.Levels[i].Size())
     681           1 : 
     682           1 :                 l.Sublevels = 0
     683           1 :                 if l.NumFiles > 0 {
     684           1 :                         l.Sublevels = 1
     685           1 :                 }
     686           1 :                 if invariants.Enabled {
     687           1 :                         levelFiles := newVersion.Levels[i].Slice()
     688           1 :                         if size := int64(levelFiles.SizeSum()); l.Size != size {
     689           0 :                                 vs.opts.Logger.Fatalf("versionSet metrics L%d Size = %d, actual size = %d", i, l.Size, size)
     690           0 :                         }
     691           1 :                         if nVirtual := levelFiles.NumVirtual(); nVirtual != l.NumVirtualFiles {
     692           0 :                                 vs.opts.Logger.Fatalf(
     693           0 :                                         "versionSet metrics L%d NumVirtual = %d, actual NumVirtual = %d",
     694           0 :                                         i, l.NumVirtualFiles, nVirtual,
     695           0 :                                 )
     696           0 :                         }
     697           1 :                         if vSize := levelFiles.VirtualSizeSum(); vSize != l.VirtualSize {
     698           0 :                                 vs.opts.Logger.Fatalf(
     699           0 :                                         "versionSet metrics L%d Virtual size = %d, actual size = %d",
     700           0 :                                         i, l.VirtualSize, vSize,
     701           0 :                                 )
     702           0 :                         }
     703             :                 }
     704             :         }
     705           1 :         vs.metrics.Levels[0].Sublevels = int32(len(newVersion.L0SublevelFiles))
     706           1 :         vs.metrics.Table.Local.LiveSize = uint64(int64(vs.metrics.Table.Local.LiveSize) + localLiveSizeDelta)
     707           1 : 
     708           1 :         vs.picker = newCompactionPickerByScore(newVersion, &vs.virtualBackings, vs.opts, inProgress)
     709           1 :         if !vs.dynamicBaseLevel {
     710           0 :                 vs.picker.forceBaseLevel1()
     711           0 :         }
     712           1 :         return nil
     713             : }
     714             : 
     715             : type fileBackingInfo struct { // a file backing plus whether it is a local table
     716             :         backing *fileBacking
     717             :         isLocal bool // as reported by sizeIfLocal when the entry was built
     718             : }
     719             : 
     720             : // getZombiesAndUpdateVirtualBackings updates the virtual backings with the
     721             : // changes in the versionEdit and populates ve.RemovedBackingTables (both ve and virtualBackings are mutated).
     722             : // Returns:
     723             : //   - zombieBackings: all backings (physical and virtual) that will no longer be
     724             : //     needed when we apply ve.
     725             : //   - removedVirtualBackings: the virtual backings that will be removed by the
     726             : //     VersionEdit and which must be Unref()ed by the caller. These backings
     727             : //     match ve.RemovedBackingTables; the slice aliases the tail of zombieBackings.
     728             : //   - localLiveSizeDelta: the net change in local live bytes (sizes added minus sizes removed, as reported by sizeIfLocal).
     729             : func getZombiesAndUpdateVirtualBackings(
     730             :         ve *versionEdit, virtualBackings *manifest.VirtualBackings, provider objstorage.Provider,
     731           1 : ) (zombieBackings, removedVirtualBackings []fileBackingInfo, localLiveSizeDelta int64) {
     732           1 :         // First, deal with the physical tables.
     733           1 :         //
     734           1 :         // A physical backing has become unused if it is in DeletedFiles but not in
     735           1 :         // NewFiles or CreatedBackingTables.
     736           1 :         //
     737           1 :         // Note that for the common case where there are very few elements, the map
     738           1 :         // will stay on the stack.
     739           1 :         stillUsed := make(map[base.DiskFileNum]struct{})
     740           1 :         for _, nf := range ve.NewFiles {
     741           1 :                 if !nf.Meta.Virtual {
     742           1 :                         stillUsed[nf.Meta.FileBacking.DiskFileNum] = struct{}{}
     743           1 :                         _, localFileDelta := sizeIfLocal(nf.Meta.FileBacking, provider)
     744           1 :                         localLiveSizeDelta += localFileDelta
     745           1 :                 }
     746             :         }
     747           1 :         for _, b := range ve.CreatedBackingTables {
     748           1 :                 stillUsed[b.DiskFileNum] = struct{}{}
     749           1 :         }
     750           1 :         for _, m := range ve.DeletedFiles {
     751           1 :                 if !m.Virtual {
     752           1 :                         // NB: this deleted file may also be in NewFiles or
     753           1 :                         // CreatedBackingTables, due to a file moving between levels, or
     754           1 :                         // becoming virtualized. In which case there is no change due to this
     755           1 :                         // file in the localLiveSizeDelta -- the subtraction below compensates
     756           1 :                         // for the addition.
     757           1 :                         isLocal, localFileDelta := sizeIfLocal(m.FileBacking, provider)
     758           1 :                         localLiveSizeDelta -= localFileDelta
     759           1 :                         if _, ok := stillUsed[m.FileBacking.DiskFileNum]; !ok {
     760           1 :                                 zombieBackings = append(zombieBackings, fileBackingInfo{
     761           1 :                                         backing: m.FileBacking,
     762           1 :                                         isLocal: isLocal,
     763           1 :                                 })
     764           1 :                         }
     765             :                 }
     766             :         }
     767             : 
     768             :         // Now deal with virtual tables.
     769             :         //
     770             :         // A virtual table that moves between levels gets AddTable() and then RemoveTable()
     771             :         // (the NewFiles loop below runs before the DeletedFiles loop), which works out.
     772           1 :         for _, b := range ve.CreatedBackingTables {
     773           1 :                 virtualBackings.AddAndRef(b)
     774           1 :                 _, localFileDelta := sizeIfLocal(b, provider)
     775           1 :                 localLiveSizeDelta += localFileDelta
     776           1 :         }
     777           1 :         for _, nf := range ve.NewFiles {
     778           1 :                 if nf.Meta.Virtual {
     779           1 :                         virtualBackings.AddTable(nf.Meta)
     780           1 :                 }
     781             :         }
     782           1 :         for _, m := range ve.DeletedFiles {
     783           1 :                 if m.Virtual {
     784           1 :                         virtualBackings.RemoveTable(m)
     785           1 :                 }
     786             :         }
     787             : 
     788           1 :         if unused := virtualBackings.Unused(); len(unused) > 0 {
     789           1 :                 // Virtual backings that are no longer used are zombies and are also added
     790           1 :                 // to RemovedBackingTables (before the version edit is written to disk).
     791           1 :                 ve.RemovedBackingTables = make([]base.DiskFileNum, len(unused))
     792           1 :                 for i, b := range unused {
     793           1 :                         isLocal, localFileDelta := sizeIfLocal(b, provider)
     794           1 :                         localLiveSizeDelta -= localFileDelta
     795           1 :                         ve.RemovedBackingTables[i] = b.DiskFileNum
     796           1 :                         zombieBackings = append(zombieBackings, fileBackingInfo{
     797           1 :                                 backing: b,
     798           1 :                                 isLocal: isLocal,
     799           1 :                         })
     800           1 :                         virtualBackings.Remove(b.DiskFileNum)
     801           1 :                 }
     802           1 :                 removedVirtualBackings = zombieBackings[len(zombieBackings)-len(unused):] // the len(unused) entries just appended; shares storage with zombieBackings
     803             :         }
     804           1 :         return zombieBackings, removedVirtualBackings, localLiveSizeDelta
     805             : }
     806             : 
     807             : // sizeIfLocal reports whether the backing is a local table (per objstorage.IsLocalTable) and returns backing.Size as an int64 if so, else 0.
     808             : func sizeIfLocal(
     809             :         backing *fileBacking, provider objstorage.Provider,
     810           1 : ) (isLocal bool, localSize int64) {
     811           1 :         isLocal = objstorage.IsLocalTable(provider, backing.DiskFileNum)
     812           1 :         if isLocal {
     813           1 :                 return true, int64(backing.Size)
     814           1 :         }
     815           1 :         return false, 0
     816             : }
     817             : 
     818             : func (vs *versionSet) incrementCompactions( // bumps the vs.metrics counters for one compaction or flush of the given kind
     819             :         kind compactionKind, extraLevels []*compactionLevel, pickerMetrics compactionPickerMetrics,
     820           1 : ) { // NOTE(review): pickerMetrics is not read anywhere in this body
     821           1 :         switch kind { // a kind with no case below changes no counter here
     822           1 :         case compactionKindDefault:
     823           1 :                 vs.metrics.Compact.Count++
     824           1 :                 vs.metrics.Compact.DefaultCount++
     825             : 
     826           1 :         case compactionKindFlush, compactionKindIngestedFlushable: // flush kinds count under Flush, not Compact
     827           1 :                 vs.metrics.Flush.Count++
     828             : 
     829           1 :         case compactionKindMove:
     830           1 :                 vs.metrics.Compact.Count++
     831           1 :                 vs.metrics.Compact.MoveCount++
     832             : 
     833           1 :         case compactionKindDeleteOnly:
     834           1 :                 vs.metrics.Compact.Count++
     835           1 :                 vs.metrics.Compact.DeleteOnlyCount++
     836             : 
     837           1 :         case compactionKindElisionOnly:
     838           1 :                 vs.metrics.Compact.Count++
     839           1 :                 vs.metrics.Compact.ElisionOnlyCount++
     840             : 
     841           0 :         case compactionKindRead:
     842           0 :                 vs.metrics.Compact.Count++
     843           0 :                 vs.metrics.Compact.ReadCount++
     844             : 
     845           1 :         case compactionKindRewrite:
     846           1 :                 vs.metrics.Compact.Count++
     847           1 :                 vs.metrics.Compact.RewriteCount++
     848             :         }
     849           1 :         if len(extraLevels) > 0 { // counted for every kind, independently of the switch above
     850           1 :                 vs.metrics.Compact.MultiLevelCount++
     851           1 :         }
     852             : }
     853             : 
     854           1 : func (vs *versionSet) incrementCompactionBytes(numBytes int64) { // adds numBytes to the in-progress byte counter
     855           1 :         vs.atomicInProgressBytes.Add(numBytes)
     856           1 : }
     857             : 
     858             : // createManifest creates a manifest file that contains a snapshot of vs and, on success, installs it as vs.manifest / vs.manifestFile after closing the previous ones.
     859             : func (vs *versionSet) createManifest(
     860             :         dirname string,
     861             :         fileNum, minUnflushedLogNum base.DiskFileNum,
     862             :         nextFileNum uint64,
     863             :         virtualBackings []*fileBacking,
     864           1 : ) (err error) { // err is named so that the deferred cleanup can observe the returned error
     865           1 :         var (
     866           1 :                 filename     = base.MakeFilepath(vs.fs, dirname, fileTypeManifest, fileNum)
     867           1 :                 manifestFile vfs.File
     868           1 :                 manifest     *record.Writer
     869           1 :         )
     870           1 :         defer func() { // cleanup: the locals are still non-nil only if ownership was not handed to vs
     871           1 :                 if manifest != nil {
     872           0 :                         manifest.Close()
     873           0 :                 }
     874           1 :                 if manifestFile != nil {
     875           0 :                         manifestFile.Close()
     876           0 :                 }
     877           1 :                 if err != nil { // on failure remove the new file; the error from Remove itself is dropped
     878           0 :                         vs.fs.Remove(filename)
     879           0 :                 }
     880             :         }()
     881           1 :         manifestFile, err = vs.fs.Create(filename, "pebble-manifest")
     882           1 :         if err != nil {
     883           0 :                 return err
     884           0 :         }
     885           1 :         manifest = record.NewWriter(manifestFile)
     886           1 : 
     887           1 :         snapshot := versionEdit{
     888           1 :                 ComparerName: vs.cmp.Name,
     889           1 :         }
     890           1 : 
     891           1 :         for level, levelMetadata := range vs.currentVersion().Levels { // record every file of the current version as a NewFiles entry
     892           1 :                 iter := levelMetadata.Iter()
     893           1 :                 for meta := iter.First(); meta != nil; meta = iter.Next() {
     894           1 :                         snapshot.NewFiles = append(snapshot.NewFiles, newFileEntry{
     895           1 :                                 Level: level,
     896           1 :                                 Meta:  meta,
     897           1 :                         })
     898           1 :                 }
     899             :         }
     900             : 
     901           1 :         snapshot.CreatedBackingTables = virtualBackings
     902           1 : 
     903           1 :         // When creating a version snapshot for an existing DB, this snapshot VersionEdit will be
     904           1 :         // immediately followed by another VersionEdit (being written in logAndApply()). That
     905           1 :         // VersionEdit always contains a LastSeqNum, so we don't need to include that in the snapshot.
     906           1 :         // But it does not necessarily include MinUnflushedLogNum, NextFileNum, so we initialize those
     907           1 :         // using the corresponding fields in the versionSet (which came from the latest preceding
     908           1 :         // VersionEdit that had those fields).
     909           1 :         snapshot.MinUnflushedLogNum = minUnflushedLogNum
     910           1 :         snapshot.NextFileNum = nextFileNum
     911           1 : 
     912           1 :         w, err1 := manifest.Next()
     913           1 :         if err1 != nil {
     914           0 :                 return err1
     915           0 :         }
     916           1 :         if err := snapshot.Encode(w); err != nil { // the record is only encoded; no Flush or Sync is issued in this function
     917           0 :                 return err
     918           0 :         }
     919             : 
     920           1 :         if vs.manifest != nil {
     921           1 :                 vs.manifest.Close() // NOTE(review): this Close error is ignored, unlike manifestFile.Close below
     922           1 :                 vs.manifest = nil
     923           1 :         }
     924           1 :         if vs.manifestFile != nil {
     925           1 :                 if err := vs.manifestFile.Close(); err != nil {
     926           0 :                         return err
     927           0 :                 }
     928           1 :                 vs.manifestFile = nil
     929             :         }
     930             : 
     931           1 :         vs.manifest, manifest = manifest, nil // hand ownership to vs; nil-ing the local disables the deferred Close
     932           1 :         vs.manifestFile, manifestFile = manifestFile, nil
     933           1 :         return nil
     934             : }
     935             : 
     936           1 : func (vs *versionSet) markFileNumUsed(fileNum base.DiskFileNum) { // ensures vs.nextFileNum is greater than fileNum
     937           1 :         if vs.nextFileNum <= uint64(fileNum) {
     938           1 :                 vs.nextFileNum = uint64(fileNum + 1)
     939           1 :         }
     940             : }
     941             : 
     942           1 : func (vs *versionSet) getNextFileNum() base.FileNum { // returns vs.nextFileNum as a FileNum, then advances the counter
     943           1 :         x := vs.nextFileNum
     944           1 :         vs.nextFileNum++
     945           1 :         return base.FileNum(x)
     946           1 : }
     947             : 
     948           1 : func (vs *versionSet) getNextDiskFileNum() base.DiskFileNum { // returns vs.nextFileNum as a DiskFileNum, then advances the counter
     949           1 :         x := vs.nextFileNum
     950           1 :         vs.nextFileNum++
     951           1 :         return base.DiskFileNum(x)
     952           1 : }
     953             : 
     954           1 : func (vs *versionSet) append(v *version) {
     955           1 :         if v.Refs() != 0 {
     956           0 :                 panic("pebble: version should be unreferenced")
     957             :         }
     958           1 :         if !vs.versions.Empty() {
     959           1 :                 vs.versions.Back().UnrefLocked()
     960           1 :         }
     961           1 :         v.Deleted = vs.obsoleteFn
     962           1 :         v.Ref()
     963           1 :         vs.versions.PushBack(v)
     964           1 :         if invariants.Enabled {
     965           1 :                 // Verify that the virtualBackings contains all the backings referenced by
     966           1 :                 // the version.
     967           1 :                 for _, l := range v.Levels {
     968           1 :                         iter := l.Iter()
     969           1 :                         for f := iter.First(); f != nil; f = iter.Next() {
     970           1 :                                 if f.Virtual {
     971           1 :                                         if _, ok := vs.virtualBackings.Get(f.FileBacking.DiskFileNum); !ok {
     972           0 :                                                 panic(fmt.Sprintf("%s is not in virtualBackings", f.FileBacking.DiskFileNum))
     973             :                                         }
     974             :                                 }
     975             :                         }
     976             :                 }
     977             :         }
     978             : }
     979             : 
     980           1 : func (vs *versionSet) currentVersion() *version {
     981           1 :         return vs.versions.Back()
     982           1 : }
     983             : 
     984           1 : func (vs *versionSet) addLiveFileNums(m map[base.DiskFileNum]struct{}) {
     985           1 :         current := vs.currentVersion()
     986           1 :         for v := vs.versions.Front(); true; v = v.Next() {
     987           1 :                 for _, lm := range v.Levels {
     988           1 :                         iter := lm.Iter()
     989           1 :                         for f := iter.First(); f != nil; f = iter.Next() {
     990           1 :                                 m[f.FileBacking.DiskFileNum] = struct{}{}
     991           1 :                         }
     992             :                 }
     993           1 :                 if v == current {
     994           1 :                         break
     995             :                 }
     996             :         }
     997             :         // virtualBackings contains backings that are referenced by some virtual
     998             :         // tables in the latest version (which are handled above), and backings that
     999             :         // are not but are still alive because of the protection mechanism (see
    1000             :         // manifset.VirtualBackings). This loop ensures the latter get added to the
    1001             :         // map.
    1002           1 :         vs.virtualBackings.ForEach(func(b *fileBacking) {
    1003           1 :                 m[b.DiskFileNum] = struct{}{}
    1004           1 :         })
    1005             : }
    1006             : 
    1007             : // addObsoleteLocked will add the fileInfo associated with obsolete backing
    1008             : // sstables to the obsolete tables list.
    1009             : //
    1010             : // The file backings in the obsolete list must not appear more than once.
    1011             : //
    1012             : // DB.mu must be held when addObsoleteLocked is called.
    1013           1 : func (vs *versionSet) addObsoleteLocked(obsolete []*fileBacking) {
    1014           1 :         if len(obsolete) == 0 {
    1015           1 :                 return
    1016           1 :         }
    1017             : 
    1018           1 :         obsoleteFileInfo := make([]tableInfo, len(obsolete))
    1019           1 :         for i, bs := range obsolete {
    1020           1 :                 obsoleteFileInfo[i].FileNum = bs.DiskFileNum
    1021           1 :                 obsoleteFileInfo[i].FileSize = bs.Size
    1022           1 :         }
    1023             : 
    1024           1 :         if invariants.Enabled {
    1025           1 :                 dedup := make(map[base.DiskFileNum]struct{})
    1026           1 :                 for _, fi := range obsoleteFileInfo {
    1027           1 :                         dedup[fi.FileNum] = struct{}{}
    1028           1 :                 }
    1029           1 :                 if len(dedup) != len(obsoleteFileInfo) {
    1030           0 :                         panic("pebble: duplicate FileBacking present in obsolete list")
    1031             :                 }
    1032             :         }
    1033             : 
    1034           1 :         for i, fi := range obsoleteFileInfo {
    1035           1 :                 // Note that the obsolete tables are no longer zombie by the definition of
    1036           1 :                 // zombie, but we leave them in the zombie tables map until they are
    1037           1 :                 // deleted from disk.
    1038           1 :                 //
    1039           1 :                 // TODO(sumeer): this means that the zombie metrics, like ZombieSize,
    1040           1 :                 // computed in DB.Metrics are also being counted in the obsolete metrics.
    1041           1 :                 // Was this intentional?
    1042           1 :                 info, ok := vs.zombieTables[fi.FileNum]
    1043           1 :                 if !ok {
    1044           0 :                         vs.opts.Logger.Fatalf("MANIFEST obsolete table %s not marked as zombie", fi.FileNum)
    1045           0 :                 }
    1046           1 :                 obsoleteFileInfo[i].isLocal = info.isLocal
    1047             :         }
    1048             : 
    1049           1 :         vs.obsoleteTables = append(vs.obsoleteTables, obsoleteFileInfo...)
    1050           1 :         vs.updateObsoleteTableMetricsLocked()
    1051             : }
    1052             : 
// addObsolete is the self-locking variant of addObsoleteLocked: it locks
// vs.mu, forwards obsolete to addObsoleteLocked, and unlocks on return.
//
// addObsolete will acquire DB.mu, so DB.mu must not be held when this is
// called.
// NOTE(review): the code locks vs.mu; presumably this is the same mutex as
// DB.mu — confirm against the versionSet initialization.
func (vs *versionSet) addObsolete(obsolete []*fileBacking) {
	vs.mu.Lock()
	// Deferred so the mutex is released even if addObsoleteLocked panics.
	defer vs.mu.Unlock()
	vs.addObsoleteLocked(obsolete)
}
    1060             : 
    1061           1 : func (vs *versionSet) updateObsoleteTableMetricsLocked() {
    1062           1 :         vs.metrics.Table.ObsoleteCount = int64(len(vs.obsoleteTables))
    1063           1 :         vs.metrics.Table.ObsoleteSize = 0
    1064           1 :         vs.metrics.Table.Local.ObsoleteSize = 0
    1065           1 :         for _, fi := range vs.obsoleteTables {
    1066           1 :                 vs.metrics.Table.ObsoleteSize += fi.FileSize
    1067           1 :                 if fi.isLocal {
    1068           1 :                         vs.metrics.Table.Local.ObsoleteSize += fi.FileSize
    1069           1 :                 }
    1070             :         }
    1071             : }
    1072             : 
    1073             : func findCurrentManifest(
    1074             :         fs vfs.FS, dirname string, ls []string,
    1075           1 : ) (marker *atomicfs.Marker, manifestNum base.DiskFileNum, exists bool, err error) {
    1076           1 :         // Locating a marker should succeed even if the marker has never been placed.
    1077           1 :         var filename string
    1078           1 :         marker, filename, err = atomicfs.LocateMarkerInListing(fs, dirname, manifestMarkerName, ls)
    1079           1 :         if err != nil {
    1080           0 :                 return nil, 0, false, err
    1081           0 :         }
    1082             : 
    1083           1 :         if filename == "" {
    1084           1 :                 // The marker hasn't been set yet. This database doesn't exist.
    1085           1 :                 return marker, 0, false, nil
    1086           1 :         }
    1087             : 
    1088           1 :         var ok bool
    1089           1 :         _, manifestNum, ok = base.ParseFilename(fs, filename)
    1090           1 :         if !ok {
    1091           0 :                 return marker, 0, false, base.CorruptionErrorf("pebble: MANIFEST name %q is malformed", errors.Safe(filename))
    1092           0 :         }
    1093           1 :         return marker, manifestNum, true, nil
    1094             : }
    1095             : 
    1096           1 : func newFileMetrics(newFiles []manifest.NewFileEntry) map[int]*LevelMetrics {
    1097           1 :         m := map[int]*LevelMetrics{}
    1098           1 :         for _, nf := range newFiles {
    1099           1 :                 lm := m[nf.Level]
    1100           1 :                 if lm == nil {
    1101           1 :                         lm = &LevelMetrics{}
    1102           1 :                         m[nf.Level] = lm
    1103           1 :                 }
    1104           1 :                 lm.NumFiles++
    1105           1 :                 lm.Size += int64(nf.Meta.Size)
    1106             :         }
    1107           1 :         return m
    1108             : }

Generated by: LCOV version 1.14