LCOV - code coverage report
Current view: top level - pebble - version_set.go (source / functions)
Test: 2023-11-18 08:15Z 717d49c2 - meta test only.lcov
Date: 2023-11-18 08:16:19
Lines: 498 hit / 668 total (74.6 %)    Functions: 0 / 0 (-)

          Line data    Source code
       1             : // Copyright 2012 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package pebble
       6             : 
       7             : import (
       8             :         "bytes"
       9             :         "fmt"
      10             :         "io"
      11             :         "sync"
      12             :         "sync/atomic"
      13             : 
      14             :         "github.com/cockroachdb/errors"
      15             :         "github.com/cockroachdb/errors/oserror"
      16             :         "github.com/cockroachdb/pebble/internal/base"
      17             :         "github.com/cockroachdb/pebble/internal/invariants"
      18             :         "github.com/cockroachdb/pebble/internal/manifest"
      19             :         "github.com/cockroachdb/pebble/record"
      20             :         "github.com/cockroachdb/pebble/vfs"
      21             :         "github.com/cockroachdb/pebble/vfs/atomicfs"
      22             : )
      23             : 
      24             : const numLevels = manifest.NumLevels
      25             : 
      26             : const manifestMarkerName = `manifest`
      27             : 
      28             : // Provide type aliases for the various manifest structs.
      29             : type bulkVersionEdit = manifest.BulkVersionEdit
      30             : type deletedFileEntry = manifest.DeletedFileEntry
      31             : type fileMetadata = manifest.FileMetadata
      32             : type physicalMeta = manifest.PhysicalFileMeta
      33             : type virtualMeta = manifest.VirtualFileMeta
      34             : type fileBacking = manifest.FileBacking
      35             : type newFileEntry = manifest.NewFileEntry
      36             : type version = manifest.Version
      37             : type versionEdit = manifest.VersionEdit
      38             : type versionList = manifest.VersionList
      39             : 
      40             : // versionSet manages a collection of immutable versions, as well as the
      41             : // creation of a new version from the most recent version. A new version is
      42             : // created from an existing version by applying a version edit, which is just
      43             : // what it sounds like: a delta from the previous version. Version edits are logged
      44             : // to the MANIFEST file, which is replayed at startup.
      45             : type versionSet struct {
      46             :         // Next seqNum to use for WAL writes.
      47             :         logSeqNum atomic.Uint64
      48             : 
      49             :         // The upper bound on sequence numbers that have been assigned so far. A
      50             :         // suffix of these sequence numbers may not have been written to a WAL. Both
      51             :         // logSeqNum and visibleSeqNum are atomically updated by the commitPipeline.
      52             :         // visibleSeqNum is <= logSeqNum.
      53             :         visibleSeqNum atomic.Uint64
      54             : 
      55             :         // Number of bytes present in sstables being written by in-progress
      56             :         // compactions. This value will be zero if there are no in-progress
      57             :         // compactions. Updated and read atomically.
      58             :         atomicInProgressBytes atomic.Int64
      59             : 
      60             :         // Immutable fields.
      61             :         dirname string
      62             :         // Set to DB.mu.
      63             :         mu      *sync.Mutex
      64             :         opts    *Options
      65             :         fs      vfs.FS
      66             :         cmp     Compare
      67             :         cmpName string
      68             :         // Dynamic base level allows the dynamic base level computation to be
      69             :         // disabled. Used by tests which want to create specific LSM structures.
      70             :         dynamicBaseLevel bool
      71             : 
      72             :         // Mutable fields.
      73             :         versions versionList
      74             :         picker   compactionPicker
      75             : 
      76             :         metrics Metrics
      77             : 
      78             :         // A pointer to versionSet.addObsoleteLocked. Avoids allocating a new closure
      79             :         // on the creation of every version.
      80             :         obsoleteFn        func(obsolete []*fileBacking)
      81             :         obsoleteTables    []fileInfo
      82             :         obsoleteManifests []fileInfo
      83             :         obsoleteOptions   []fileInfo
      84             : 
      85             :         // Zombie tables which have been removed from the current version but are
      86             :         // still referenced by an in-use iterator.
      87             :         zombieTables map[base.DiskFileNum]uint64 // filenum -> size
      88             : 
      89             :         // backingState is protected by the versionSet.logLock. It's populated
      90             :         // during Open in versionSet.load, but it's not used concurrently during
      91             :         // load.
      92             :         backingState struct {
      93             :                 // fileBackingMap maps DiskFileNums to the FileBackings that
      94             :                 // support virtual sstables in the latest version. Once a file
      95             :                 // backing no longer backs any virtual sstable in the latest
      96             :                 // version, it is removed from this map and the corresponding
      97             :                 // state is added to the zombieTables map. Note that we don't
      98             :                 // track file backings whose virtual sstables are all absent from the latest version.
      99             :                 fileBackingMap map[base.DiskFileNum]*fileBacking
     100             :                 // fileBackingSize is the sum of the sizes of the fileBackings in the
     101             :                 // fileBackingMap.
     102             :                 fileBackingSize uint64
     103             :         }
     104             : 
     105             :         // minUnflushedLogNum is the smallest WAL log file number corresponding to
     106             :         // mutations that have not been flushed to an sstable.
     107             :         minUnflushedLogNum base.DiskFileNum
     108             : 
     109             :         // The next file number. A single counter is used to assign file
     110             :         // numbers for the WAL, MANIFEST, sstable, and OPTIONS files.
     111             :         nextFileNum uint64
     112             : 
     113             :         // The current manifest file number.
     114             :         manifestFileNum base.DiskFileNum
     115             :         manifestMarker  *atomicfs.Marker
     116             : 
     117             :         manifestFile          vfs.File
     118             :         manifest              *record.Writer
     119             :         setCurrent            func(base.DiskFileNum) error
     120             :         getFormatMajorVersion func() FormatMajorVersion
     121             : 
     122             :         writing    bool
     123             :         writerCond sync.Cond
     124             :         // State for deciding when to write a snapshot. Protected by mu.
     125             :         rotationHelper record.RotationHelper
     126             : }
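
The replay loop described in the versionSet comment above can be sketched against Pebble's public record package. This is an editorial sketch, not Pebble code: manifest.VersionEdit lives in an internal package, so the sketch only walks the record framing; in versionSet.load below, each record's payload is decoded into a version edit and accumulated.

package main

import (
        "fmt"
        "io"
        "os"

        "github.com/cockroachdb/pebble/record"
)

// replayManifest walks a MANIFEST file record by record. Each record holds
// one encoded version edit; this sketch just drains and counts them.
func replayManifest(path string) (int, error) {
        f, err := os.Open(path)
        if err != nil {
                return 0, err
        }
        defer f.Close()

        rr := record.NewReader(f, 0 /* logNum */)
        n := 0
        for {
                r, err := rr.Next()
                if err == io.EOF || record.IsInvalidRecord(err) {
                        break // a truncated tail record ends replay, as in load
                }
                if err != nil {
                        return n, err
                }
                if _, err := io.Copy(io.Discard, r); err != nil {
                        return n, err
                }
                n++
        }
        return n, nil
}

func main() {
        n, err := replayManifest("MANIFEST-000001")
        if err != nil {
                fmt.Println("replay error:", err)
                return
        }
        fmt.Printf("replayed %d version edits\n", n)
}
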
     127             : 
     128             : func (vs *versionSet) init(
     129             :         dirname string,
     130             :         opts *Options,
     131             :         marker *atomicfs.Marker,
     132             :         setCurrent func(base.DiskFileNum) error,
     133             :         getFMV func() FormatMajorVersion,
     134             :         mu *sync.Mutex,
     135           1 : ) {
     136           1 :         vs.dirname = dirname
     137           1 :         vs.mu = mu
     138           1 :         vs.writerCond.L = mu
     139           1 :         vs.opts = opts
     140           1 :         vs.fs = opts.FS
     141           1 :         vs.cmp = opts.Comparer.Compare
     142           1 :         vs.cmpName = opts.Comparer.Name
     143           1 :         vs.dynamicBaseLevel = true
     144           1 :         vs.versions.Init(mu)
     145           1 :         vs.obsoleteFn = vs.addObsoleteLocked
     146           1 :         vs.zombieTables = make(map[base.DiskFileNum]uint64)
     147           1 :         vs.backingState.fileBackingMap = make(map[base.DiskFileNum]*fileBacking)
     148           1 :         vs.backingState.fileBackingSize = 0
     149           1 :         vs.nextFileNum = 1
     150           1 :         vs.manifestMarker = marker
     151           1 :         vs.setCurrent = setCurrent
     152           1 :         vs.getFormatMajorVersion = getFMV
     153           1 : }
     154             : 
     155             : // create creates a version set for a fresh DB.
     156             : func (vs *versionSet) create(
     157             :         jobID int,
     158             :         dirname string,
     159             :         opts *Options,
     160             :         marker *atomicfs.Marker,
     161             :         setCurrent func(base.DiskFileNum) error,
     162             :         getFormatMajorVersion func() FormatMajorVersion,
     163             :         mu *sync.Mutex,
     164           1 : ) error {
     165           1 :         vs.init(dirname, opts, marker, setCurrent, getFormatMajorVersion, mu)
     166           1 :         newVersion := &version{}
     167           1 :         vs.append(newVersion)
     168           1 :         var err error
     169           1 : 
     170           1 :         vs.picker = newCompactionPicker(newVersion, vs.opts, nil)
     171           1 :         // Note that a "snapshot" version edit is written to the manifest when it is
     172           1 :         // created.
     173           1 :         vs.manifestFileNum = vs.getNextDiskFileNum()
     174           1 :         err = vs.createManifest(vs.dirname, vs.manifestFileNum, vs.minUnflushedLogNum, vs.nextFileNum)
     175           1 :         if err == nil {
     176           1 :                 if err = vs.manifest.Flush(); err != nil {
     177           0 :                         vs.opts.Logger.Fatalf("MANIFEST flush failed: %v", err)
     178           0 :                 }
     179             :         }
     180           1 :         if err == nil {
     181           1 :                 if err = vs.manifestFile.Sync(); err != nil {
     182           0 :                         vs.opts.Logger.Fatalf("MANIFEST sync failed: %v", err)
     183           0 :                 }
     184             :         }
     185           1 :         if err == nil {
     186           1 :                 // NB: setCurrent is responsible for syncing the data directory.
     187           1 :                 if err = vs.setCurrent(vs.manifestFileNum); err != nil {
     188           0 :                         vs.opts.Logger.Fatalf("MANIFEST set current failed: %v", err)
     189           0 :                 }
     190             :         }
     191             : 
     192           1 :         vs.opts.EventListener.ManifestCreated(ManifestCreateInfo{
     193           1 :                 JobID:   jobID,
     194           1 :                 Path:    base.MakeFilepath(vs.fs, vs.dirname, fileTypeManifest, vs.manifestFileNum),
     195           1 :                 FileNum: vs.manifestFileNum,
     196           1 :                 Err:     err,
     197           1 :         })
     198           1 :         if err != nil {
     199           0 :                 return err
     200           0 :         }
     201           1 :         return nil
     202             : }
     203             : 
     204             : // load loads the version set from the manifest file.
     205             : func (vs *versionSet) load(
     206             :         dirname string,
     207             :         opts *Options,
     208             :         manifestFileNum base.DiskFileNum,
     209             :         marker *atomicfs.Marker,
     210             :         setCurrent func(base.DiskFileNum) error,
     211             :         getFormatMajorVersion func() FormatMajorVersion,
     212             :         mu *sync.Mutex,
     213           1 : ) error {
     214           1 :         vs.init(dirname, opts, marker, setCurrent, getFormatMajorVersion, mu)
     215           1 : 
     216           1 :         vs.manifestFileNum = manifestFileNum
     217           1 :         manifestPath := base.MakeFilepath(opts.FS, dirname, fileTypeManifest, vs.manifestFileNum)
     218           1 :         manifestFilename := opts.FS.PathBase(manifestPath)
     219           1 : 
     220           1 :         // Read the versionEdits in the manifest file.
     221           1 :         var bve bulkVersionEdit
     222           1 :         bve.AddedByFileNum = make(map[base.FileNum]*fileMetadata)
     223           1 :         manifest, err := vs.fs.Open(manifestPath)
     224           1 :         if err != nil {
     225           0 :                 return errors.Wrapf(err, "pebble: could not open manifest file %q for DB %q",
     226           0 :                         errors.Safe(manifestFilename), dirname)
     227           0 :         }
     228           1 :         defer manifest.Close()
     229           1 :         rr := record.NewReader(manifest, 0 /* logNum */)
     230           1 :         for {
     231           1 :                 r, err := rr.Next()
     232           1 :                 if err == io.EOF || record.IsInvalidRecord(err) {
     233           1 :                         break
     234             :                 }
     235           1 :                 if err != nil {
     236           0 :                         return errors.Wrapf(err, "pebble: error when loading manifest file %q",
     237           0 :                                 errors.Safe(manifestFilename))
     238           0 :                 }
     239           1 :                 var ve versionEdit
     240           1 :                 err = ve.Decode(r)
     241           1 :                 if err != nil {
     242           0 :                         // Break instead of returning an error if the record is corrupted
     243           0 :                         // or invalid.
     244           0 :                         if err == io.EOF || record.IsInvalidRecord(err) {
     245           0 :                                 break
     246             :                         }
     247           0 :                         return err
     248             :                 }
     249           1 :                 if ve.ComparerName != "" {
     250           1 :                         if ve.ComparerName != vs.cmpName {
     251           0 :                                 return errors.Errorf("pebble: manifest file %q for DB %q: "+
     252           0 :                                         "comparer name from file %q != comparer name from Options %q",
     253           0 :                                         errors.Safe(manifestFilename), dirname, errors.Safe(ve.ComparerName), errors.Safe(vs.cmpName))
     254           0 :                         }
     255             :                 }
     256           1 :                 if err := bve.Accumulate(&ve); err != nil {
     257           0 :                         return err
     258           0 :                 }
     259           1 :                 if ve.MinUnflushedLogNum != 0 {
     260           1 :                         vs.minUnflushedLogNum = ve.MinUnflushedLogNum
     261           1 :                 }
     262           1 :                 if ve.NextFileNum != 0 {
     263           1 :                         vs.nextFileNum = ve.NextFileNum
     264           1 :                 }
     265           1 :                 if ve.LastSeqNum != 0 {
     266           1 :                         // logSeqNum is the _next_ sequence number that will be assigned,
     267           1 :                         // while LastSeqNum is the last assigned sequence number. Note that
     268           1 :                         // this behaviour mimics that in RocksDB; the first sequence number
     269           1 :                         // assigned is one greater than the one present in the manifest
     270           1 :                         // (assuming no WALs contain higher sequence numbers than the
     271           1 :                         // manifest's LastSeqNum). Increment LastSeqNum by 1 to get the
     272           1 :                         // next sequence number that will be assigned.
     273           1 :                         //
     274           1 :                         // If LastSeqNum is less than SeqNumStart, increase it to at least
     275           1 :                         // SeqNumStart to leave ample room for reserved sequence numbers.
     276           1 :                         if ve.LastSeqNum+1 < base.SeqNumStart {
     277           0 :                                 vs.logSeqNum.Store(base.SeqNumStart)
     278           1 :                         } else {
     279           1 :                                 vs.logSeqNum.Store(ve.LastSeqNum + 1)
     280           1 :                         }
     281             :                 }
     282             :         }
      283             :         // init (called at the start of this function) set vs.nextFileNum = 1,
      284             :         // and replay could only have updated it to some other non-zero value,
      285             :         // so it cannot be 0 here.
     286           1 :         if vs.minUnflushedLogNum == 0 {
     287           0 :                 if vs.nextFileNum >= 2 {
     288           0 :                         // We either have a freshly created DB, or a DB created by RocksDB
     289           0 :                         // that has not had a single flushed SSTable yet. This is because
     290           0 :                         // RocksDB bumps up nextFileNum in this case without bumping up
     291           0 :                         // minUnflushedLogNum, even if WALs with non-zero file numbers are
     292           0 :                         // present in the directory.
     293           0 :                 } else {
     294           0 :                         return base.CorruptionErrorf("pebble: malformed manifest file %q for DB %q",
     295           0 :                                 errors.Safe(manifestFilename), dirname)
     296           0 :                 }
     297             :         }
     298           1 :         vs.markFileNumUsed(vs.minUnflushedLogNum)
     299           1 : 
      300           1 :         // Populate the fileBackingMap with the FileBackings for virtual
      301           1 :         // sstables, now that version edit accumulation has finished.
     302           1 :         for _, s := range bve.AddedFileBacking {
     303           1 :                 vs.addFileBacking(s)
     304           1 :         }
     305             : 
     306           1 :         for _, fileNum := range bve.RemovedFileBacking {
     307           1 :                 vs.removeFileBacking(fileNum)
     308           1 :         }
     309             : 
     310           1 :         newVersion, err := bve.Apply(
     311           1 :                 nil, vs.cmp, opts.Comparer.FormatKey, opts.FlushSplitBytes,
     312           1 :                 opts.Experimental.ReadCompactionRate, nil, /* zombies */
     313           1 :         )
     314           1 :         if err != nil {
     315           0 :                 return err
     316           0 :         }
     317           1 :         newVersion.L0Sublevels.InitCompactingFileInfo(nil /* in-progress compactions */)
     318           1 :         vs.append(newVersion)
     319           1 : 
     320           1 :         for i := range vs.metrics.Levels {
     321           1 :                 l := &vs.metrics.Levels[i]
     322           1 :                 l.NumFiles = int64(newVersion.Levels[i].Len())
     323           1 :                 files := newVersion.Levels[i].Slice()
     324           1 :                 l.Size = int64(files.SizeSum())
     325           1 :         }
     326             : 
     327           1 :         vs.picker = newCompactionPicker(newVersion, vs.opts, nil)
     328           1 :         return nil
     329             : }
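
The LastSeqNum handling above reduces to a small floor computation. A standalone restatement, with seqNumStart standing in for base.SeqNumStart:

// nextSeqNum mirrors load's LastSeqNum handling: the next sequence number to
// assign is one past the manifest's LastSeqNum, floored at seqNumStart to
// leave room for reserved sequence numbers.
func nextSeqNum(lastSeqNum, seqNumStart uint64) uint64 {
        if lastSeqNum+1 < seqNumStart {
                return seqNumStart
        }
        return lastSeqNum + 1
}
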
     330             : 
     331           1 : func (vs *versionSet) close() error {
     332           1 :         if vs.manifestFile != nil {
     333           1 :                 if err := vs.manifestFile.Close(); err != nil {
     334           0 :                         return err
     335           0 :                 }
     336             :         }
     337           1 :         if vs.manifestMarker != nil {
     338           1 :                 if err := vs.manifestMarker.Close(); err != nil {
     339           0 :                         return err
     340           0 :                 }
     341             :         }
     342           1 :         return nil
     343             : }
     344             : 
     345             : // logLock locks the manifest for writing. The lock must be released by either
     346             : // a call to logUnlock or logAndApply.
     347             : //
     348             : // DB.mu must be held when calling this method, but the mutex may be dropped and
     349             : // re-acquired during the course of this method.
     350           1 : func (vs *versionSet) logLock() {
     351           1 :         // Wait for any existing writing to the manifest to complete, then mark the
     352           1 :         // manifest as busy.
     353           1 :         for vs.writing {
     354           1 :                 vs.writerCond.Wait()
     355           1 :         }
     356           1 :         vs.writing = true
     357             : }
     358             : 
     359             : // logUnlock releases the lock for manifest writing.
     360             : //
     361             : // DB.mu must be held when calling this method.
     362           1 : func (vs *versionSet) logUnlock() {
     363           1 :         if !vs.writing {
     364           0 :                 vs.opts.Logger.Fatalf("MANIFEST not locked for writing")
     365           0 :         }
     366           1 :         vs.writing = false
     367           1 :         vs.writerCond.Signal()
     368             : }
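
logLock and logUnlock implement a long-held logical lock layered on top of DB.mu: a boolean busy flag plus a sync.Cond whose L is DB.mu, which lets the mutex be dropped and re-acquired (e.g. around file I/O) while the logical lock stays held. A self-contained sketch of the same pattern, with illustrative names:

import "sync"

// flagLock is an illustrative reduction of the versionSet writing/writerCond
// pair: lock and unlock must be called with mu held, and lock may drop mu
// while waiting, exactly as logLock does via writerCond.Wait.
type flagLock struct {
        mu      sync.Mutex
        cond    sync.Cond // cond.L must be &mu
        writing bool
}

func newFlagLock() *flagLock {
        l := &flagLock{}
        l.cond.L = &l.mu
        return l
}

// lock marks the resource busy. Wait atomically releases and re-acquires mu,
// so other mu-protected work proceeds while this caller blocks.
func (l *flagLock) lock() {
        for l.writing {
                l.cond.Wait()
        }
        l.writing = true
}

// unlock clears the busy flag and wakes one waiter, mirroring logUnlock.
func (l *flagLock) unlock() {
        l.writing = false
        l.cond.Signal()
}
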
     369             : 
     370             : // Only call if the DiskFileNum doesn't exist in the fileBackingMap.
     371           1 : func (vs *versionSet) addFileBacking(backing *manifest.FileBacking) {
     372           1 :         _, ok := vs.backingState.fileBackingMap[backing.DiskFileNum]
     373           1 :         if ok {
     374           0 :                 panic("pebble: trying to add an existing file backing")
     375             :         }
     376           1 :         vs.backingState.fileBackingMap[backing.DiskFileNum] = backing
     377           1 :         vs.backingState.fileBackingSize += backing.Size
     378             : }
     379             : 
      380             : // Only call if the DiskFileNum exists in the fileBackingMap.
     381           1 : func (vs *versionSet) removeFileBacking(dfn base.DiskFileNum) {
     382           1 :         backing, ok := vs.backingState.fileBackingMap[dfn]
     383           1 :         if !ok {
     384           0 :                 panic("pebble: trying to remove an unknown file backing")
     385             :         }
     386           1 :         delete(vs.backingState.fileBackingMap, dfn)
     387           1 :         vs.backingState.fileBackingSize -= backing.Size
     388             : }
     389             : 
     390             : // logAndApply logs the version edit to the manifest, applies the version edit
     391             : // to the current version, and installs the new version.
     392             : //
     393             : // DB.mu must be held when calling this method and will be released temporarily
     394             : // while performing file I/O. Requires that the manifest is locked for writing
     395             : // (see logLock). Will unconditionally release the manifest lock (via
     396             : // logUnlock) even if an error occurs.
     397             : //
     398             : // inProgressCompactions is called while DB.mu is held, to get the list of
     399             : // in-progress compactions.
     400             : func (vs *versionSet) logAndApply(
     401             :         jobID int,
     402             :         ve *versionEdit,
     403             :         metrics map[int]*LevelMetrics,
     404             :         forceRotation bool,
     405             :         inProgressCompactions func() []compactionInfo,
     406           1 : ) error {
     407           1 :         if !vs.writing {
     408           0 :                 vs.opts.Logger.Fatalf("MANIFEST not locked for writing")
     409           0 :         }
     410           1 :         defer vs.logUnlock()
     411           1 : 
     412           1 :         if ve.MinUnflushedLogNum != 0 {
     413           1 :                 if ve.MinUnflushedLogNum < vs.minUnflushedLogNum ||
     414           1 :                         vs.nextFileNum <= uint64(ve.MinUnflushedLogNum) {
     415           0 :                         panic(fmt.Sprintf("pebble: inconsistent versionEdit minUnflushedLogNum %d",
     416           0 :                                 ve.MinUnflushedLogNum))
     417             :                 }
     418             :         }
     419             : 
      420             :         // This is the next manifest filenum, but if the current file is too big
      421             :         // we will write this ve to the next file, which means what ve encodes
      422             :         // is the current filenum and not the next one.
     423             :         //
     424             :         // TODO(sbhola): figure out why this is correct and update comment.
     425           1 :         ve.NextFileNum = vs.nextFileNum
     426           1 : 
     427           1 :         // LastSeqNum is set to the current upper bound on the assigned sequence
     428           1 :         // numbers. Note that this is exactly the behavior of RocksDB. LastSeqNum is
     429           1 :         // used to initialize versionSet.logSeqNum and versionSet.visibleSeqNum on
      430           1 :         // replay. It must be higher than or equal to any sequence number
     431           1 :         // written to an sstable, including sequence numbers in ingested files.
     432           1 :         // Note that LastSeqNum is not (and cannot be) the minimum unflushed sequence
     433           1 :         // number. This is fallout from ingestion which allows a sequence number X to
     434           1 :         // be assigned to an ingested sstable even though sequence number X-1 resides
     435           1 :         // in an unflushed memtable. logSeqNum is the _next_ sequence number that
      436           1 :         // will be assigned, so subtract 1 from it to get the upper bound on the
     437           1 :         // last assigned sequence number.
     438           1 :         logSeqNum := vs.logSeqNum.Load()
     439           1 :         ve.LastSeqNum = logSeqNum - 1
     440           1 :         if logSeqNum == 0 {
     441           0 :                 // logSeqNum is initialized to 1 in Open() if there are no previous WAL
     442           0 :                 // or manifest records, so this case should never happen.
     443           0 :                 vs.opts.Logger.Fatalf("logSeqNum must be a positive integer: %d", logSeqNum)
     444           0 :         }
     445             : 
     446           1 :         currentVersion := vs.currentVersion()
     447           1 :         var newVersion *version
     448           1 : 
     449           1 :         // Generate a new manifest if we don't currently have one, or forceRotation
     450           1 :         // is true, or the current one is too large.
     451           1 :         //
      452           1 :         // For largeness, we do not rely exclusively on the MaxManifestFileSize
      453           1 :         // threshold, since we have had incidents where, due to either large keys or
     454           1 :         // large numbers of files, each edit results in a snapshot + write of the
     455           1 :         // edit. This slows the system down since each flush or compaction is
     456           1 :         // writing a new manifest snapshot. The primary goal of the size-based
     457           1 :         // rollover logic is to ensure that when reopening a DB, the number of edits
     458           1 :         // that need to be replayed on top of the snapshot is "sane". Rolling over
     459           1 :         // to a new manifest after each edit is not relevant to that goal.
     460           1 :         //
     461           1 :         // Consider the following cases:
     462           1 :         // - The number of live files F in the DB is roughly stable: after writing
     463           1 :         //   the snapshot (with F files), say we require that there be enough edits
     464           1 :         //   such that the cumulative number of files in those edits, E, be greater
     465           1 :         //   than F. This will ensure that the total amount of time in logAndApply
     466           1 :         //   that is spent in snapshot writing is ~50%.
     467           1 :         //
     468           1 :         // - The number of live files F in the DB is shrinking drastically, say from
     469           1 :         //   F to F/10: This can happen for various reasons, like wide range
     470           1 :         //   tombstones, or large numbers of smaller than usual files that are being
      471           1 :         //   merged together into larger files. And say the number of new files
      472           1 :         //   generated during this shrinkage is insignificant compared to F/10, and so for
     473           1 :         //   this example we will assume it is effectively 0. After this shrinking,
     474           1 :         //   E = 0.9F, and so if we used the previous snapshot file count, F, as the
     475           1 :         //   threshold that needs to be exceeded, we will further delay the snapshot
     476           1 :         //   writing. Which means on DB reopen we will need to replay 0.9F edits to
     477           1 :         //   get to a version with 0.1F files. It would be better to create a new
     478           1 :         //   snapshot when E exceeds the number of files in the current version.
     479           1 :         //
     480           1 :         // - The number of live files F in the DB is growing via perfect ingests
     481           1 :         //   into L6: Say we wrote the snapshot when there were F files and now we
     482           1 :         //   have 10F files, so E = 9F. We will further delay writing a new
     483           1 :         //   snapshot. This case can be critiqued as contrived, but we consider it
     484           1 :         //   nonetheless.
     485           1 :         //
     486           1 :         // The logic below uses the min of the last snapshot file count and the file
     487           1 :         // count in the current version.
     488           1 :         vs.rotationHelper.AddRecord(int64(len(ve.DeletedFiles) + len(ve.NewFiles)))
     489           1 :         sizeExceeded := vs.manifest.Size() >= vs.opts.MaxManifestFileSize
     490           1 :         requireRotation := forceRotation || vs.manifest == nil
     491           1 : 
     492           1 :         var nextSnapshotFilecount int64
     493           1 :         for i := range vs.metrics.Levels {
     494           1 :                 nextSnapshotFilecount += vs.metrics.Levels[i].NumFiles
     495           1 :         }
     496           1 :         if sizeExceeded && !requireRotation {
     497           1 :                 requireRotation = vs.rotationHelper.ShouldRotate(nextSnapshotFilecount)
     498           1 :         }
     499           1 :         var newManifestFileNum base.DiskFileNum
     500           1 :         var prevManifestFileSize uint64
     501           1 :         if requireRotation {
     502           1 :                 newManifestFileNum = vs.getNextDiskFileNum()
     503           1 :                 prevManifestFileSize = uint64(vs.manifest.Size())
     504           1 :         }
     505             : 
     506             :         // Grab certain values before releasing vs.mu, in case createManifest() needs
     507             :         // to be called.
     508           1 :         minUnflushedLogNum := vs.minUnflushedLogNum
     509           1 :         nextFileNum := vs.nextFileNum
     510           1 : 
     511           1 :         var zombies map[base.DiskFileNum]uint64
     512           1 :         if err := func() error {
     513           1 :                 vs.mu.Unlock()
     514           1 :                 defer vs.mu.Lock()
     515           1 : 
     516           1 :                 var err error
     517           1 :                 if vs.getFormatMajorVersion() < FormatVirtualSSTables && len(ve.CreatedBackingTables) > 0 {
     518           0 :                         return errors.AssertionFailedf("MANIFEST cannot contain virtual sstable records due to format major version")
     519           0 :                 }
     520           1 :                 newVersion, zombies, err = manifest.AccumulateIncompleteAndApplySingleVE(
     521           1 :                         ve, currentVersion, vs.cmp, vs.opts.Comparer.FormatKey,
     522           1 :                         vs.opts.FlushSplitBytes, vs.opts.Experimental.ReadCompactionRate,
     523           1 :                         vs.backingState.fileBackingMap, vs.addFileBacking, vs.removeFileBacking,
     524           1 :                 )
     525           1 :                 if err != nil {
     526           0 :                         return errors.Wrap(err, "MANIFEST apply failed")
     527           0 :                 }
     528             : 
     529           1 :                 if newManifestFileNum != 0 {
     530           1 :                         if err := vs.createManifest(vs.dirname, newManifestFileNum, minUnflushedLogNum, nextFileNum); err != nil {
     531           0 :                                 vs.opts.EventListener.ManifestCreated(ManifestCreateInfo{
     532           0 :                                         JobID:   jobID,
     533           0 :                                         Path:    base.MakeFilepath(vs.fs, vs.dirname, fileTypeManifest, newManifestFileNum),
     534           0 :                                         FileNum: newManifestFileNum,
     535           0 :                                         Err:     err,
     536           0 :                                 })
     537           0 :                                 return errors.Wrap(err, "MANIFEST create failed")
     538           0 :                         }
     539             :                 }
     540             : 
     541           1 :                 w, err := vs.manifest.Next()
     542           1 :                 if err != nil {
     543           0 :                         return errors.Wrap(err, "MANIFEST next record write failed")
     544           0 :                 }
     545             : 
     546             :                 // NB: Any error from this point on is considered fatal as we don't know if
     547             :                 // the MANIFEST write occurred or not. Trying to determine that is
     548             :                 // fraught. Instead we rely on the standard recovery mechanism run when a
     549             :                 // database is open. In particular, that mechanism generates a new MANIFEST
     550             :                 // and ensures it is synced.
     551           1 :                 if err := ve.Encode(w); err != nil {
     552           0 :                         return errors.Wrap(err, "MANIFEST write failed")
     553           0 :                 }
     554           1 :                 if err := vs.manifest.Flush(); err != nil {
     555           0 :                         return errors.Wrap(err, "MANIFEST flush failed")
     556           0 :                 }
     557           1 :                 if err := vs.manifestFile.Sync(); err != nil {
     558           0 :                         return errors.Wrap(err, "MANIFEST sync failed")
     559           0 :                 }
     560           1 :                 if newManifestFileNum != 0 {
     561           1 :                         // NB: setCurrent is responsible for syncing the data directory.
     562           1 :                         if err := vs.setCurrent(newManifestFileNum); err != nil {
     563           0 :                                 return errors.Wrap(err, "MANIFEST set current failed")
     564           0 :                         }
     565           1 :                         vs.opts.EventListener.ManifestCreated(ManifestCreateInfo{
     566           1 :                                 JobID:   jobID,
     567           1 :                                 Path:    base.MakeFilepath(vs.fs, vs.dirname, fileTypeManifest, newManifestFileNum),
     568           1 :                                 FileNum: newManifestFileNum,
     569           1 :                         })
     570             :                 }
     571           1 :                 return nil
     572           0 :         }(); err != nil {
     573           0 :                 // Any error encountered during any of the operations in the previous
      574           0 :                 // closure is considered fatal. Treating such errors as fatal is preferred
     575           0 :                 // to attempting to unwind various file and b-tree reference counts, and
     576           0 :                 // re-generating L0 sublevel metadata. This may change in the future, if
     577           0 :                 // certain manifest / WAL operations become retryable. For more context, see
     578           0 :                 // #1159 and #1792.
     579           0 :                 vs.opts.Logger.Fatalf("%s", err)
     580           0 :                 return err
     581           0 :         }
     582             : 
     583           1 :         if requireRotation {
     584           1 :                 // Successfully rotated.
     585           1 :                 vs.rotationHelper.Rotate(nextSnapshotFilecount)
     586           1 :         }
     587             :         // Now that DB.mu is held again, initialize compacting file info in
     588             :         // L0Sublevels.
     589           1 :         inProgress := inProgressCompactions()
     590           1 : 
     591           1 :         newVersion.L0Sublevels.InitCompactingFileInfo(inProgressL0Compactions(inProgress))
     592           1 : 
     593           1 :         // Update the zombie tables set first, as installation of the new version
     594           1 :         // will unref the previous version which could result in addObsoleteLocked
     595           1 :         // being called.
     596           1 :         for fileNum, size := range zombies {
     597           1 :                 vs.zombieTables[fileNum] = size
     598           1 :         }
     599             : 
     600             :         // Install the new version.
     601           1 :         vs.append(newVersion)
     602           1 :         if ve.MinUnflushedLogNum != 0 {
     603           1 :                 vs.minUnflushedLogNum = ve.MinUnflushedLogNum
     604           1 :         }
     605           1 :         if newManifestFileNum != 0 {
     606           1 :                 if vs.manifestFileNum != 0 {
     607           1 :                         vs.obsoleteManifests = append(vs.obsoleteManifests, fileInfo{
     608           1 :                                 fileNum:  vs.manifestFileNum,
     609           1 :                                 fileSize: prevManifestFileSize,
     610           1 :                         })
     611           1 :                 }
     612           1 :                 vs.manifestFileNum = newManifestFileNum
     613             :         }
     614             : 
     615           1 :         for level, update := range metrics {
     616           1 :                 vs.metrics.Levels[level].Add(update)
     617           1 :         }
     618           1 :         for i := range vs.metrics.Levels {
     619           1 :                 l := &vs.metrics.Levels[i]
     620           1 :                 l.NumFiles = int64(newVersion.Levels[i].Len())
     621           1 :                 l.NumVirtualFiles = newVersion.Levels[i].NumVirtual
     622           1 :                 l.VirtualSize = newVersion.Levels[i].VirtualSize
     623           1 :                 l.Size = int64(newVersion.Levels[i].Size())
     624           1 : 
     625           1 :                 l.Sublevels = 0
     626           1 :                 if l.NumFiles > 0 {
     627           1 :                         l.Sublevels = 1
     628           1 :                 }
     629           1 :                 if invariants.Enabled {
     630           1 :                         levelFiles := newVersion.Levels[i].Slice()
     631           1 :                         if size := int64(levelFiles.SizeSum()); l.Size != size {
     632           0 :                                 vs.opts.Logger.Fatalf("versionSet metrics L%d Size = %d, actual size = %d", i, l.Size, size)
     633           0 :                         }
     634           1 :                         if nVirtual := levelFiles.NumVirtual(); nVirtual != l.NumVirtualFiles {
     635           0 :                                 vs.opts.Logger.Fatalf(
     636           0 :                                         "versionSet metrics L%d NumVirtual = %d, actual NumVirtual = %d",
     637           0 :                                         i, l.NumVirtualFiles, nVirtual,
     638           0 :                                 )
     639           0 :                         }
     640           1 :                         if vSize := levelFiles.VirtualSizeSum(); vSize != l.VirtualSize {
     641           0 :                                 vs.opts.Logger.Fatalf(
     642           0 :                                         "versionSet metrics L%d Virtual size = %d, actual size = %d",
     643           0 :                                         i, l.VirtualSize, vSize,
     644           0 :                                 )
     645           0 :                         }
     646             :                 }
     647             :         }
     648           1 :         vs.metrics.Levels[0].Sublevels = int32(len(newVersion.L0SublevelFiles))
     649           1 : 
     650           1 :         vs.picker = newCompactionPicker(newVersion, vs.opts, inProgress)
     651           1 :         if !vs.dynamicBaseLevel {
     652           0 :                 vs.picker.forceBaseLevel1()
     653           0 :         }
     654           1 :         return nil
     655             : }
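
The rotation heuristic in the large comment above reduces to: rotate only when the manifest exceeds the size threshold and record.RotationHelper judges that enough edit weight has accumulated relative to the live file count. A condensed sketch of that decision, using the same RotationHelper calls from the listing (the other names are illustrative, and the nil-manifest case is omitted):

import "github.com/cockroachdb/pebble/record"

// shouldRotateManifest condenses the decision made in logAndApply. editFiles
// is len(ve.DeletedFiles)+len(ve.NewFiles); liveFiles is the file count of
// the current version (nextSnapshotFilecount above).
func shouldRotateManifest(
        helper *record.RotationHelper,
        editFiles, liveFiles, manifestSize, maxManifestSize int64,
        force bool,
) bool {
        helper.AddRecord(editFiles)
        if force {
                return true
        }
        if manifestSize < maxManifestSize {
                // Below the size threshold we never rotate, no matter how
                // many edits have accumulated.
                return false
        }
        // Size alone is not sufficient: ShouldRotate compares the accumulated
        // edit weight against (roughly) the min of the last snapshot's file
        // count and the current live file count, so a huge manifest is not
        // rewritten after every edit.
        return helper.ShouldRotate(liveFiles)
}

After a successful rotation, logAndApply calls rotationHelper.Rotate with the same file count; a caller of this sketch would do likewise.
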
     656             : 
     657             : func (vs *versionSet) incrementCompactions(
     658             :         kind compactionKind, extraLevels []*compactionLevel, pickerMetrics compactionPickerMetrics,
     659           1 : ) {
     660           1 :         switch kind {
     661           1 :         case compactionKindDefault:
     662           1 :                 vs.metrics.Compact.Count++
     663           1 :                 vs.metrics.Compact.DefaultCount++
     664             : 
     665           1 :         case compactionKindFlush, compactionKindIngestedFlushable:
     666           1 :                 vs.metrics.Flush.Count++
     667             : 
     668           1 :         case compactionKindMove:
     669           1 :                 vs.metrics.Compact.Count++
     670           1 :                 vs.metrics.Compact.MoveCount++
     671             : 
     672           1 :         case compactionKindDeleteOnly:
     673           1 :                 vs.metrics.Compact.Count++
     674           1 :                 vs.metrics.Compact.DeleteOnlyCount++
     675             : 
     676           1 :         case compactionKindElisionOnly:
     677           1 :                 vs.metrics.Compact.Count++
     678           1 :                 vs.metrics.Compact.ElisionOnlyCount++
     679             : 
     680           0 :         case compactionKindRead:
     681           0 :                 vs.metrics.Compact.Count++
     682           0 :                 vs.metrics.Compact.ReadCount++
     683             : 
     684           0 :         case compactionKindRewrite:
     685           0 :                 vs.metrics.Compact.Count++
     686           0 :                 vs.metrics.Compact.RewriteCount++
     687             :         }
     688           1 :         if len(extraLevels) > 0 {
     689           1 :                 vs.metrics.Compact.MultiLevelCount++
     690           1 :         }
     691             : }
     692             : 
     693           1 : func (vs *versionSet) incrementCompactionBytes(numBytes int64) {
     694           1 :         vs.atomicInProgressBytes.Add(numBytes)
     695           1 : }
     696             : 
     697             : // createManifest creates a manifest file that contains a snapshot of vs.
     698             : func (vs *versionSet) createManifest(
     699             :         dirname string, fileNum, minUnflushedLogNum base.DiskFileNum, nextFileNum uint64,
     700           1 : ) (err error) {
     701           1 :         var (
     702           1 :                 filename     = base.MakeFilepath(vs.fs, dirname, fileTypeManifest, fileNum)
     703           1 :                 manifestFile vfs.File
     704           1 :                 manifest     *record.Writer
     705           1 :         )
     706           1 :         defer func() {
     707           1 :                 if manifest != nil {
     708           0 :                         manifest.Close()
     709           0 :                 }
     710           1 :                 if manifestFile != nil {
     711           0 :                         manifestFile.Close()
     712           0 :                 }
     713           1 :                 if err != nil {
     714           0 :                         vs.fs.Remove(filename)
     715           0 :                 }
     716             :         }()
     717           1 :         manifestFile, err = vs.fs.Create(filename)
     718           1 :         if err != nil {
     719           0 :                 return err
     720           0 :         }
     721           1 :         manifest = record.NewWriter(manifestFile)
     722           1 : 
     723           1 :         snapshot := versionEdit{
     724           1 :                 ComparerName: vs.cmpName,
     725           1 :         }
     726           1 :         dedup := make(map[base.DiskFileNum]struct{})
     727           1 :         for level, levelMetadata := range vs.currentVersion().Levels {
     728           1 :                 iter := levelMetadata.Iter()
     729           1 :                 for meta := iter.First(); meta != nil; meta = iter.Next() {
     730           1 :                         snapshot.NewFiles = append(snapshot.NewFiles, newFileEntry{
     731           1 :                                 Level: level,
     732           1 :                                 Meta:  meta,
     733           1 :                         })
     734           1 :                         if _, ok := dedup[meta.FileBacking.DiskFileNum]; meta.Virtual && !ok {
     735           1 :                                 dedup[meta.FileBacking.DiskFileNum] = struct{}{}
     736           1 :                                 snapshot.CreatedBackingTables = append(
     737           1 :                                         snapshot.CreatedBackingTables,
     738           1 :                                         meta.FileBacking,
     739           1 :                                 )
     740           1 :                         }
     741             :                 }
     742             :         }
     743             : 
     744             :         // When creating a version snapshot for an existing DB, this snapshot VersionEdit will be
     745             :         // immediately followed by another VersionEdit (being written in logAndApply()). That
     746             :         // VersionEdit always contains a LastSeqNum, so we don't need to include that in the snapshot.
     747             :         // But it does not necessarily include MinUnflushedLogNum, NextFileNum, so we initialize those
     748             :         // using the corresponding fields in the versionSet (which came from the latest preceding
     749             :         // VersionEdit that had those fields).
     750           1 :         snapshot.MinUnflushedLogNum = minUnflushedLogNum
     751           1 :         snapshot.NextFileNum = nextFileNum
     752           1 : 
     753           1 :         w, err1 := manifest.Next()
     754           1 :         if err1 != nil {
     755           0 :                 return err1
     756           0 :         }
     757           1 :         if err := snapshot.Encode(w); err != nil {
     758           0 :                 return err
     759           0 :         }
     760             : 
     761           1 :         if vs.manifest != nil {
     762           1 :                 vs.manifest.Close()
     763           1 :                 vs.manifest = nil
     764           1 :         }
     765           1 :         if vs.manifestFile != nil {
     766           1 :                 if err := vs.manifestFile.Close(); err != nil {
     767           0 :                         return err
     768           0 :                 }
     769           1 :                 vs.manifestFile = nil
     770             :         }
     771             : 
     772           1 :         vs.manifest, manifest = manifest, nil
     773           1 :         vs.manifestFile, manifestFile = manifestFile, nil
     774           1 :         return nil
     775             : }
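
The write path in createManifest is record framing: create the file, wrap it in a record.Writer, start a record with Next, and write the encoded snapshot into it. A minimal sketch using the public record package; the snapshot encoding itself (VersionEdit.Encode) is internal to Pebble, so the payload here is an opaque byte slice. Unlike createManifest, which hands the open writer off as vs.manifest, the sketch closes it when done:

import (
        "os"

        "github.com/cockroachdb/pebble/record"
)

// writeFirstRecord creates a file and writes one framed record containing
// payload, mirroring how createManifest writes the snapshot version edit as
// the first record of a fresh MANIFEST.
func writeFirstRecord(path string, payload []byte) (err error) {
        f, err := os.Create(path)
        if err != nil {
                return err
        }
        w := record.NewWriter(f)
        defer func() {
                // Close writer then file, keeping the first error, in the
                // spirit of createManifest's deferred cleanup.
                if cerr := w.Close(); err == nil {
                        err = cerr
                }
                if cerr := f.Close(); err == nil {
                        err = cerr
                }
        }()
        rw, err := w.Next() // begin a new record
        if err != nil {
                return err
        }
        if _, err = rw.Write(payload); err != nil {
                return err
        }
        return w.Flush()
}
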
     776             : 
     777           1 : func (vs *versionSet) markFileNumUsed(fileNum base.DiskFileNum) {
     778           1 :         if vs.nextFileNum <= uint64(fileNum) {
     779           0 :                 vs.nextFileNum = uint64(fileNum + 1)
     780           0 :         }
     781             : }
     782             : 
     783           1 : func (vs *versionSet) getNextFileNum() base.FileNum {
     784           1 :         x := vs.nextFileNum
     785           1 :         vs.nextFileNum++
     786           1 :         return base.FileNum(x)
     787           1 : }
     788             : 
     789           1 : func (vs *versionSet) getNextDiskFileNum() base.DiskFileNum {
     790           1 :         x := vs.nextFileNum
     791           1 :         vs.nextFileNum++
     792           1 :         return base.DiskFileNum(x)
     793           1 : }
     794             : 
     795           1 : func (vs *versionSet) append(v *version) {
     796           1 :         if v.Refs() != 0 {
     797           0 :                 panic("pebble: version should be unreferenced")
     798             :         }
     799           1 :         if !vs.versions.Empty() {
     800           1 :                 vs.versions.Back().UnrefLocked()
     801           1 :         }
     802           1 :         v.Deleted = vs.obsoleteFn
     803           1 :         v.Ref()
     804           1 :         vs.versions.PushBack(v)
     805             : }
     806             : 
     807           1 : func (vs *versionSet) currentVersion() *version {
     808           1 :         return vs.versions.Back()
     809           1 : }
     810             : 
     811           1 : func (vs *versionSet) addLiveFileNums(m map[base.DiskFileNum]struct{}) {
     812           1 :         current := vs.currentVersion()
     813           1 :         for v := vs.versions.Front(); true; v = v.Next() {
     814           1 :                 for _, lm := range v.Levels {
     815           1 :                         iter := lm.Iter()
     816           1 :                         for f := iter.First(); f != nil; f = iter.Next() {
     817           1 :                                 m[f.FileBacking.DiskFileNum] = struct{}{}
     818           1 :                         }
     819             :                 }
     820           1 :                 if v == current {
     821           1 :                         break
     822             :                 }
     823             :         }
     824             : }
     825             : 
     826             : // addObsoleteLocked will add the fileInfo associated with obsolete backing
     827             : // sstables to the obsolete tables list.
     828             : //
     829             : // The file backings in the obsolete list must not appear more than once.
     830             : //
     831             : // DB.mu must be held when addObsoleteLocked is called.
     832           1 : func (vs *versionSet) addObsoleteLocked(obsolete []*fileBacking) {
     833           1 :         if len(obsolete) == 0 {
     834           1 :                 return
     835           1 :         }
     836             : 
     837           1 :         obsoleteFileInfo := make([]fileInfo, len(obsolete))
     838           1 :         for i, bs := range obsolete {
     839           1 :                 obsoleteFileInfo[i].fileNum = bs.DiskFileNum
     840           1 :                 obsoleteFileInfo[i].fileSize = bs.Size
     841           1 :         }
     842             : 
     843           1 :         if invariants.Enabled {
     844           1 :                 dedup := make(map[base.DiskFileNum]struct{})
     845           1 :                 for _, fi := range obsoleteFileInfo {
     846           1 :                         dedup[fi.fileNum] = struct{}{}
     847           1 :                 }
     848           1 :                 if len(dedup) != len(obsoleteFileInfo) {
     849           0 :                         panic("pebble: duplicate FileBacking present in obsolete list")
     850             :                 }
     851             :         }
     852             : 
     853           1 :         for _, fi := range obsoleteFileInfo {
     854           1 :                 // Note that the obsolete tables are no longer zombie by the definition of
     855           1 :                 // zombie, but we leave them in the zombie tables map until they are
     856           1 :                 // deleted from disk.
     857           1 :                 if _, ok := vs.zombieTables[fi.fileNum]; !ok {
     858           0 :                         vs.opts.Logger.Fatalf("MANIFEST obsolete table %s not marked as zombie", fi.fileNum)
     859           0 :                 }
     860             :         }
     861             : 
     862           1 :         vs.obsoleteTables = append(vs.obsoleteTables, obsoleteFileInfo...)
     863           1 :         vs.updateObsoleteTableMetricsLocked()
     864             : }
     865             : 
     866             : // addObsolete will acquire DB.mu, so DB.mu must not be held when this is
     867             : // called.
     868           1 : func (vs *versionSet) addObsolete(obsolete []*fileBacking) {
     869           1 :         vs.mu.Lock()
     870           1 :         defer vs.mu.Unlock()
     871           1 :         vs.addObsoleteLocked(obsolete)
     872           1 : }
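// Illustrative call shapes for the pair above (sketch only): code that
// already holds DB.mu must use the Locked variant, while goroutines that
// hold no locks call the wrapper, which acquires vs.mu itself:
//
//	// while holding DB.mu:
//	vs.addObsoleteLocked(backings)
//
//	// from a goroutine holding no locks:
//	vs.addObsolete(backings)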
     873             : 
     874           1 : func (vs *versionSet) updateObsoleteTableMetricsLocked() {
     875           1 :         vs.metrics.Table.ObsoleteCount = int64(len(vs.obsoleteTables))
     876           1 :         vs.metrics.Table.ObsoleteSize = 0
     877           1 :         for _, fi := range vs.obsoleteTables {
     878           1 :                 vs.metrics.Table.ObsoleteSize += fi.fileSize
     879           1 :         }
     880             : }
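// Worked example of the recomputation above: with obsoleteTables holding two
// entries of fileSize 4<<20 and 6<<20, the loop leaves ObsoleteCount == 2 and
// ObsoleteSize == 10<<20. The full rescan keeps the metrics trivially
// consistent with the slice at the cost of an O(n) pass.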
     881             : 
     882             : func setCurrentFunc(
     883             :         vers FormatMajorVersion, marker *atomicfs.Marker, fs vfs.FS, dirname string, dir vfs.File,
     884           1 : ) func(base.DiskFileNum) error {
     885           1 :         if vers < formatVersionedManifestMarker {
     886           1 :                 // Pebble versions before `formatVersionedManifestMarker` used
     887           1 :                 // the CURRENT file to signal which MANIFEST is current. Ignore
     888           1 :                 // the filename read during LocateMarker.
     889           1 :                 return func(manifestFileNum base.DiskFileNum) error {
     890           1 :                         if err := setCurrentFile(dirname, fs, manifestFileNum); err != nil {
     891           0 :                                 return err
     892           0 :                         }
     893           1 :                         if err := dir.Sync(); err != nil {
      894           0 :                 // We panic here, rather than higher in the call
      895           0 :                 // stack, for parity with the atomicfs.Marker behavior.
      896           0 :                 // A panic is necessary because a failed Sync is
      897           0 :                 // unrecoverable.
     898           0 :                                 panic(errors.Wrap(err, "fatal: MANIFEST dirsync failed"))
     899             :                         }
     900           1 :                         return nil
     901             :                 }
     902             :         }
     903           1 :         return setCurrentFuncMarker(marker, fs, dirname)
     904             : }
     905             : 
     906             : func setCurrentFuncMarker(
     907             :         marker *atomicfs.Marker, fs vfs.FS, dirname string,
     908           1 : ) func(base.DiskFileNum) error {
     909           1 :         return func(manifestFileNum base.DiskFileNum) error {
     910           1 :                 return marker.Move(base.MakeFilename(fileTypeManifest, manifestFileNum))
     911           1 :         }
     912             : }
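// Both constructors above return a closure of the same shape, letting the
// manifest-rotation path stay agnostic to the format major version. A hedged
// sketch (newManifestNum is assumed):
//
//	setCurrent := setCurrentFunc(vers, marker, fs, dirname, dir)
//	if err := setCurrent(newManifestNum); err != nil {
//	        return err
//	}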
     913             : 
     914             : func findCurrentManifest(
     915             :         vers FormatMajorVersion, fs vfs.FS, dirname string,
     916           1 : ) (marker *atomicfs.Marker, manifestNum base.DiskFileNum, exists bool, err error) {
     917           1 :         // NB: We always locate the manifest marker, even if we might not
     918           1 :         // actually use it (because we're opening the database at an earlier
     919           1 :         // format major version that uses the CURRENT file).  Locating a
     920           1 :         // marker should succeed even if the marker has never been placed.
     921           1 :         var filename string
     922           1 :         marker, filename, err = atomicfs.LocateMarker(fs, dirname, manifestMarkerName)
     923           1 :         if err != nil {
     924           0 :                 return nil, base.FileNum(0).DiskFileNum(), false, err
     925           0 :         }
     926             : 
     927           1 :         if vers < formatVersionedManifestMarker {
     928           1 :                 // Pebble versions before `formatVersionedManifestMarker` used
     929           1 :                 // the CURRENT file to signal which MANIFEST is current. Ignore
     930           1 :                 // the filename read during LocateMarker.
     931           1 : 
     932           1 :                 manifestNum, err = readCurrentFile(fs, dirname)
     933           1 :                 if oserror.IsNotExist(err) {
     934           1 :                         return marker, base.FileNum(0).DiskFileNum(), false, nil
     935           1 :                 } else if err != nil {
     936           0 :                         return marker, base.FileNum(0).DiskFileNum(), false, err
     937           0 :                 }
     938           0 :                 return marker, manifestNum, true, nil
     939             :         }
     940             : 
      941             :         // The current format major version is >=
      942             :         // formatVersionedManifestMarker, indicating that the
      943             :         // atomicfs.Marker is the source of truth for the current manifest.
     944             : 
     945           1 :         if filename == "" {
     946           0 :                 // The marker hasn't been set yet. This database doesn't exist.
     947           0 :                 return marker, base.FileNum(0).DiskFileNum(), false, nil
     948           0 :         }
     949             : 
     950           1 :         var ok bool
     951           1 :         _, manifestNum, ok = base.ParseFilename(fs, filename)
     952           1 :         if !ok {
     953           0 :                 return marker, base.FileNum(0).DiskFileNum(), false, base.CorruptionErrorf("pebble: MANIFEST name %q is malformed", errors.Safe(filename))
     954           0 :         }
     955           1 :         return marker, manifestNum, true, nil
     956             : }
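// A hedged sketch of Open-time consumption of the result (error handling
// abbreviated; surrounding names assumed):
//
//	marker, manifestNum, exists, err := findCurrentManifest(vers, fs, dirname)
//	if err != nil {
//	        return err
//	}
//	if !exists {
//	        // No current manifest: create a fresh MANIFEST for a new store.
//	} else {
//	        // Replay the version edits recorded in MANIFEST-<manifestNum>.
//	}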
     957             : 
     958           1 : func readCurrentFile(fs vfs.FS, dirname string) (base.DiskFileNum, error) {
     959           1 :         // Read the CURRENT file to find the current manifest file.
     960           1 :         current, err := fs.Open(base.MakeFilepath(fs, dirname, fileTypeCurrent, base.FileNum(0).DiskFileNum()))
     961           1 :         if err != nil {
     962           1 :                 return base.FileNum(0).DiskFileNum(), errors.Wrapf(err, "pebble: could not open CURRENT file for DB %q", dirname)
     963           1 :         }
     964           0 :         defer current.Close()
     965           0 :         stat, err := current.Stat()
     966           0 :         if err != nil {
     967           0 :                 return base.FileNum(0).DiskFileNum(), err
     968           0 :         }
     969           0 :         n := stat.Size()
     970           0 :         if n == 0 {
     971           0 :                 return base.FileNum(0).DiskFileNum(), errors.Errorf("pebble: CURRENT file for DB %q is empty", dirname)
     972           0 :         }
     973           0 :         if n > 4096 {
     974           0 :                 return base.FileNum(0).DiskFileNum(), errors.Errorf("pebble: CURRENT file for DB %q is too large", dirname)
     975           0 :         }
     976           0 :         b := make([]byte, n)
     977           0 :         _, err = current.ReadAt(b, 0)
     978           0 :         if err != nil {
     979           0 :                 return base.FileNum(0).DiskFileNum(), err
     980           0 :         }
     981           0 :         if b[n-1] != '\n' {
     982           0 :                 return base.FileNum(0).DiskFileNum(), base.CorruptionErrorf("pebble: CURRENT file for DB %q is malformed", dirname)
     983           0 :         }
     984           0 :         b = bytes.TrimSpace(b)
     985           0 : 
     986           0 :         _, manifestFileNum, ok := base.ParseFilename(fs, string(b))
     987           0 :         if !ok {
     988           0 :                 return base.FileNum(0).DiskFileNum(), base.CorruptionErrorf("pebble: MANIFEST name %q is malformed", errors.Safe(b))
     989           0 :         }
     990           0 :         return manifestFileNum, nil
     991             : }
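// The checks above pin down the expected CURRENT file format: a single
// manifest filename, non-empty, newline-terminated, and at most 4096 bytes,
// e.g.:
//
//	MANIFEST-000123\n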
     992             : 
     993           1 : func newFileMetrics(newFiles []manifest.NewFileEntry) map[int]*LevelMetrics {
     994           1 :         m := map[int]*LevelMetrics{}
     995           1 :         for _, nf := range newFiles {
     996           1 :                 lm := m[nf.Level]
     997           1 :                 if lm == nil {
     998           1 :                         lm = &LevelMetrics{}
     999           1 :                         m[nf.Level] = lm
    1000           1 :                 }
    1001           1 :                 lm.NumFiles++
    1002           1 :                 lm.Size += int64(nf.Meta.Size)
    1003             :         }
    1004           1 :         return m
    1005             : }
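// A hedged usage sketch: the per-level map above lets a caller fold a version
// edit's additions into its level metrics (the ve and d.mu.versions.metrics
// names are assumed for illustration):
//
//	for level, lm := range newFileMetrics(ve.NewFiles) {
//	        l := &d.mu.versions.metrics.Levels[level]
//	        l.NumFiles += lm.NumFiles
//	        l.Size += lm.Size
//	}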

Generated by: LCOV version 1.14