// Copyright 2012 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.

package pebble

import (
	"fmt"
	"io"
	"sync"
	"sync/atomic"

	"github.com/cockroachdb/errors"
	"github.com/cockroachdb/pebble/internal/base"
	"github.com/cockroachdb/pebble/internal/invariants"
	"github.com/cockroachdb/pebble/internal/manifest"
	"github.com/cockroachdb/pebble/record"
	"github.com/cockroachdb/pebble/vfs"
	"github.com/cockroachdb/pebble/vfs/atomicfs"
)

const numLevels = manifest.NumLevels

const manifestMarkerName = `manifest`

// Provide type aliases for the various manifest structs.
type bulkVersionEdit = manifest.BulkVersionEdit
type deletedFileEntry = manifest.DeletedFileEntry
type fileMetadata = manifest.FileMetadata
type physicalMeta = manifest.PhysicalFileMeta
type virtualMeta = manifest.VirtualFileMeta
type fileBacking = manifest.FileBacking
type newFileEntry = manifest.NewFileEntry
type version = manifest.Version
type versionEdit = manifest.VersionEdit
type versionList = manifest.VersionList

// versionSet manages a collection of immutable versions, and manages the
// creation of a new version from the most recent version. A new version is
// created from an existing version by applying a version edit, which is just
// what it sounds like: a delta from the previous version. Version edits are
// logged to the MANIFEST file, which is replayed at startup.
type versionSet struct {
	// Next seqNum to use for WAL writes.
	logSeqNum atomic.Uint64

	// The upper bound on sequence numbers that have been assigned so far. A
	// suffix of these sequence numbers may not have been written to a WAL. Both
	// logSeqNum and visibleSeqNum are atomically updated by the commitPipeline.
	// visibleSeqNum is <= logSeqNum.
	visibleSeqNum atomic.Uint64

	// Number of bytes present in sstables being written by in-progress
	// compactions. This value will be zero if there are no in-progress
	// compactions. Updated and read atomically.
	atomicInProgressBytes atomic.Int64

	// Immutable fields.
	dirname string
	// Set to DB.mu.
	mu      *sync.Mutex
	opts    *Options
	fs      vfs.FS
	cmp     Compare
	cmpName string
	// dynamicBaseLevel allows the dynamic base level computation to be
	// disabled. Used by tests which want to create specific LSM structures.
	dynamicBaseLevel bool

	// Mutable fields.
	versions versionList
	picker   compactionPicker

	// Not all metrics are kept here. See DB.Metrics().
	metrics Metrics

	// A pointer to versionSet.addObsoleteLocked. Avoids allocating a new closure
	// on the creation of every version.
	obsoleteFn        func(obsolete []*fileBacking)
	obsoleteTables    []fileInfo
	obsoleteManifests []fileInfo
	obsoleteOptions   []fileInfo

	// Zombie tables which have been removed from the current version but are
	// still referenced by an in-use iterator.
	zombieTables map[base.DiskFileNum]uint64 // filenum -> size

	// backingState is protected by the versionSet.logLock. It's populated
	// during Open in versionSet.load, but it's not used concurrently during
	// load.
	backingState struct {
		// fileBackingMap is a map of the FileBackings which support virtual
		// sstables in the latest version. Once a file backing no longer backs
		// any virtual sstables in the latest version, it is removed from this
		// map and the corresponding state is added to the zombieTables map.
		// Note that we don't keep track of file backings which only support
		// virtual sstables that are not in the latest version.
		fileBackingMap map[base.DiskFileNum]*fileBacking
		// fileBackingSize is the sum of the sizes of the fileBackings in the
		// fileBackingMap.
		fileBackingSize uint64
	}

	// minUnflushedLogNum is the smallest WAL log file number corresponding to
	// mutations that have not been flushed to an sstable.
	minUnflushedLogNum base.DiskFileNum

	// The next file number. A single counter is used to assign file
	// numbers for the WAL, MANIFEST, sstable, and OPTIONS files.
	nextFileNum uint64

	// The current manifest file number.
	manifestFileNum base.DiskFileNum
	manifestMarker  *atomicfs.Marker

	manifestFile          vfs.File
	manifest              *record.Writer
	getFormatMajorVersion func() FormatMajorVersion

	writing    bool
	writerCond sync.Cond
	// State for deciding when to write a snapshot. Protected by mu.
	rotationHelper record.RotationHelper
}
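
// The version-edit delta described above can be pictured with a small sketch.
// This is illustrative only; the entries and file numbers are hypothetical
// and do not appear in this file:
//
//	ve := &versionEdit{
//		// A compaction deleted file 000012 from L0...
//		DeletedFiles: map[deletedFileEntry]*fileMetadata{
//			{Level: 0, FileNum: 12}: meta12,
//		},
//		// ...and produced file 000014 in L6.
//		NewFiles: []newFileEntry{{Level: 6, Meta: meta14}},
//	}
//	// logAndApply encodes ve into the MANIFEST and installs the result of
//	// applying ve to the current version as the new current version.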

func (vs *versionSet) init(
	dirname string,
	opts *Options,
	marker *atomicfs.Marker,
	getFMV func() FormatMajorVersion,
	mu *sync.Mutex,
) {
	vs.dirname = dirname
	vs.mu = mu
	vs.writerCond.L = mu
	vs.opts = opts
	vs.fs = opts.FS
	vs.cmp = opts.Comparer.Compare
	vs.cmpName = opts.Comparer.Name
	vs.dynamicBaseLevel = true
	vs.versions.Init(mu)
	vs.obsoleteFn = vs.addObsoleteLocked
	vs.zombieTables = make(map[base.DiskFileNum]uint64)
	vs.backingState.fileBackingMap = make(map[base.DiskFileNum]*fileBacking)
	vs.backingState.fileBackingSize = 0
	vs.nextFileNum = 1
	vs.manifestMarker = marker
	vs.getFormatMajorVersion = getFMV
}

// create creates a version set for a fresh DB.
func (vs *versionSet) create(
	jobID int,
	dirname string,
	opts *Options,
	marker *atomicfs.Marker,
	getFormatMajorVersion func() FormatMajorVersion,
	mu *sync.Mutex,
) error {
	vs.init(dirname, opts, marker, getFormatMajorVersion, mu)
	newVersion := &version{}
	vs.append(newVersion)
	var err error

	vs.picker = newCompactionPicker(newVersion, vs.opts, nil)
	// Note that a "snapshot" version edit is written to the manifest when it is
	// created.
	vs.manifestFileNum = vs.getNextDiskFileNum()
	err = vs.createManifest(vs.dirname, vs.manifestFileNum, vs.minUnflushedLogNum, vs.nextFileNum)
	if err == nil {
		if err = vs.manifest.Flush(); err != nil {
			vs.opts.Logger.Fatalf("MANIFEST flush failed: %v", err)
		}
	}
	if err == nil {
		if err = vs.manifestFile.Sync(); err != nil {
			vs.opts.Logger.Fatalf("MANIFEST sync failed: %v", err)
		}
	}
	if err == nil {
		// NB: Move() is responsible for syncing the data directory.
		if err = vs.manifestMarker.Move(base.MakeFilename(fileTypeManifest, vs.manifestFileNum)); err != nil {
			vs.opts.Logger.Fatalf("MANIFEST set current failed: %v", err)
		}
	}

	vs.opts.EventListener.ManifestCreated(ManifestCreateInfo{
		JobID:   jobID,
		Path:    base.MakeFilepath(vs.fs, vs.dirname, fileTypeManifest, vs.manifestFileNum),
		FileNum: vs.manifestFileNum,
		Err:     err,
	})
	if err != nil {
		return err
	}
	return nil
}

// load loads the version set from the manifest file.
func (vs *versionSet) load(
	dirname string,
	opts *Options,
	manifestFileNum base.DiskFileNum,
	marker *atomicfs.Marker,
	getFormatMajorVersion func() FormatMajorVersion,
	mu *sync.Mutex,
) error {
	vs.init(dirname, opts, marker, getFormatMajorVersion, mu)

	vs.manifestFileNum = manifestFileNum
	manifestPath := base.MakeFilepath(opts.FS, dirname, fileTypeManifest, vs.manifestFileNum)
	manifestFilename := opts.FS.PathBase(manifestPath)

	// Read the versionEdits in the manifest file.
	var bve bulkVersionEdit
	bve.AddedByFileNum = make(map[base.FileNum]*fileMetadata)
	manifest, err := vs.fs.Open(manifestPath)
	if err != nil {
		return errors.Wrapf(err, "pebble: could not open manifest file %q for DB %q",
			errors.Safe(manifestFilename), dirname)
	}
	defer manifest.Close()
	rr := record.NewReader(manifest, 0 /* logNum */)
	for {
		r, err := rr.Next()
		if err == io.EOF || record.IsInvalidRecord(err) {
			break
		}
		if err != nil {
			return errors.Wrapf(err, "pebble: error when loading manifest file %q",
				errors.Safe(manifestFilename))
		}
		var ve versionEdit
		err = ve.Decode(r)
		if err != nil {
			// Break instead of returning an error if the record is corrupted
			// or invalid.
			if err == io.EOF || record.IsInvalidRecord(err) {
				break
			}
			return err
		}
		if ve.ComparerName != "" {
			if ve.ComparerName != vs.cmpName {
				return errors.Errorf("pebble: manifest file %q for DB %q: "+
					"comparer name from file %q != comparer name from Options %q",
					errors.Safe(manifestFilename), dirname, errors.Safe(ve.ComparerName), errors.Safe(vs.cmpName))
			}
		}
		if err := bve.Accumulate(&ve); err != nil {
			return err
		}
		if ve.MinUnflushedLogNum != 0 {
			vs.minUnflushedLogNum = ve.MinUnflushedLogNum
		}
		if ve.NextFileNum != 0 {
			vs.nextFileNum = ve.NextFileNum
		}
		if ve.LastSeqNum != 0 {
			// logSeqNum is the _next_ sequence number that will be assigned,
			// while LastSeqNum is the last assigned sequence number. Note that
			// this behaviour mimics that in RocksDB; the first sequence number
			// assigned is one greater than the one present in the manifest
			// (assuming no WALs contain higher sequence numbers than the
			// manifest's LastSeqNum). Increment LastSeqNum by 1 to get the
			// next sequence number that will be assigned.
			//
			// If LastSeqNum is less than SeqNumStart, increase it to at least
			// SeqNumStart to leave ample room for reserved sequence numbers.
			if ve.LastSeqNum+1 < base.SeqNumStart {
				vs.logSeqNum.Store(base.SeqNumStart)
			} else {
				vs.logSeqNum.Store(ve.LastSeqNum + 1)
			}
		}
	}
	// init has already set vs.nextFileNum = 1 at the beginning of this
	// function, and the version edits above could only have updated it to
	// some other non-zero value, so it cannot be 0 here.
	if vs.minUnflushedLogNum == 0 {
		if vs.nextFileNum >= 2 {
			// We either have a freshly created DB, or a DB created by RocksDB
			// that has not had a single flushed SSTable yet. This is because
			// RocksDB bumps up nextFileNum in this case without bumping up
			// minUnflushedLogNum, even if WALs with non-zero file numbers are
			// present in the directory.
		} else {
			return base.CorruptionErrorf("pebble: malformed manifest file %q for DB %q",
				errors.Safe(manifestFilename), dirname)
		}
	}
	vs.markFileNumUsed(vs.minUnflushedLogNum)

	// Populate the fileBackingMap and the FileBacking for virtual sstables since
	// we have finished version edit accumulation.
	for _, s := range bve.AddedFileBacking {
		vs.addFileBacking(s)
	}

	for _, fileNum := range bve.RemovedFileBacking {
		vs.removeFileBacking(fileNum)
	}

	newVersion, err := bve.Apply(
		nil, vs.cmp, opts.Comparer.FormatKey, opts.FlushSplitBytes,
		opts.Experimental.ReadCompactionRate, nil, /* zombies */
	)
	if err != nil {
		return err
	}
	newVersion.L0Sublevels.InitCompactingFileInfo(nil /* in-progress compactions */)
	vs.append(newVersion)

	for i := range vs.metrics.Levels {
		l := &vs.metrics.Levels[i]
		l.NumFiles = int64(newVersion.Levels[i].Len())
		files := newVersion.Levels[i].Slice()
		l.Size = int64(files.SizeSum())
	}

	vs.picker = newCompactionPicker(newVersion, vs.opts, nil)
	return nil
}
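
// To make the LastSeqNum handling during replay concrete, here is a small
// worked example (a sketch; the concrete values, and the assumption that
// base.SeqNumStart reserves a small prefix of sequence numbers, are
// illustrative):
//
//	// Replaying a manifest whose newest edit has LastSeqNum = 41:
//	//   vs.logSeqNum.Store(41 + 1) // the next assigned seqnum is 42
//	// Replaying a manifest whose newest edit has LastSeqNum = 2, where
//	// LastSeqNum+1 < base.SeqNumStart:
//	//   vs.logSeqNum.Store(base.SeqNumStart) // skip the reserved range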

func (vs *versionSet) close() error {
	if vs.manifestFile != nil {
		if err := vs.manifestFile.Close(); err != nil {
			return err
		}
	}
	if vs.manifestMarker != nil {
		if err := vs.manifestMarker.Close(); err != nil {
			return err
		}
	}
	return nil
}

// logLock locks the manifest for writing. The lock must be released by either
// a call to logUnlock or logAndApply.
//
// DB.mu must be held when calling this method, but the mutex may be dropped and
// re-acquired during the course of this method.
func (vs *versionSet) logLock() {
	// Wait for any existing writing to the manifest to complete, then mark the
	// manifest as busy.
	for vs.writing {
		vs.writerCond.Wait()
	}
	vs.writing = true
}

// logUnlock releases the lock for manifest writing.
//
// DB.mu must be held when calling this method.
func (vs *versionSet) logUnlock() {
	if !vs.writing {
		vs.opts.Logger.Fatalf("MANIFEST not locked for writing")
	}
	vs.writing = false
	vs.writerCond.Signal()
}
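
// A sketch of how callers are expected to pair these methods (illustrative,
// not a function in this file; d stands for a *DB whose mutex is
// versionSet.mu, and jobID, ve, metrics, and inProgressFn stand for values
// the caller already has):
//
//	d.mu.Lock()
//	defer d.mu.Unlock()
//	d.mu.versions.logLock()
//	// logAndApply unconditionally releases the manifest lock, even on
//	// error, so no explicit logUnlock is needed on this path.
//	if err := d.mu.versions.logAndApply(jobID, ve, metrics, false /* forceRotation */, inProgressFn); err != nil {
//		return err
//	}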

// addFileBacking must only be called if the DiskFileNum does not yet exist in
// the fileBackingMap.
func (vs *versionSet) addFileBacking(backing *manifest.FileBacking) {
	_, ok := vs.backingState.fileBackingMap[backing.DiskFileNum]
	if ok {
		panic("pebble: trying to add an existing file backing")
	}
	vs.backingState.fileBackingMap[backing.DiskFileNum] = backing
	vs.backingState.fileBackingSize += backing.Size
}

// removeFileBacking must only be called if the DiskFileNum exists in the
// fileBackingMap.
func (vs *versionSet) removeFileBacking(dfn base.DiskFileNum) {
	backing, ok := vs.backingState.fileBackingMap[dfn]
	if !ok {
		panic("pebble: trying to remove an unknown file backing")
	}
	delete(vs.backingState.fileBackingMap, dfn)
	vs.backingState.fileBackingSize -= backing.Size
}

// logAndApply logs the version edit to the manifest, applies the version edit
// to the current version, and installs the new version.
//
// DB.mu must be held when calling this method and will be released temporarily
// while performing file I/O. Requires that the manifest is locked for writing
// (see logLock). Will unconditionally release the manifest lock (via
// logUnlock) even if an error occurs.
//
// inProgressCompactions is called while DB.mu is held, to get the list of
// in-progress compactions.
func (vs *versionSet) logAndApply(
	jobID int,
	ve *versionEdit,
	metrics map[int]*LevelMetrics,
	forceRotation bool,
	inProgressCompactions func() []compactionInfo,
) error {
	if !vs.writing {
		vs.opts.Logger.Fatalf("MANIFEST not locked for writing")
	}
	defer vs.logUnlock()

	if ve.MinUnflushedLogNum != 0 {
		if ve.MinUnflushedLogNum < vs.minUnflushedLogNum ||
			vs.nextFileNum <= uint64(ve.MinUnflushedLogNum) {
			panic(fmt.Sprintf("pebble: inconsistent versionEdit minUnflushedLogNum %d",
				ve.MinUnflushedLogNum))
		}
	}

	// This is the next file number, but if the current manifest file is too
	// big we will write this ve to the next manifest file, which means what ve
	// encodes is the current file number and not the next one.
	//
	// TODO(sbhola): figure out why this is correct and update comment.
	ve.NextFileNum = vs.nextFileNum

	// LastSeqNum is set to the current upper bound on the assigned sequence
	// numbers. Note that this is exactly the behavior of RocksDB. LastSeqNum is
	// used to initialize versionSet.logSeqNum and versionSet.visibleSeqNum on
	// replay. It must be higher than or equal to any sequence number
	// written to an sstable, including sequence numbers in ingested files.
	// Note that LastSeqNum is not (and cannot be) the minimum unflushed sequence
	// number. This is fallout from ingestion which allows a sequence number X to
	// be assigned to an ingested sstable even though sequence number X-1 resides
	// in an unflushed memtable. logSeqNum is the _next_ sequence number that
	// will be assigned, so subtract 1 from it to get the upper bound on the
	// last assigned sequence number.
	logSeqNum := vs.logSeqNum.Load()
	ve.LastSeqNum = logSeqNum - 1
	if logSeqNum == 0 {
		// logSeqNum is initialized to 1 in Open() if there are no previous WAL
		// or manifest records, so this case should never happen.
		vs.opts.Logger.Fatalf("logSeqNum must be a positive integer: %d", logSeqNum)
	}

	currentVersion := vs.currentVersion()
	var newVersion *version

	// Generate a new manifest if we don't currently have one, or forceRotation
	// is true, or the current one is too large.
	//
	// For largeness, we do not exclusively use the MaxManifestFileSize
	// threshold since we have had incidents where due to either large keys or
	// large numbers of files, each edit results in a snapshot + write of the
	// edit. This slows the system down since each flush or compaction is
	// writing a new manifest snapshot. The primary goal of the size-based
	// rollover logic is to ensure that when reopening a DB, the number of edits
	// that need to be replayed on top of the snapshot is "sane". Rolling over
	// to a new manifest after each edit is not relevant to that goal.
	//
	// Consider the following cases:
	// - The number of live files F in the DB is roughly stable: after writing
	//   the snapshot (with F files), say we require that there be enough edits
	//   such that the cumulative number of files in those edits, E, be greater
	//   than F. This will ensure that the total amount of time in logAndApply
	//   that is spent in snapshot writing is ~50%.
	//
	// - The number of live files F in the DB is shrinking drastically, say from
	//   F to F/10: This can happen for various reasons, like wide range
	//   tombstones, or large numbers of smaller than usual files that are being
	//   merged together into larger files. And say the new files generated
	//   during this shrinkage are insignificant compared to F/10, so for this
	//   example we will assume they are effectively 0. After this shrinking,
	//   E = 0.9F, and so if we used the previous snapshot file count, F, as the
	//   threshold that needs to be exceeded, we would further delay the
	//   snapshot writing, which means on DB reopen we would need to replay 0.9F
	//   edits to get to a version with 0.1F files. It would be better to create
	//   a new snapshot when E exceeds the number of files in the current
	//   version.
	//
	// - The number of live files F in the DB is growing via perfect ingests
	//   into L6: Say we wrote the snapshot when there were F files and now we
	//   have 10F files, so E = 9F. We will further delay writing a new
	//   snapshot. This case can be critiqued as contrived, but we consider it
	//   nonetheless.
	//
	// The logic below uses the min of the last snapshot file count and the file
	// count in the current version.
	vs.rotationHelper.AddRecord(int64(len(ve.DeletedFiles) + len(ve.NewFiles)))
	sizeExceeded := vs.manifest.Size() >= vs.opts.MaxManifestFileSize
	requireRotation := forceRotation || vs.manifest == nil

	var nextSnapshotFilecount int64
	for i := range vs.metrics.Levels {
		nextSnapshotFilecount += vs.metrics.Levels[i].NumFiles
	}
	if sizeExceeded && !requireRotation {
		requireRotation = vs.rotationHelper.ShouldRotate(nextSnapshotFilecount)
	}
	var newManifestFileNum base.DiskFileNum
	var prevManifestFileSize uint64
	if requireRotation {
		newManifestFileNum = vs.getNextDiskFileNum()
		prevManifestFileSize = uint64(vs.manifest.Size())
	}

	// Grab certain values before releasing vs.mu, in case createManifest() needs
	// to be called.
	minUnflushedLogNum := vs.minUnflushedLogNum
	nextFileNum := vs.nextFileNum

	var zombies map[base.DiskFileNum]uint64
	if err := func() error {
		vs.mu.Unlock()
		defer vs.mu.Lock()

		var err error
		if vs.getFormatMajorVersion() < FormatVirtualSSTables && len(ve.CreatedBackingTables) > 0 {
			return errors.AssertionFailedf("MANIFEST cannot contain virtual sstable records due to format major version")
		}
		newVersion, zombies, err = manifest.AccumulateIncompleteAndApplySingleVE(
			ve, currentVersion, vs.cmp, vs.opts.Comparer.FormatKey,
			vs.opts.FlushSplitBytes, vs.opts.Experimental.ReadCompactionRate,
			vs.backingState.fileBackingMap, vs.addFileBacking, vs.removeFileBacking,
		)
		if err != nil {
			return errors.Wrap(err, "MANIFEST apply failed")
		}

		if newManifestFileNum != 0 {
			if err := vs.createManifest(vs.dirname, newManifestFileNum, minUnflushedLogNum, nextFileNum); err != nil {
				vs.opts.EventListener.ManifestCreated(ManifestCreateInfo{
					JobID:   jobID,
					Path:    base.MakeFilepath(vs.fs, vs.dirname, fileTypeManifest, newManifestFileNum),
					FileNum: newManifestFileNum,
					Err:     err,
				})
				return errors.Wrap(err, "MANIFEST create failed")
			}
		}

		w, err := vs.manifest.Next()
		if err != nil {
			return errors.Wrap(err, "MANIFEST next record write failed")
		}

		// NB: Any error from this point on is considered fatal as we don't know if
		// the MANIFEST write occurred or not. Trying to determine that is
		// fraught. Instead we rely on the standard recovery mechanism run when a
		// database is open. In particular, that mechanism generates a new MANIFEST
		// and ensures it is synced.
		if err := ve.Encode(w); err != nil {
			return errors.Wrap(err, "MANIFEST write failed")
		}
		if err := vs.manifest.Flush(); err != nil {
			return errors.Wrap(err, "MANIFEST flush failed")
		}
		if err := vs.manifestFile.Sync(); err != nil {
			return errors.Wrap(err, "MANIFEST sync failed")
		}
		if newManifestFileNum != 0 {
			// NB: Move() is responsible for syncing the data directory.
			if err := vs.manifestMarker.Move(base.MakeFilename(fileTypeManifest, newManifestFileNum)); err != nil {
				return errors.Wrap(err, "MANIFEST set current failed")
			}
			vs.opts.EventListener.ManifestCreated(ManifestCreateInfo{
				JobID:   jobID,
				Path:    base.MakeFilepath(vs.fs, vs.dirname, fileTypeManifest, newManifestFileNum),
				FileNum: newManifestFileNum,
			})
		}
		return nil
	}(); err != nil {
		// Any error encountered during any of the operations in the previous
		// closure is considered fatal. Treating such errors as fatal is preferred
		// to attempting to unwind various file and b-tree reference counts, and
		// re-generating L0 sublevel metadata. This may change in the future, if
		// certain manifest / WAL operations become retryable. For more context, see
		// #1159 and #1792.
		vs.opts.Logger.Fatalf("%s", err)
		return err
	}

	if requireRotation {
		// Successfully rotated.
		vs.rotationHelper.Rotate(nextSnapshotFilecount)
	}
	// Now that DB.mu is held again, initialize compacting file info in
	// L0Sublevels.
	inProgress := inProgressCompactions()

	newVersion.L0Sublevels.InitCompactingFileInfo(inProgressL0Compactions(inProgress))

	// Update the zombie tables set first, as installation of the new version
	// will unref the previous version, which could result in addObsoleteLocked
	// being called.
	for fileNum, size := range zombies {
		vs.zombieTables[fileNum] = size
	}

	// Install the new version.
	vs.append(newVersion)
	if ve.MinUnflushedLogNum != 0 {
		vs.minUnflushedLogNum = ve.MinUnflushedLogNum
	}
	if newManifestFileNum != 0 {
		if vs.manifestFileNum != 0 {
			vs.obsoleteManifests = append(vs.obsoleteManifests, fileInfo{
				FileNum:  vs.manifestFileNum,
				FileSize: prevManifestFileSize,
			})
		}
		vs.manifestFileNum = newManifestFileNum
	}

	for level, update := range metrics {
		vs.metrics.Levels[level].Add(update)
	}
	for i := range vs.metrics.Levels {
		l := &vs.metrics.Levels[i]
		l.NumFiles = int64(newVersion.Levels[i].Len())
		l.NumVirtualFiles = newVersion.Levels[i].NumVirtual
		l.VirtualSize = newVersion.Levels[i].VirtualSize
		l.Size = int64(newVersion.Levels[i].Size())

		l.Sublevels = 0
		if l.NumFiles > 0 {
			l.Sublevels = 1
		}
		if invariants.Enabled {
			levelFiles := newVersion.Levels[i].Slice()
			if size := int64(levelFiles.SizeSum()); l.Size != size {
				vs.opts.Logger.Fatalf("versionSet metrics L%d Size = %d, actual size = %d", i, l.Size, size)
			}
			if nVirtual := levelFiles.NumVirtual(); nVirtual != l.NumVirtualFiles {
				vs.opts.Logger.Fatalf(
					"versionSet metrics L%d NumVirtual = %d, actual NumVirtual = %d",
					i, l.NumVirtualFiles, nVirtual,
				)
			}
			if vSize := levelFiles.VirtualSizeSum(); vSize != l.VirtualSize {
				vs.opts.Logger.Fatalf(
					"versionSet metrics L%d Virtual size = %d, actual size = %d",
					i, l.VirtualSize, vSize,
				)
			}
		}
	}
	vs.metrics.Levels[0].Sublevels = int32(len(newVersion.L0SublevelFiles))

	vs.picker = newCompactionPicker(newVersion, vs.opts, inProgress)
	if !vs.dynamicBaseLevel {
		vs.picker.forceBaseLevel1()
	}
	return nil
}
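
// A rough numeric illustration of the rotation heuristic described above (the
// counts are hypothetical, and the min() behavior is as described in the
// comment inside logAndApply):
//
//	// The snapshot was last written when the DB had F = 1000 files, and
//	// the manifest has grown past MaxManifestFileSize. Each edit adds
//	// len(ve.DeletedFiles) + len(ve.NewFiles) to the helper's tally E.
//	//   E = 400  -> ShouldRotate == false (400 < min(1000, current count))
//	//   E = 1200 -> ShouldRotate == true, and this logAndApply call writes
//	//               a fresh snapshot via createManifest.
//	// If the DB shrank to ~100 live files, min(1000, 100) == 100, so a new
//	// snapshot is written after only ~100 files' worth of edits.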

func (vs *versionSet) incrementCompactions(
	kind compactionKind, extraLevels []*compactionLevel, pickerMetrics compactionPickerMetrics,
) {
	switch kind {
	case compactionKindDefault:
		vs.metrics.Compact.Count++
		vs.metrics.Compact.DefaultCount++

	case compactionKindFlush, compactionKindIngestedFlushable:
		vs.metrics.Flush.Count++

	case compactionKindMove:
		vs.metrics.Compact.Count++
		vs.metrics.Compact.MoveCount++

	case compactionKindDeleteOnly:
		vs.metrics.Compact.Count++
		vs.metrics.Compact.DeleteOnlyCount++

	case compactionKindElisionOnly:
		vs.metrics.Compact.Count++
		vs.metrics.Compact.ElisionOnlyCount++

	case compactionKindRead:
		vs.metrics.Compact.Count++
		vs.metrics.Compact.ReadCount++

	case compactionKindRewrite:
		vs.metrics.Compact.Count++
		vs.metrics.Compact.RewriteCount++
	}
	if len(extraLevels) > 0 {
		vs.metrics.Compact.MultiLevelCount++
	}
}

func (vs *versionSet) incrementCompactionBytes(numBytes int64) {
	vs.atomicInProgressBytes.Add(numBytes)
}

// createManifest creates a manifest file that contains a snapshot of vs.
func (vs *versionSet) createManifest(
	dirname string, fileNum, minUnflushedLogNum base.DiskFileNum, nextFileNum uint64,
) (err error) {
	var (
		filename     = base.MakeFilepath(vs.fs, dirname, fileTypeManifest, fileNum)
		manifestFile vfs.File
		manifest     *record.Writer
	)
	defer func() {
		if manifest != nil {
			manifest.Close()
		}
		if manifestFile != nil {
			manifestFile.Close()
		}
		if err != nil {
			vs.fs.Remove(filename)
		}
	}()
	manifestFile, err = vs.fs.Create(filename)
	if err != nil {
		return err
	}
	manifest = record.NewWriter(manifestFile)

	snapshot := versionEdit{
		ComparerName: vs.cmpName,
	}
	dedup := make(map[base.DiskFileNum]struct{})
	for level, levelMetadata := range vs.currentVersion().Levels {
		iter := levelMetadata.Iter()
		for meta := iter.First(); meta != nil; meta = iter.Next() {
			snapshot.NewFiles = append(snapshot.NewFiles, newFileEntry{
				Level: level,
				Meta:  meta,
			})
			if _, ok := dedup[meta.FileBacking.DiskFileNum]; meta.Virtual && !ok {
				dedup[meta.FileBacking.DiskFileNum] = struct{}{}
				snapshot.CreatedBackingTables = append(
					snapshot.CreatedBackingTables,
					meta.FileBacking,
				)
			}
		}
	}

	// When creating a version snapshot for an existing DB, this snapshot
	// VersionEdit will be immediately followed by another VersionEdit (being
	// written in logAndApply()). That VersionEdit always contains a LastSeqNum,
	// so we don't need to include one in the snapshot. But it does not
	// necessarily include MinUnflushedLogNum and NextFileNum, so we initialize
	// those using the corresponding fields in the versionSet (which came from
	// the latest preceding VersionEdit that had those fields).
	snapshot.MinUnflushedLogNum = minUnflushedLogNum
	snapshot.NextFileNum = nextFileNum

	w, err1 := manifest.Next()
	if err1 != nil {
		return err1
	}
	if err := snapshot.Encode(w); err != nil {
		return err
	}

	if vs.manifest != nil {
		vs.manifest.Close()
		vs.manifest = nil
	}
	if vs.manifestFile != nil {
		if err := vs.manifestFile.Close(); err != nil {
			return err
		}
		vs.manifestFile = nil
	}

	vs.manifest, manifest = manifest, nil
	vs.manifestFile, manifestFile = manifestFile, nil
	return nil
}
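
// Conceptually, the snapshot record written above is just a versionEdit that
// re-adds every live file. A sketch of its shape for a tiny DB with one L0
// and one L6 physical sstable (the comparer name and the numeric values are
// hypothetical):
//
//	versionEdit{
//		ComparerName:       "leveldb.BytewiseComparator",
//		NewFiles:           []newFileEntry{{Level: 0, Meta: meta5}, {Level: 6, Meta: meta9}},
//		MinUnflushedLogNum: 7,
//		NextFileNum:        10,
//		// LastSeqNum is omitted; the edit that follows in logAndApply
//		// always carries it.
//	}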

func (vs *versionSet) markFileNumUsed(fileNum base.DiskFileNum) {
	if vs.nextFileNum <= uint64(fileNum) {
		vs.nextFileNum = uint64(fileNum + 1)
	}
}

func (vs *versionSet) getNextFileNum() base.FileNum {
	x := vs.nextFileNum
	vs.nextFileNum++
	return base.FileNum(x)
}

func (vs *versionSet) getNextDiskFileNum() base.DiskFileNum {
	x := vs.nextFileNum
	vs.nextFileNum++
	return base.DiskFileNum(x)
}
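
// Because a single counter backs both getters above, file numbers are unique
// across file types. An illustrative sequence (the values and the file-type
// assignments are hypothetical, assuming a fresh versionSet with
// nextFileNum == 1):
//
//	walNum := vs.getNextDiskFileNum() // DiskFileNum(1), e.g. for a WAL
//	sstNum := vs.getNextFileNum()     // FileNum(2), e.g. for an sstable
//	vs.markFileNumUsed(base.DiskFileNum(7))
//	next := vs.getNextDiskFileNum()   // DiskFileNum(8); 3 through 7 are never handed out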

func (vs *versionSet) append(v *version) {
	if v.Refs() != 0 {
		panic("pebble: version should be unreferenced")
	}
	if !vs.versions.Empty() {
		vs.versions.Back().UnrefLocked()
	}
	v.Deleted = vs.obsoleteFn
	v.Ref()
	vs.versions.PushBack(v)
}

func (vs *versionSet) currentVersion() *version {
	return vs.versions.Back()
}

func (vs *versionSet) addLiveFileNums(m map[base.DiskFileNum]struct{}) {
	current := vs.currentVersion()
	for v := vs.versions.Front(); true; v = v.Next() {
		for _, lm := range v.Levels {
			iter := lm.Iter()
			for f := iter.First(); f != nil; f = iter.Next() {
				m[f.FileBacking.DiskFileNum] = struct{}{}
			}
		}
		if v == current {
			break
		}
	}
}

// addObsoleteLocked adds the fileInfo associated with obsolete backing
// sstables to the obsolete tables list.
//
// The file backings in the obsolete list must not appear more than once.
//
// DB.mu must be held when addObsoleteLocked is called.
func (vs *versionSet) addObsoleteLocked(obsolete []*fileBacking) {
	if len(obsolete) == 0 {
		return
	}

	obsoleteFileInfo := make([]fileInfo, len(obsolete))
	for i, bs := range obsolete {
		obsoleteFileInfo[i].FileNum = bs.DiskFileNum
		obsoleteFileInfo[i].FileSize = bs.Size
	}

	if invariants.Enabled {
		dedup := make(map[base.DiskFileNum]struct{})
		for _, fi := range obsoleteFileInfo {
			dedup[fi.FileNum] = struct{}{}
		}
		if len(dedup) != len(obsoleteFileInfo) {
			panic("pebble: duplicate FileBacking present in obsolete list")
		}
	}

	for _, fi := range obsoleteFileInfo {
		// Note that the obsolete tables are no longer zombies by the definition
		// of zombie, but we leave them in the zombie tables map until they are
		// deleted from disk.
		if _, ok := vs.zombieTables[fi.FileNum]; !ok {
			vs.opts.Logger.Fatalf("MANIFEST obsolete table %s not marked as zombie", fi.FileNum)
		}
	}

	vs.obsoleteTables = append(vs.obsoleteTables, obsoleteFileInfo...)
	vs.updateObsoleteTableMetricsLocked()
}
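
// The lifecycle that feeds this function, in sketch form (illustrative
// commentary; the deletion step lives outside this file): logAndApply moves a
// table that left the latest version into vs.zombieTables; once the last
// version referencing its backing is unreffed, that version's Deleted
// callback (vs.obsoleteFn, wired up in append) fires with the backing, and
// addObsoleteLocked queues it for removal from disk:
//
//	latest version drops the table  -> vs.zombieTables[fileNum] = size
//	last referencing version unreffed -> vs.obsoleteFn([]*fileBacking{...})
//	addObsoleteLocked               -> vs.obsoleteTables (deleted from disk later)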

// addObsolete will acquire DB.mu, so DB.mu must not be held when this is
// called.
func (vs *versionSet) addObsolete(obsolete []*fileBacking) {
	vs.mu.Lock()
	defer vs.mu.Unlock()
	vs.addObsoleteLocked(obsolete)
}

func (vs *versionSet) updateObsoleteTableMetricsLocked() {
	vs.metrics.Table.ObsoleteCount = int64(len(vs.obsoleteTables))
	vs.metrics.Table.ObsoleteSize = 0
	for _, fi := range vs.obsoleteTables {
		vs.metrics.Table.ObsoleteSize += fi.FileSize
	}
}

func findCurrentManifest(
	fs vfs.FS, dirname string,
) (marker *atomicfs.Marker, manifestNum base.DiskFileNum, exists bool, err error) {
	// Locating a marker should succeed even if the marker has never been placed.
	var filename string
	marker, filename, err = atomicfs.LocateMarker(fs, dirname, manifestMarkerName)
	if err != nil {
		return nil, 0, false, err
	}

	if filename == "" {
		// The marker hasn't been set yet. This database doesn't exist.
		return marker, 0, false, nil
	}

	var ok bool
	_, manifestNum, ok = base.ParseFilename(fs, filename)
	if !ok {
		return marker, 0, false, base.CorruptionErrorf("pebble: MANIFEST name %q is malformed", errors.Safe(filename))
	}
	return marker, manifestNum, true, nil
}
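
// A sketch of the three outcomes callers can see (the directory name and
// manifest number are hypothetical):
//
//	marker, num, exists, err := findCurrentManifest(fs, "/data/db")
//	switch {
//	case err != nil: // I/O error, or a malformed MANIFEST name
//	case !exists:    // no marker placed yet: a fresh DB, handled by versionSet.create
//	default:         // marker points at, e.g., MANIFEST-000123: num == 123, handled by versionSet.load
//	}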

func newFileMetrics(newFiles []manifest.NewFileEntry) map[int]*LevelMetrics {
	m := map[int]*LevelMetrics{}
	for _, nf := range newFiles {
		lm := m[nf.Level]
		if lm == nil {
			lm = &LevelMetrics{}
			m[nf.Level] = lm
		}
		lm.NumFiles++
		lm.Size += int64(nf.Meta.Size)
	}
	return m
}
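
// For example (a sketch; the version edit contents are hypothetical), a flush
// whose edit added two L0 sstables of 1 MiB and 3 MiB yields:
//
//	m := newFileMetrics(ve.NewFiles)
//	// m[0].NumFiles == 2
//	// m[0].Size     == 4 << 20
//	// m has no entries for levels that gained no files.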