LCOV - code coverage report
Current view: top level - pebble - version_set.go (source / functions) Coverage Total Hit
Test: 2025-02-28 08:17Z 9af14eed - meta test only.lcov Lines: 81.3 % 732 595
Test Date: 2025-02-28 08:18:29 Functions: - 0 0

            Line data    Source code
       1              : // Copyright 2012 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2              : // of this source code is governed by a BSD-style license that can be found in
       3              : // the LICENSE file.
       4              : 
       5              : package pebble
       6              : 
       7              : import (
       8              :         "fmt"
       9              :         "io"
      10              :         "sync"
      11              :         "sync/atomic"
      12              : 
      13              :         "github.com/cockroachdb/errors"
      14              :         "github.com/cockroachdb/pebble/internal/base"
      15              :         "github.com/cockroachdb/pebble/internal/invariants"
      16              :         "github.com/cockroachdb/pebble/internal/manifest"
      17              :         "github.com/cockroachdb/pebble/objstorage"
      18              :         "github.com/cockroachdb/pebble/record"
      19              :         "github.com/cockroachdb/pebble/vfs"
      20              :         "github.com/cockroachdb/pebble/vfs/atomicfs"
      21              : )
      22              : 
      23              : const numLevels = manifest.NumLevels
      24              : 
      25              : const manifestMarkerName = `manifest`
      26              : 
      27              : // Provide type aliases for the various manifest structs.
      28              : type bulkVersionEdit = manifest.BulkVersionEdit
      29              : type deletedFileEntry = manifest.DeletedTableEntry
      30              : type tableMetadata = manifest.TableMetadata
      31              : type physicalMeta = manifest.PhysicalTableMeta
      32              : type virtualMeta = manifest.VirtualTableMeta
      33              : type fileBacking = manifest.FileBacking
      34              : type newTableEntry = manifest.NewTableEntry
      35              : type version = manifest.Version
      36              : type versionEdit = manifest.VersionEdit
      37              : type versionList = manifest.VersionList
      38              : 
      39              : // versionSet manages a collection of immutable versions, and manages the
      40              : // creation of a new version from the most recent version. A new version is
      41              : // created from an existing version by applying a version edit which is just
      42              : // like it sounds: a delta from the previous version. Version edits are logged
      43              : // to the MANIFEST file, which is replayed at startup.
      44              : type versionSet struct {
      45              :         // Next seqNum to use for WAL writes.
      46              :         logSeqNum base.AtomicSeqNum
      47              : 
      48              :         // The upper bound on sequence numbers that have been assigned so far. A
      49              :         // suffix of these sequence numbers may not have been written to a WAL. Both
      50              :         // logSeqNum and visibleSeqNum are atomically updated by the commitPipeline.
      51              :         // visibleSeqNum is <= logSeqNum.
      52              :         visibleSeqNum base.AtomicSeqNum
      53              : 
      54              :         // Number of bytes present in sstables being written by in-progress
      55              :         // compactions. This value will be zero if there are no in-progress
      56              :         // compactions. Updated and read atomically.
      57              :         atomicInProgressBytes atomic.Int64
      58              : 
      59              :         // Immutable fields.
      60              :         dirname  string
      61              :         provider objstorage.Provider
      62              :         // Set to DB.mu.
      63              :         mu   *sync.Mutex
      64              :         opts *Options
      65              :         fs   vfs.FS
      66              :         cmp  *base.Comparer
      67              :         // Dynamic base level allows the dynamic base level computation to be
      68              :         // disabled. Used by tests which want to create specific LSM structures.
      69              :         dynamicBaseLevel bool
      70              : 
      71              :         // Mutable fields.
      72              :         versions versionList
      73              :         picker   compactionPicker
      74              : 
      75              :         // Not all metrics are kept here. See DB.Metrics().
      76              :         metrics Metrics
      77              : 
      78              :         // A pointer to versionSet.addObsoleteLocked. Avoids allocating a new closure
      79              :         // on the creation of every version.
      80              :         obsoleteFn        func(manifest.ObsoleteFiles)
      81              :         obsoleteTables    []objectInfo
      82              :         obsoleteBlobs     []objectInfo
      83              :         obsoleteManifests []fileInfo
      84              :         obsoleteOptions   []fileInfo
      85              : 
      86              :         // Zombie tables which have been removed from the current version but are
      87              :         // still referenced by an inuse iterator.
      88              :         zombieTables map[base.DiskFileNum]objectInfo
      89              : 
      90              :         // virtualBackings contains information about the FileBackings which support
      91              :         // virtual sstables in the latest version. It is mainly used to determine when
      92              :         // a backing is no longer in use by the tables in the latest version; this is
      93              :         // not a trivial problem because compactions and other operations can happen
      94              :         // in parallel (and they can finish in unpredictable order).
      95              :         //
      96              :         // This mechanism is complementary to the backing Ref/Unref mechanism, which
      97              :         // determines when a backing is no longer used by *any* live version and can
      98              :         // be removed.
      99              :         //
     100              :         // In principle this should have been a copy-on-write structure, with each
     101              :         // Version having its own record of its virtual backings (similar to the
     102              :         // B-tree that holds the tables). However, in practice we only need it for the
     103              :         // latest version, so we use a simpler structure and store it in the
     104              :         // versionSet instead.
     105              :         //
     106              :         // virtualBackings is modified under DB.mu and the log lock. If it is accessed
     107              :         // under DB.mu and a version update is in progress, it reflects the state of
     108              :         // the next version.
     109              :         virtualBackings manifest.VirtualBackings
     110              : 
     111              :         // minUnflushedLogNum is the smallest WAL log file number corresponding to
     112              :         // mutations that have not been flushed to an sstable.
     113              :         minUnflushedLogNum base.DiskFileNum
     114              : 
     115              :         // The next file number. A single counter is used to assign file
     116              :         // numbers for the WAL, MANIFEST, sstable, and OPTIONS files.
     117              :         nextFileNum atomic.Uint64
     118              : 
     119              :         // The current manifest file number.
     120              :         manifestFileNum base.DiskFileNum
     121              :         manifestMarker  *atomicfs.Marker
     122              : 
     123              :         manifestFile          vfs.File
     124              :         manifest              *record.Writer
     125              :         getFormatMajorVersion func() FormatMajorVersion
     126              : 
     127              :         writing    bool
     128              :         writerCond sync.Cond
     129              :         // State for deciding when to write a snapshot. Protected by mu.
     130              :         rotationHelper record.RotationHelper
     131              : }
     132              : 
     133              : // objectInfo describes an object in object storage (either an sstable or a blob
     134              : // file).
     135              : type objectInfo struct {
     136              :         fileInfo
     137              :         isLocal bool
     138              : }
     139              : 
     140              : func (vs *versionSet) init(
     141              :         dirname string,
     142              :         provider objstorage.Provider,
     143              :         opts *Options,
     144              :         marker *atomicfs.Marker,
     145              :         getFMV func() FormatMajorVersion,
     146              :         mu *sync.Mutex,
     147            1 : ) {
     148            1 :         vs.dirname = dirname
     149            1 :         vs.provider = provider
     150            1 :         vs.mu = mu
     151            1 :         vs.writerCond.L = mu
     152            1 :         vs.opts = opts
     153            1 :         vs.fs = opts.FS
     154            1 :         vs.cmp = opts.Comparer
     155            1 :         vs.dynamicBaseLevel = true
     156            1 :         vs.versions.Init(mu)
     157            1 :         vs.obsoleteFn = vs.addObsoleteLocked
     158            1 :         vs.zombieTables = make(map[base.DiskFileNum]objectInfo)
     159            1 :         vs.virtualBackings = manifest.MakeVirtualBackings()
     160            1 :         vs.nextFileNum.Store(1)
     161            1 :         vs.manifestMarker = marker
     162            1 :         vs.getFormatMajorVersion = getFMV
     163            1 : }
     164              : 
     165              : // create creates a version set for a fresh DB.
     166              : func (vs *versionSet) create(
     167              :         jobID JobID,
     168              :         dirname string,
     169              :         provider objstorage.Provider,
     170              :         opts *Options,
     171              :         marker *atomicfs.Marker,
     172              :         getFormatMajorVersion func() FormatMajorVersion,
     173              :         mu *sync.Mutex,
     174            1 : ) error {
     175            1 :         vs.init(dirname, provider, opts, marker, getFormatMajorVersion, mu)
     176            1 :         var bve bulkVersionEdit
     177            1 :         newVersion, err := bve.Apply(nil /* curr */, opts.Comparer, opts.FlushSplitBytes, opts.Experimental.ReadCompactionRate)
     178            1 :         if err != nil {
     179            0 :                 return err
     180            0 :         }
     181            1 :         vs.append(newVersion)
     182            1 : 
     183            1 :         vs.picker = newCompactionPickerByScore(newVersion, &vs.virtualBackings, vs.opts, nil)
     184            1 :         // Note that a "snapshot" version edit is written to the manifest when it is
     185            1 :         // created.
     186            1 :         vs.manifestFileNum = vs.getNextDiskFileNum()
     187            1 :         err = vs.createManifest(vs.dirname, vs.manifestFileNum, vs.minUnflushedLogNum, vs.nextFileNum.Load(), nil /* virtualBackings */)
     188            1 :         if err == nil {
     189            1 :                 if err = vs.manifest.Flush(); err != nil {
     190            0 :                         vs.opts.Logger.Fatalf("MANIFEST flush failed: %v", err)
     191            0 :                 }
     192              :         }
     193            1 :         if err == nil {
     194            1 :                 if err = vs.manifestFile.Sync(); err != nil {
     195            0 :                         vs.opts.Logger.Fatalf("MANIFEST sync failed: %v", err)
     196            0 :                 }
     197              :         }
     198            1 :         if err == nil {
     199            1 :                 // NB: Move() is responsible for syncing the data directory.
     200            1 :                 if err = vs.manifestMarker.Move(base.MakeFilename(base.FileTypeManifest, vs.manifestFileNum)); err != nil {
     201            0 :                         vs.opts.Logger.Fatalf("MANIFEST set current failed: %v", err)
     202            0 :                 }
     203              :         }
     204              : 
     205            1 :         vs.opts.EventListener.ManifestCreated(ManifestCreateInfo{
     206            1 :                 JobID:   int(jobID),
     207            1 :                 Path:    base.MakeFilepath(vs.fs, vs.dirname, base.FileTypeManifest, vs.manifestFileNum),
     208            1 :                 FileNum: vs.manifestFileNum,
     209            1 :                 Err:     err,
     210            1 :         })
     211            1 :         if err != nil {
     212            0 :                 return err
     213            0 :         }
     214            1 :         return nil
     215              : }
     216              : 
     217              : // load loads the version set from the manifest file.
     218              : func (vs *versionSet) load(
     219              :         dirname string,
     220              :         provider objstorage.Provider,
     221              :         opts *Options,
     222              :         manifestFileNum base.DiskFileNum,
     223              :         marker *atomicfs.Marker,
     224              :         getFormatMajorVersion func() FormatMajorVersion,
     225              :         mu *sync.Mutex,
     226            1 : ) error {
     227            1 :         vs.init(dirname, provider, opts, marker, getFormatMajorVersion, mu)
     228            1 : 
     229            1 :         vs.manifestFileNum = manifestFileNum
     230            1 :         manifestPath := base.MakeFilepath(opts.FS, dirname, base.FileTypeManifest, vs.manifestFileNum)
     231            1 :         manifestFilename := opts.FS.PathBase(manifestPath)
     232            1 : 
     233            1 :         // Read the versionEdits in the manifest file.
     234            1 :         var bve bulkVersionEdit
     235            1 :         bve.AddedTablesByFileNum = make(map[base.FileNum]*tableMetadata)
     236            1 :         manifest, err := vs.fs.Open(manifestPath)
     237            1 :         if err != nil {
     238            0 :                 return errors.Wrapf(err, "pebble: could not open manifest file %q for DB %q",
     239            0 :                         errors.Safe(manifestFilename), dirname)
     240            0 :         }
     241            1 :         defer manifest.Close()
     242            1 :         rr := record.NewReader(manifest, 0 /* logNum */)
     243            1 :         for {
     244            1 :                 r, err := rr.Next()
     245            1 :                 if err == io.EOF || record.IsInvalidRecord(err) {
     246            1 :                         break
     247              :                 }
     248            1 :                 if err != nil {
     249            0 :                         return errors.Wrapf(err, "pebble: error when loading manifest file %q",
     250            0 :                                 errors.Safe(manifestFilename))
     251            0 :                 }
     252            1 :                 var ve versionEdit
     253            1 :                 err = ve.Decode(r)
     254            1 :                 if err != nil {
     255            0 :                         // Break instead of returning an error if the record is corrupted
     256            0 :                         // or invalid.
     257            0 :                         if err == io.EOF || record.IsInvalidRecord(err) {
     258            0 :                                 break
     259              :                         }
     260            0 :                         return err
     261              :                 }
     262            1 :                 if ve.ComparerName != "" {
     263            1 :                         if ve.ComparerName != vs.cmp.Name {
     264            0 :                                 return errors.Errorf("pebble: manifest file %q for DB %q: "+
     265            0 :                                         "comparer name from file %q != comparer name from Options %q",
     266            0 :                                         errors.Safe(manifestFilename), dirname, errors.Safe(ve.ComparerName), errors.Safe(vs.cmp.Name))
     267            0 :                         }
     268              :                 }
     269            1 :                 if err := bve.Accumulate(&ve); err != nil {
     270            0 :                         return err
     271            0 :                 }
     272            1 :                 if ve.MinUnflushedLogNum != 0 {
     273            1 :                         vs.minUnflushedLogNum = ve.MinUnflushedLogNum
     274            1 :                 }
     275            1 :                 if ve.NextFileNum != 0 {
     276            1 :                         vs.nextFileNum.Store(ve.NextFileNum)
     277            1 :                 }
     278            1 :                 if ve.LastSeqNum != 0 {
     279            1 :                         // logSeqNum is the _next_ sequence number that will be assigned,
     280            1 :                         // while LastSeqNum is the last assigned sequence number. Note that
     281            1 :                         // this behaviour mimics that in RocksDB; the first sequence number
     282            1 :                         // assigned is one greater than the one present in the manifest
     283            1 :                         // (assuming no WALs contain higher sequence numbers than the
     284            1 :                         // manifest's LastSeqNum). Increment LastSeqNum by 1 to get the
     285            1 :                         // next sequence number that will be assigned.
     286            1 :                         //
     287            1 :                         // If LastSeqNum is less than SeqNumStart, increase it to at least
     288            1 :                         // SeqNumStart to leave ample room for reserved sequence numbers.
     289            1 :                         if ve.LastSeqNum+1 < base.SeqNumStart {
     290            0 :                                 vs.logSeqNum.Store(base.SeqNumStart)
     291            1 :                         } else {
     292            1 :                                 vs.logSeqNum.Store(ve.LastSeqNum + 1)
     293            1 :                         }
     294              :                 }
     295              :         }
     296              :         // vs.init set vs.nextFileNum to 1 and manifest replay only overwrites it
     297              :         // with a non-zero NextFileNum, so it cannot be 0 here; it is below 2 only
     298              :         // if no version edit supplied a NextFileNum of at least 2.
     299            1 :         if vs.minUnflushedLogNum == 0 {
     300            1 :                 if vs.nextFileNum.Load() >= 2 {
     301            1 :                         // We either have a freshly created DB, or a DB created by RocksDB
     302            1 :                         // that has not had a single flushed SSTable yet. This is because
     303            1 :                         // RocksDB bumps up nextFileNum in this case without bumping up
     304            1 :                         // minUnflushedLogNum, even if WALs with non-zero file numbers are
     305            1 :                         // present in the directory.
     306            1 :                 } else {
     307            0 :                         return base.CorruptionErrorf("pebble: malformed manifest file %q for DB %q",
     308            0 :                                 errors.Safe(manifestFilename), dirname)
     309            0 :                 }
     310              :         }
     311            1 :         vs.markFileNumUsed(vs.minUnflushedLogNum)
     312            1 : 
     313            1 :         // Populate vs.virtualBackings with the added backings and the added
     314            1 :         // virtual sstables, since we have finished version edit accumulation.
     315            1 :         for _, b := range bve.AddedFileBacking {
     316            1 :                 vs.virtualBackings.AddAndRef(b)
     317            1 :         }
     318              : 
     319            1 :         for _, addedLevel := range bve.AddedTables {
     320            1 :                 for _, m := range addedLevel {
     321            1 :                         if m.Virtual {
     322            1 :                                 vs.virtualBackings.AddTable(m)
     323            1 :                         }
     324              :                 }
     325              :         }
     326              : 
     327            1 :         if invariants.Enabled {
     328            1 :                 // There should be no deleted tables or backings, since we're starting from
     329            1 :                 // an empty state.
     330            1 :                 for _, deletedLevel := range bve.DeletedTables {
     331            1 :                         if len(deletedLevel) != 0 {
     332            0 :                                 panic("deleted files after manifest replay")
     333              :                         }
     334              :                 }
     335            1 :                 if len(bve.RemovedFileBacking) > 0 {
     336            0 :                         panic("deleted backings after manifest replay")
     337              :                 }
     338              :         }
     339              : 
     340            1 :         newVersion, err := bve.Apply(nil, opts.Comparer, opts.FlushSplitBytes, opts.Experimental.ReadCompactionRate)
     341            1 :         if err != nil {
     342            0 :                 return err
     343            0 :         }
     344            1 :         newVersion.L0Sublevels.InitCompactingFileInfo(nil /* in-progress compactions */)
     345            1 :         vs.append(newVersion)
     346            1 : 
     347            1 :         for i := range vs.metrics.Levels {
     348            1 :                 l := &vs.metrics.Levels[i]
     349            1 :                 l.NumFiles = int64(newVersion.Levels[i].Len())
     350            1 :                 files := newVersion.Levels[i].Slice()
     351            1 :                 l.Size = int64(files.SizeSum())
     352            1 :         }
     353            1 :         for _, l := range newVersion.Levels {
     354            1 :                 iter := l.Iter()
     355            1 :                 for f := iter.First(); f != nil; f = iter.Next() {
     356            1 :                         if !f.Virtual {
     357            1 :                                 _, localSize := sizeIfLocal(f.FileBacking, vs.provider)
     358            1 :                                 vs.metrics.Table.Local.LiveSize = uint64(int64(vs.metrics.Table.Local.LiveSize) + localSize)
     359            1 :                         }
     360              :                 }
     361              :         }
     362            1 :         vs.virtualBackings.ForEach(func(backing *fileBacking) {
     363            1 :                 _, localSize := sizeIfLocal(backing, vs.provider)
     364            1 :                 vs.metrics.Table.Local.LiveSize = uint64(int64(vs.metrics.Table.Local.LiveSize) + localSize)
     365            1 :         })
     366              : 
     367            1 :         vs.picker = newCompactionPickerByScore(newVersion, &vs.virtualBackings, vs.opts, nil)
     368            1 :         return nil
     369              : }
     370              : 
     371            1 : func (vs *versionSet) close() error {
     372            1 :         if vs.manifestFile != nil {
     373            1 :                 if err := vs.manifestFile.Close(); err != nil {
     374            0 :                         return err
     375            0 :                 }
     376              :         }
     377            1 :         if vs.manifestMarker != nil {
     378            1 :                 if err := vs.manifestMarker.Close(); err != nil {
     379            0 :                         return err
     380            0 :                 }
     381              :         }
     382            1 :         return nil
     383              : }
     384              : 
     385              : // logLock locks the manifest for writing. The lock must be released by either
     386              : // a call to logUnlock or logAndApply.
     387              : //
     388              : // DB.mu must be held when calling this method, but the mutex may be dropped and
     389              : // re-acquired during the course of this method.
     390            1 : func (vs *versionSet) logLock() {
     391            1 :         // Wait for any existing writing to the manifest to complete, then mark the
     392            1 :         // manifest as busy.
     393            1 :         for vs.writing {
     394            1 :                 // Note: writerCond.L is DB.mu, so we unlock it while we wait.
     395            1 :                 vs.writerCond.Wait()
     396            1 :         }
     397            1 :         vs.writing = true
     398              : }
     399              : 
     400              : // logUnlock releases the lock for manifest writing.
     401              : //
     402              : // DB.mu must be held when calling this method.
     403            1 : func (vs *versionSet) logUnlock() {
     404            1 :         if !vs.writing {
     405            0 :                 vs.opts.Logger.Fatalf("MANIFEST not locked for writing")
     406            0 :         }
     407            1 :         vs.writing = false
     408            1 :         vs.writerCond.Signal()
     409              : }
     410              : 
     411              : // logAndApply logs the version edit to the manifest, applies the version edit
     412              : // to the current version, and installs the new version.
     413              : //
     414              : // logAndApply fills in the following fields of the VersionEdit: NextFileNum,
     415              : // LastSeqNum, RemovedBackingTables. The removed backing tables are those
     416              : // backings that are no longer used (in the new version) after applying the edit
     417              : // (as per vs.virtualBackings). Other than these fields, the VersionEdit must be
     418              : // complete.
     419              : //
     420              : // New table backing references (FileBacking.Ref) are taken as part of applying
     421              : // the version edit. The state of the virtual backings (vs.virtualBackings) is
     422              : // updated before logging to the manifest and installing the latest version;
     423              : // this is ok because any failure in those steps is fatal.
     424              : // TODO(radu): remove the error return.
     425              : //
     426              : // DB.mu must be held when calling this method and will be released temporarily
     427              : // while performing file I/O. Requires that the manifest is locked for writing
     428              : // (see logLock). Will unconditionally release the manifest lock (via
     429              : // logUnlock) even if an error occurs.
     430              : //
     431              : // inProgressCompactions is called while DB.mu is held, to get the list of
     432              : // in-progress compactions.
     433              : func (vs *versionSet) logAndApply(
     434              :         jobID JobID,
     435              :         ve *versionEdit,
     436              :         metrics map[int]*LevelMetrics,
     437              :         forceRotation bool,
     438              :         inProgressCompactions func() []compactionInfo,
     439            1 : ) error {
     440            1 :         if !vs.writing {
     441            0 :                 vs.opts.Logger.Fatalf("MANIFEST not locked for writing")
     442            0 :         }
     443            1 :         defer vs.logUnlock()
     444            1 : 
     445            1 :         if ve.MinUnflushedLogNum != 0 {
     446            1 :                 if ve.MinUnflushedLogNum < vs.minUnflushedLogNum ||
     447            1 :                         vs.nextFileNum.Load() <= uint64(ve.MinUnflushedLogNum) {
     448            0 :                         panic(fmt.Sprintf("pebble: inconsistent versionEdit minUnflushedLogNum %d",
     449            0 :                                 ve.MinUnflushedLogNum))
     450              :                 }
     451              :         }
     452              : 
     453              :         // This is the next manifest filenum, but if the current file is too big we
     454              :         // will write this ve to the next file which means what ve encodes is the
     455              :         // current filenum and not the next one.
     456              :         //
     457              :         // TODO(sbhola): figure out why this is correct and update comment.
     458            1 :         ve.NextFileNum = vs.nextFileNum.Load()
     459            1 : 
     460            1 :         // LastSeqNum is set to the current upper bound on the assigned sequence
     461            1 :         // numbers. Note that this is exactly the behavior of RocksDB. LastSeqNum is
     462            1 :         // used to initialize versionSet.logSeqNum and versionSet.visibleSeqNum on
     463            1 :         // replay. It must be higher than or equal to any sequence number
     464            1 :         // written to an sstable, including sequence numbers in ingested files.
     465            1 :         // Note that LastSeqNum is not (and cannot be) the minimum unflushed sequence
     466            1 :         // number. This is fallout from ingestion which allows a sequence number X to
     467            1 :         // be assigned to an ingested sstable even though sequence number X-1 resides
     468            1 :         // in an unflushed memtable. logSeqNum is the _next_ sequence number that
     469            1 :         // will be assigned, so subtract 1 from it to get the upper bound on the
     470            1 :         // last assigned sequence number.
     471            1 :         logSeqNum := vs.logSeqNum.Load()
     472            1 :         ve.LastSeqNum = logSeqNum - 1
     473            1 :         if logSeqNum == 0 {
     474            0 :                 // logSeqNum is initialized to 1 in Open() if there are no previous WAL
     475            0 :                 // or manifest records, so this case should never happen.
     476            0 :                 vs.opts.Logger.Fatalf("logSeqNum must be a positive integer: %d", logSeqNum)
     477            0 :         }
     478              : 
     479            1 :         currentVersion := vs.currentVersion()
     480            1 :         var newVersion *version
     481            1 : 
     482            1 :         // Generate a new manifest if we don't currently have one, or forceRotation
     483            1 :         // is true, or the current one is too large.
     484            1 :         //
     485            1 :         // For largeness, we do not exclusively use MaxManifestFileSize size
     486            1 :         // threshold since we have had incidents where due to either large keys or
     487            1 :         // large numbers of files, each edit results in a snapshot + write of the
     488            1 :         // edit. This slows the system down since each flush or compaction is
     489            1 :         // writing a new manifest snapshot. The primary goal of the size-based
     490            1 :         // rollover logic is to ensure that when reopening a DB, the number of edits
     491            1 :         // that need to be replayed on top of the snapshot is "sane". Rolling over
     492            1 :         // to a new manifest after each edit is not relevant to that goal.
     493            1 :         //
     494            1 :         // Consider the following cases:
     495            1 :         // - The number of live files F in the DB is roughly stable: after writing
     496            1 :         //   the snapshot (with F files), say we require that there be enough edits
     497            1 :         //   such that the cumulative number of files in those edits, E, be greater
     498            1 :         //   than F. This will ensure that the total amount of time in logAndApply
     499            1 :         //   that is spent in snapshot writing is ~50%.
     500            1 :         //
     501            1 :         // - The number of live files F in the DB is shrinking drastically, say from
     502            1 :         //   F to F/10: This can happen for various reasons, like wide range
     503            1 :         //   tombstones, or large numbers of smaller than usual files that are being
     504            1 :         //   merged together into larger files. And say the new files generated
     505            1 :         //   during this shrinkage is insignificant compared to F/10, and so for
     506            1 :         //   this example we will assume it is effectively 0. After this shrinking,
     507            1 :         //   E = 0.9F, and so if we used the previous snapshot file count, F, as the
     508            1 :         //   threshold that needs to be exceeded, we will further delay the snapshot
     509            1 :         //   writing. Which means on DB reopen we will need to replay 0.9F edits to
     510            1 :         //   get to a version with 0.1F files. It would be better to create a new
     511            1 :         //   snapshot when E exceeds the number of files in the current version.
     512            1 :         //
     513            1 :         // - The number of live files F in the DB is growing via perfect ingests
     514            1 :         //   into L6: Say we wrote the snapshot when there were F files and now we
     515            1 :         //   have 10F files, so E = 9F. We will further delay writing a new
     516            1 :         //   snapshot. This case can be critiqued as contrived, but we consider it
     517            1 :         //   nonetheless.
     518            1 :         //
     519            1 :         // The logic below uses the min of the last snapshot file count and the file
     520            1 :         // count in the current version.
     521            1 :         vs.rotationHelper.AddRecord(int64(len(ve.DeletedTables) + len(ve.NewTables)))
     522            1 :         sizeExceeded := vs.manifest.Size() >= vs.opts.MaxManifestFileSize
     523            1 :         requireRotation := forceRotation || vs.manifest == nil
     524            1 : 
     525            1 :         var nextSnapshotFilecount int64
     526            1 :         for i := range vs.metrics.Levels {
     527            1 :                 nextSnapshotFilecount += vs.metrics.Levels[i].NumFiles
     528            1 :         }
     529            1 :         if sizeExceeded && !requireRotation {
     530            1 :                 requireRotation = vs.rotationHelper.ShouldRotate(nextSnapshotFilecount)
     531            1 :         }
     532            1 :         var newManifestFileNum base.DiskFileNum
     533            1 :         var prevManifestFileSize uint64
     534            1 :         var newManifestVirtualBackings []*fileBacking
     535            1 :         if requireRotation {
     536            1 :                 newManifestFileNum = vs.getNextDiskFileNum()
     537            1 :                 prevManifestFileSize = uint64(vs.manifest.Size())
     538            1 : 
     539            1 :                 // We want the virtual backings *before* applying the version edit, because
     540            1 :                 // the new manifest will contain the pre-apply version plus the last version
     541            1 :                 // edit.
     542            1 :                 newManifestVirtualBackings = vs.virtualBackings.Backings()
     543            1 :         }
     544              : 
     545              :         // Grab certain values before releasing vs.mu, in case createManifest() needs
     546              :         // to be called.
     547            1 :         minUnflushedLogNum := vs.minUnflushedLogNum
     548            1 :         nextFileNum := vs.nextFileNum.Load()
     549            1 : 
     550            1 :         // Note: this call populates ve.RemovedBackingTables.
     551            1 :         zombieBackings, removedVirtualBackings, localLiveSizeDelta :=
     552            1 :                 getZombiesAndUpdateVirtualBackings(ve, &vs.virtualBackings, vs.provider)
     553            1 : 
     554            1 :         if err := func() error {
     555            1 :                 vs.mu.Unlock()
     556            1 :                 defer vs.mu.Lock()
     557            1 : 
     558            1 :                 if vs.getFormatMajorVersion() < FormatVirtualSSTables && len(ve.CreatedBackingTables) > 0 {
     559            0 :                         return base.AssertionFailedf("MANIFEST cannot contain virtual sstable records due to format major version")
     560            0 :                 }
     561            1 :                 var b bulkVersionEdit
     562            1 :                 err := b.Accumulate(ve)
     563            1 :                 if err != nil {
     564            0 :                         return errors.Wrap(err, "MANIFEST accumulate failed")
     565            0 :                 }
     566            1 :                 newVersion, err = b.Apply(
     567            1 :                         currentVersion, vs.cmp, vs.opts.FlushSplitBytes, vs.opts.Experimental.ReadCompactionRate,
     568            1 :                 )
     569            1 :                 if err != nil {
     570            0 :                         return errors.Wrap(err, "MANIFEST apply failed")
     571            0 :                 }
     572              : 
     573            1 :                 if newManifestFileNum != 0 {
     574            1 :                         if err := vs.createManifest(vs.dirname, newManifestFileNum, minUnflushedLogNum, nextFileNum, newManifestVirtualBackings); err != nil {
     575            0 :                                 vs.opts.EventListener.ManifestCreated(ManifestCreateInfo{
     576            0 :                                         JobID:   int(jobID),
     577            0 :                                         Path:    base.MakeFilepath(vs.fs, vs.dirname, base.FileTypeManifest, newManifestFileNum),
     578            0 :                                         FileNum: newManifestFileNum,
     579            0 :                                         Err:     err,
     580            0 :                                 })
     581            0 :                                 return errors.Wrap(err, "MANIFEST create failed")
     582            0 :                         }
     583              :                 }
     584              : 
     585            1 :                 w, err := vs.manifest.Next()
     586            1 :                 if err != nil {
     587            0 :                         return errors.Wrap(err, "MANIFEST next record write failed")
     588            0 :                 }
     589              : 
     590              :                 // NB: Any error from this point on is considered fatal as we don't know if
     591              :                 // the MANIFEST write occurred or not. Trying to determine that is
     592              :                 // fraught. Instead we rely on the standard recovery mechanism run when a
     593              :                 // database is open. In particular, that mechanism generates a new MANIFEST
     594              :                 // and ensures it is synced.
     595            1 :                 if err := ve.Encode(w); err != nil {
     596            0 :                         return errors.Wrap(err, "MANIFEST write failed")
     597            0 :                 }
     598            1 :                 if err := vs.manifest.Flush(); err != nil {
     599            0 :                         return errors.Wrap(err, "MANIFEST flush failed")
     600            0 :                 }
     601            1 :                 if err := vs.manifestFile.Sync(); err != nil {
     602            0 :                         return errors.Wrap(err, "MANIFEST sync failed")
     603            0 :                 }
     604            1 :                 if newManifestFileNum != 0 {
     605            1 :                         // NB: Move() is responsible for syncing the data directory.
     606            1 :                         if err := vs.manifestMarker.Move(base.MakeFilename(base.FileTypeManifest, newManifestFileNum)); err != nil {
     607            0 :                                 return errors.Wrap(err, "MANIFEST set current failed")
     608            0 :                         }
     609            1 :                         vs.opts.EventListener.ManifestCreated(ManifestCreateInfo{
     610            1 :                                 JobID:   int(jobID),
     611            1 :                                 Path:    base.MakeFilepath(vs.fs, vs.dirname, base.FileTypeManifest, newManifestFileNum),
     612            1 :                                 FileNum: newManifestFileNum,
     613            1 :                         })
     614              :                 }
     615            1 :                 return nil
     616            0 :         }(); err != nil {
     617            0 :                 // Any error encountered during any of the operations in the previous
     618            0 :                 // closure are considered fatal. Treating such errors as fatal is preferred
     619            0 :                 // to attempting to unwind various file and b-tree reference counts, and
     620            0 :                 // re-generating L0 sublevel metadata. This may change in the future, if
     621            0 :                 // certain manifest / WAL operations become retryable. For more context, see
     622            0 :                 // #1159 and #1792.
     623            0 :                 vs.opts.Logger.Fatalf("%s", err)
     624            0 :                 return err
     625            0 :         }
     626              : 
     627            1 :         if requireRotation {
     628            1 :                 // Successfully rotated.
     629            1 :                 vs.rotationHelper.Rotate(nextSnapshotFilecount)
     630            1 :         }
     631              :         // Now that DB.mu is held again, initialize compacting file info in
     632              :         // L0Sublevels.
     633            1 :         inProgress := inProgressCompactions()
     634            1 : 
     635            1 :         newVersion.L0Sublevels.InitCompactingFileInfo(inProgressL0Compactions(inProgress))
     636            1 : 
     637            1 :         // Update the zombie tables set first, as installation of the new version
     638            1 :         // will unref the previous version which could result in addObsoleteLocked
     639            1 :         // being called.
     640            1 :         for _, b := range zombieBackings {
     641            1 :                 vs.zombieTables[b.backing.DiskFileNum] = objectInfo{
     642            1 :                         fileInfo: fileInfo{
     643            1 :                                 FileNum:  b.backing.DiskFileNum,
     644            1 :                                 FileSize: b.backing.Size,
     645            1 :                         },
     646            1 :                         isLocal: b.isLocal,
     647            1 :                 }
     648            1 :         }
     649              : 
     650              :         // Unref the removed backings and report those that already became obsolete.
     651              :         // Note that the only case where we report obsolete tables here is when
     652              :         // VirtualBackings.Protect/Unprotect was used to keep a backing alive without
     653              :         // it being used in the current version.
     654            1 :         var obsoleteVirtualBackings manifest.ObsoleteFiles
     655            1 :         for _, b := range removedVirtualBackings {
     656            1 :                 if b.backing.Unref() == 0 {
     657            1 :                         obsoleteVirtualBackings.FileBackings = append(obsoleteVirtualBackings.FileBackings, b.backing)
     658            1 :                 }
     659              :         }
     660            1 :         vs.addObsoleteLocked(obsoleteVirtualBackings)
     661            1 : 
     662            1 :         // Install the new version.
     663            1 :         vs.append(newVersion)
     664            1 : 
     665            1 :         if ve.MinUnflushedLogNum != 0 {
     666            1 :                 vs.minUnflushedLogNum = ve.MinUnflushedLogNum
     667            1 :         }
     668            1 :         if newManifestFileNum != 0 {
     669            1 :                 if vs.manifestFileNum != 0 {
     670            1 :                         vs.obsoleteManifests = append(vs.obsoleteManifests, fileInfo{
     671            1 :                                 FileNum:  vs.manifestFileNum,
     672            1 :                                 FileSize: prevManifestFileSize,
     673            1 :                         })
     674            1 :                 }
     675            1 :                 vs.manifestFileNum = newManifestFileNum
     676              :         }
     677              : 
     678            1 :         for level, update := range metrics {
     679            1 :                 vs.metrics.Levels[level].Add(update)
     680            1 :         }
     681            1 :         for i := range vs.metrics.Levels {
     682            1 :                 l := &vs.metrics.Levels[i]
     683            1 :                 l.NumFiles = int64(newVersion.Levels[i].Len())
     684            1 :                 l.NumVirtualFiles = newVersion.Levels[i].NumVirtual
     685            1 :                 l.VirtualSize = newVersion.Levels[i].VirtualSize
     686            1 :                 l.Size = int64(newVersion.Levels[i].Size())
     687            1 : 
     688            1 :                 l.Sublevels = 0
     689            1 :                 if l.NumFiles > 0 {
     690            1 :                         l.Sublevels = 1
     691            1 :                 }
     692            1 :                 if invariants.Enabled {
     693            1 :                         levelFiles := newVersion.Levels[i].Slice()
     694            1 :                         if size := int64(levelFiles.SizeSum()); l.Size != size {
     695            0 :                                 vs.opts.Logger.Fatalf("versionSet metrics L%d Size = %d, actual size = %d", i, l.Size, size)
     696            0 :                         }
     697            1 :                         if nVirtual := levelFiles.NumVirtual(); nVirtual != l.NumVirtualFiles {
     698            0 :                                 vs.opts.Logger.Fatalf(
     699            0 :                                         "versionSet metrics L%d NumVirtual = %d, actual NumVirtual = %d",
     700            0 :                                         i, l.NumVirtualFiles, nVirtual,
     701            0 :                                 )
     702            0 :                         }
     703            1 :                         if vSize := levelFiles.VirtualSizeSum(); vSize != l.VirtualSize {
     704            0 :                                 vs.opts.Logger.Fatalf(
     705            0 :                                         "versionSet metrics L%d Virtual size = %d, actual size = %d",
     706            0 :                                         i, l.VirtualSize, vSize,
     707            0 :                                 )
     708            0 :                         }
     709              :                 }
     710              :         }
     711            1 :         vs.metrics.Levels[0].Sublevels = int32(len(newVersion.L0SublevelFiles))
     712            1 :         vs.metrics.Table.Local.LiveSize = uint64(int64(vs.metrics.Table.Local.LiveSize) + localLiveSizeDelta)
     713            1 : 
     714            1 :         vs.picker = newCompactionPickerByScore(newVersion, &vs.virtualBackings, vs.opts, inProgress)
     715            1 :         if !vs.dynamicBaseLevel {
     716            0 :                 vs.picker.forceBaseLevel1()
     717            0 :         }
     718            1 :         return nil
     719              : }
     720              : 
     721              : type fileBackingInfo struct {
     722              :         backing *fileBacking
     723              :         isLocal bool
     724              : }
     725              : 
     726              : // getZombiesAndUpdateVirtualBackings updates the virtual backings with the
     727              : // changes in the versionEdit and populates ve.RemovedBackingTables.
     728              : // Returns:
     729              : //   - zombieBackings: all backings (physical and virtual) that will no longer be
     730              : //     needed when we apply ve.
     731              : //   - removedVirtualBackings: the virtual backings that will be removed by the
     732              : //     VersionEdit and which must be Unref()ed by the caller. These backings
     733              : //     match ve.RemovedBackingTables.
     734              : //   - localLiveSizeDelta: the delta in local live bytes.
     735              : func getZombiesAndUpdateVirtualBackings(
     736              :         ve *versionEdit, virtualBackings *manifest.VirtualBackings, provider objstorage.Provider,
     737            1 : ) (zombieBackings, removedVirtualBackings []fileBackingInfo, localLiveSizeDelta int64) {
     738            1 :         // First, deal with the physical tables.
     739            1 :         //
     740            1 :         // A physical backing has become unused if it is in DeletedFiles but not in
     741            1 :         // NewFiles or CreatedBackingTables.
     742            1 :         //
     743            1 :         // Note that for the common case where there are very few elements, the map
     744            1 :         // will stay on the stack.
     745            1 :         stillUsed := make(map[base.DiskFileNum]struct{})
     746            1 :         for _, nf := range ve.NewTables {
     747            1 :                 if !nf.Meta.Virtual {
     748            1 :                         stillUsed[nf.Meta.FileBacking.DiskFileNum] = struct{}{}
     749            1 :                         _, localFileDelta := sizeIfLocal(nf.Meta.FileBacking, provider)
     750            1 :                         localLiveSizeDelta += localFileDelta
     751            1 :                 }
     752              :         }
     753            1 :         for _, b := range ve.CreatedBackingTables {
     754            1 :                 stillUsed[b.DiskFileNum] = struct{}{}
     755            1 :         }
     756            1 :         for _, m := range ve.DeletedTables {
     757            1 :                 if !m.Virtual {
     758            1 :                         // NB: this deleted file may also be in NewFiles or
     759            1 :                         // CreatedBackingTables, due to a file moving between levels, or
     760            1 :                         // becoming virtualized. In which case there is no change due to this
     761            1 :                         // file in the localLiveSizeDelta -- the subtraction below compensates
     762            1 :                         // for the addition.
     763            1 :                         isLocal, localFileDelta := sizeIfLocal(m.FileBacking, provider)
     764            1 :                         localLiveSizeDelta -= localFileDelta
     765            1 :                         if _, ok := stillUsed[m.FileBacking.DiskFileNum]; !ok {
     766            1 :                                 zombieBackings = append(zombieBackings, fileBackingInfo{
     767            1 :                                         backing: m.FileBacking,
     768            1 :                                         isLocal: isLocal,
     769            1 :                                 })
     770            1 :                         }
     771              :                 }
     772              :         }
     773              : 
     774              :         // Now deal with virtual tables.
     775              :         //
     776              :         // When a virtual table moves between levels we AddTable() then RemoveTable(),
     777              :         // which works out.
     778            1 :         for _, b := range ve.CreatedBackingTables {
     779            1 :                 virtualBackings.AddAndRef(b)
     780            1 :                 _, localFileDelta := sizeIfLocal(b, provider)
     781            1 :                 localLiveSizeDelta += localFileDelta
     782            1 :         }
     783            1 :         for _, nf := range ve.NewTables {
     784            1 :                 if nf.Meta.Virtual {
     785            1 :                         virtualBackings.AddTable(nf.Meta)
     786            1 :                 }
     787              :         }
     788            1 :         for _, m := range ve.DeletedTables {
     789            1 :                 if m.Virtual {
     790            1 :                         virtualBackings.RemoveTable(m)
     791            1 :                 }
     792              :         }
     793              : 
     794            1 :         if unused := virtualBackings.Unused(); len(unused) > 0 {
     795            1 :                 // Virtual backings that are no longer used are zombies and are also added
     796            1 :                 // to RemovedBackingTables (before the version edit is written to disk).
     797            1 :                 ve.RemovedBackingTables = make([]base.DiskFileNum, len(unused))
     798            1 :                 for i, b := range unused {
     799            1 :                         isLocal, localFileDelta := sizeIfLocal(b, provider)
     800            1 :                         localLiveSizeDelta -= localFileDelta
     801            1 :                         ve.RemovedBackingTables[i] = b.DiskFileNum
     802            1 :                         zombieBackings = append(zombieBackings, fileBackingInfo{
     803            1 :                                 backing: b,
     804            1 :                                 isLocal: isLocal,
     805            1 :                         })
     806            1 :                         virtualBackings.Remove(b.DiskFileNum)
     807            1 :                 }
     808            1 :                 removedVirtualBackings = zombieBackings[len(zombieBackings)-len(unused):]
     809              :         }
     810            1 :         return zombieBackings, removedVirtualBackings, localLiveSizeDelta
     811              : }
     812              : 
     813              : // sizeIfLocal returns backing.Size if the backing is a local file, else 0.
     814              : func sizeIfLocal(
     815              :         backing *fileBacking, provider objstorage.Provider,
     816            1 : ) (isLocal bool, localSize int64) {
     817            1 :         isLocal = objstorage.IsLocalTable(provider, backing.DiskFileNum)
     818            1 :         if isLocal {
     819            1 :                 return true, int64(backing.Size)
     820            1 :         }
     821            1 :         return false, 0
     822              : }
     823              : 
     824              : func (vs *versionSet) incrementCompactions(
     825              :         kind compactionKind, extraLevels []*compactionLevel, pickerMetrics compactionPickerMetrics,
     826            1 : ) {
     827            1 :         switch kind {
     828            1 :         case compactionKindDefault:
     829            1 :                 vs.metrics.Compact.Count++
     830            1 :                 vs.metrics.Compact.DefaultCount++
     831              : 
     832            1 :         case compactionKindFlush, compactionKindIngestedFlushable:
     833            1 :                 vs.metrics.Flush.Count++
     834              : 
     835            1 :         case compactionKindMove:
     836            1 :                 vs.metrics.Compact.Count++
     837            1 :                 vs.metrics.Compact.MoveCount++
     838              : 
     839            1 :         case compactionKindDeleteOnly:
     840            1 :                 vs.metrics.Compact.Count++
     841            1 :                 vs.metrics.Compact.DeleteOnlyCount++
     842              : 
     843            1 :         case compactionKindElisionOnly:
     844            1 :                 vs.metrics.Compact.Count++
     845            1 :                 vs.metrics.Compact.ElisionOnlyCount++
     846              : 
     847            0 :         case compactionKindRead:
     848            0 :                 vs.metrics.Compact.Count++
     849            0 :                 vs.metrics.Compact.ReadCount++
     850              : 
     851            1 :         case compactionKindTombstoneDensity:
     852            1 :                 vs.metrics.Compact.Count++
     853            1 :                 vs.metrics.Compact.TombstoneDensityCount++
     854              : 
     855            1 :         case compactionKindRewrite:
     856            1 :                 vs.metrics.Compact.Count++
     857            1 :                 vs.metrics.Compact.RewriteCount++
     858              : 
     859            1 :         case compactionKindCopy:
     860            1 :                 vs.metrics.Compact.Count++
     861            1 :                 vs.metrics.Compact.CopyCount++
     862              : 
     863            0 :         default:
     864            0 :                 if invariants.Enabled {
     865            0 :                         panic("unhandled compaction kind")
     866              :                 }
     867              :         }
     868            1 :         if len(extraLevels) > 0 {
     869            1 :                 vs.metrics.Compact.MultiLevelCount++
     870            1 :         }
     871              : }
     872              : 
     873            1 : func (vs *versionSet) incrementCompactionBytes(numBytes int64) {
     874            1 :         vs.atomicInProgressBytes.Add(numBytes)
     875            1 : }
     876              : 
     877              : // createManifest creates a manifest file that contains a snapshot of vs.
     878              : func (vs *versionSet) createManifest(
     879              :         dirname string,
     880              :         fileNum, minUnflushedLogNum base.DiskFileNum,
     881              :         nextFileNum uint64,
     882              :         virtualBackings []*fileBacking,
     883            1 : ) (err error) {
     884            1 :         var (
     885            1 :                 filename     = base.MakeFilepath(vs.fs, dirname, base.FileTypeManifest, fileNum)
     886            1 :                 manifestFile vfs.File
     887            1 :                 manifest     *record.Writer
     888            1 :         )
     889            1 :         defer func() {
     890            1 :                 if manifest != nil {
     891            0 :                         manifest.Close()
     892            0 :                 }
     893            1 :                 if manifestFile != nil {
     894            0 :                         manifestFile.Close()
     895            0 :                 }
     896            1 :                 if err != nil {
     897            0 :                         vs.fs.Remove(filename)
     898            0 :                 }
     899              :         }()
     900            1 :         manifestFile, err = vs.fs.Create(filename, "pebble-manifest")
     901            1 :         if err != nil {
     902            0 :                 return err
     903            0 :         }
     904            1 :         manifest = record.NewWriter(manifestFile)
     905            1 : 
     906            1 :         snapshot := versionEdit{
     907            1 :                 ComparerName: vs.cmp.Name,
     908            1 :         }
     909            1 : 
     910            1 :         for level, levelMetadata := range vs.currentVersion().Levels {
     911            1 :                 iter := levelMetadata.Iter()
     912            1 :                 for meta := iter.First(); meta != nil; meta = iter.Next() {
     913            1 :                         snapshot.NewTables = append(snapshot.NewTables, newTableEntry{
     914            1 :                                 Level: level,
     915            1 :                                 Meta:  meta,
     916            1 :                         })
     917            1 :                 }
     918              :         }
     919              : 
     920            1 :         snapshot.CreatedBackingTables = virtualBackings
     921            1 : 
     922            1 :         // When creating a version snapshot for an existing DB, this snapshot VersionEdit will be
     923            1 :         // immediately followed by another VersionEdit (being written in logAndApply()). That
     924            1 :         // VersionEdit always contains a LastSeqNum, so we don't need to include that in the snapshot.
     925            1 :         // But it does not necessarily include MinUnflushedLogNum, NextFileNum, so we initialize those
     926            1 :         // using the corresponding fields in the versionSet (which came from the latest preceding
     927            1 :         // VersionEdit that had those fields).
     928            1 :         snapshot.MinUnflushedLogNum = minUnflushedLogNum
     929            1 :         snapshot.NextFileNum = nextFileNum
     930            1 : 
     931            1 :         w, err1 := manifest.Next()
     932            1 :         if err1 != nil {
     933            0 :                 return err1
     934            0 :         }
     935            1 :         if err := snapshot.Encode(w); err != nil {
     936            0 :                 return err
     937            0 :         }
     938              : 
     939            1 :         if vs.manifest != nil {
     940            1 :                 vs.manifest.Close()
     941            1 :                 vs.manifest = nil
     942            1 :         }
     943            1 :         if vs.manifestFile != nil {
     944            1 :                 if err := vs.manifestFile.Close(); err != nil {
     945            0 :                         return err
     946            0 :                 }
     947            1 :                 vs.manifestFile = nil
     948              :         }
     949              : 
     950            1 :         vs.manifest, manifest = manifest, nil
     951            1 :         vs.manifestFile, manifestFile = manifestFile, nil
     952            1 :         return nil
     953              : }
     954              : 
     955              : // NB: This method is not safe for concurrent use. It is only safe
     956              : // to be called when concurrent changes to nextFileNum are not expected.
     957            1 : func (vs *versionSet) markFileNumUsed(fileNum base.DiskFileNum) {
     958            1 :         if vs.nextFileNum.Load() <= uint64(fileNum) {
     959            1 :                 vs.nextFileNum.Store(uint64(fileNum + 1))
     960            1 :         }
     961              : }
     962              : 
     963              : // getNextFileNum returns the next file number to be used.
     964              : //
     965              : // Can be called without the versionSet's mutex being held.
     966            1 : func (vs *versionSet) getNextFileNum() base.FileNum {
     967            1 :         x := vs.nextFileNum.Add(1) - 1
     968            1 :         return base.FileNum(x)
     969            1 : }
     970              : 
     971              : // Can be called without the versionSet's mutex being held.
     972            1 : func (vs *versionSet) getNextDiskFileNum() base.DiskFileNum {
     973            1 :         x := vs.nextFileNum.Add(1) - 1
     974            1 :         return base.DiskFileNum(x)
     975            1 : }
     976              : 
     977            1 : func (vs *versionSet) append(v *version) {
     978            1 :         if v.Refs() != 0 {
     979            0 :                 panic("pebble: version should be unreferenced")
     980              :         }
     981            1 :         if !vs.versions.Empty() {
     982            1 :                 vs.versions.Back().UnrefLocked()
     983            1 :         }
     984            1 :         v.Deleted = vs.obsoleteFn
     985            1 :         v.Ref()
     986            1 :         vs.versions.PushBack(v)
     987            1 :         if invariants.Enabled {
     988            1 :                 // Verify that the virtualBackings contains all the backings referenced by
     989            1 :                 // the version.
     990            1 :                 for _, l := range v.Levels {
     991            1 :                         iter := l.Iter()
     992            1 :                         for f := iter.First(); f != nil; f = iter.Next() {
     993            1 :                                 if f.Virtual {
     994            1 :                                         if _, ok := vs.virtualBackings.Get(f.FileBacking.DiskFileNum); !ok {
     995            0 :                                                 panic(fmt.Sprintf("%s is not in virtualBackings", f.FileBacking.DiskFileNum))
     996              :                                         }
     997              :                                 }
     998              :                         }
     999              :                 }
    1000              :         }
    1001              : }
    1002              : 
    1003            1 : func (vs *versionSet) currentVersion() *version {
    1004            1 :         return vs.versions.Back()
    1005            1 : }
    1006              : 
    1007            1 : func (vs *versionSet) addLiveFileNums(m map[base.DiskFileNum]struct{}) {
    1008            1 :         current := vs.currentVersion()
    1009            1 :         for v := vs.versions.Front(); true; v = v.Next() {
    1010            1 :                 for _, lm := range v.Levels {
    1011            1 :                         iter := lm.Iter()
    1012            1 :                         for f := iter.First(); f != nil; f = iter.Next() {
    1013            1 :                                 m[f.FileBacking.DiskFileNum] = struct{}{}
    1014            1 :                         }
    1015              :                 }
    1016            1 :                 if v == current {
    1017            1 :                         break
    1018              :                 }
    1019              :         }
    1020              :         // virtualBackings contains backings that are referenced by some virtual
    1021              :         // tables in the latest version (which are handled above), and backings that
    1022              :         // are not but are still alive because of the protection mechanism (see
    1023              :         // manifset.VirtualBackings). This loop ensures the latter get added to the
    1024              :         // map.
    1025            1 :         vs.virtualBackings.ForEach(func(b *fileBacking) {
    1026            1 :                 m[b.DiskFileNum] = struct{}{}
    1027            1 :         })
    1028              : }
    1029              : 
    1030              : // addObsoleteLocked will add the fileInfo associated with obsolete backing
    1031              : // sstables to the obsolete tables list.
    1032              : //
    1033              : // The file backings in the obsolete list must not appear more than once.
    1034              : //
    1035              : // DB.mu must be held when addObsoleteLocked is called.
    1036            1 : func (vs *versionSet) addObsoleteLocked(obsolete manifest.ObsoleteFiles) {
    1037            1 :         if obsolete.Count() == 0 {
    1038            1 :                 return
    1039            1 :         }
    1040              : 
    1041            1 :         obsoleteFileInfo := make([]objectInfo, len(obsolete.FileBackings))
    1042            1 :         for i, bs := range obsolete.FileBackings {
    1043            1 :                 obsoleteFileInfo[i].FileNum = bs.DiskFileNum
    1044            1 :                 obsoleteFileInfo[i].FileSize = bs.Size
    1045            1 :         }
    1046              : 
    1047            1 :         if invariants.Enabled {
    1048            1 :                 dedup := make(map[base.DiskFileNum]struct{})
    1049            1 :                 for _, fi := range obsoleteFileInfo {
    1050            1 :                         dedup[fi.FileNum] = struct{}{}
    1051            1 :                 }
    1052            1 :                 if len(dedup) != len(obsoleteFileInfo) {
    1053            0 :                         panic("pebble: duplicate FileBacking present in obsolete list")
    1054              :                 }
    1055              :         }
    1056              : 
    1057            1 :         for i, fi := range obsoleteFileInfo {
    1058            1 :                 // Note that the obsolete tables are no longer zombie by the definition of
    1059            1 :                 // zombie, but we leave them in the zombie tables map until they are
    1060            1 :                 // deleted from disk.
    1061            1 :                 //
    1062            1 :                 // TODO(sumeer): this means that the zombie metrics, like ZombieSize,
    1063            1 :                 // computed in DB.Metrics are also being counted in the obsolete metrics.
    1064            1 :                 // Was this intentional?
    1065            1 :                 info, ok := vs.zombieTables[fi.FileNum]
    1066            1 :                 if !ok {
    1067            0 :                         vs.opts.Logger.Fatalf("MANIFEST obsolete table %s not marked as zombie", fi.FileNum)
    1068            0 :                 }
    1069            1 :                 obsoleteFileInfo[i].isLocal = info.isLocal
    1070              :         }
    1071              : 
    1072            1 :         vs.obsoleteTables = append(vs.obsoleteTables, obsoleteFileInfo...)
    1073            1 :         vs.updateObsoleteTableMetricsLocked()
    1074              : }
    1075              : 
    1076              : // addObsolete will acquire DB.mu, so DB.mu must not be held when this is
    1077              : // called.
    1078            1 : func (vs *versionSet) addObsolete(obsolete manifest.ObsoleteFiles) {
    1079            1 :         vs.mu.Lock()
    1080            1 :         defer vs.mu.Unlock()
    1081            1 :         vs.addObsoleteLocked(obsolete)
    1082            1 : }
    1083              : 
    1084            1 : func (vs *versionSet) updateObsoleteTableMetricsLocked() {
    1085            1 :         vs.metrics.Table.ObsoleteCount = int64(len(vs.obsoleteTables))
    1086            1 :         vs.metrics.Table.ObsoleteSize = 0
    1087            1 :         vs.metrics.Table.Local.ObsoleteSize = 0
    1088            1 :         for _, fi := range vs.obsoleteTables {
    1089            1 :                 vs.metrics.Table.ObsoleteSize += fi.FileSize
    1090            1 :                 if fi.isLocal {
    1091            1 :                         vs.metrics.Table.Local.ObsoleteSize += fi.FileSize
    1092            1 :                 }
    1093              :         }
    1094              : }
    1095              : 
    1096              : func findCurrentManifest(
    1097              :         fs vfs.FS, dirname string, ls []string,
    1098            1 : ) (marker *atomicfs.Marker, manifestNum base.DiskFileNum, exists bool, err error) {
    1099            1 :         // Locating a marker should succeed even if the marker has never been placed.
    1100            1 :         var filename string
    1101            1 :         marker, filename, err = atomicfs.LocateMarkerInListing(fs, dirname, manifestMarkerName, ls)
    1102            1 :         if err != nil {
    1103            0 :                 return nil, 0, false, err
    1104            0 :         }
    1105              : 
    1106            1 :         if filename == "" {
    1107            1 :                 // The marker hasn't been set yet. This database doesn't exist.
    1108            1 :                 return marker, 0, false, nil
    1109            1 :         }
    1110              : 
    1111            1 :         var ok bool
    1112            1 :         _, manifestNum, ok = base.ParseFilename(fs, filename)
    1113            1 :         if !ok {
    1114            0 :                 return marker, 0, false, base.CorruptionErrorf("pebble: MANIFEST name %q is malformed", errors.Safe(filename))
    1115            0 :         }
    1116            1 :         return marker, manifestNum, true, nil
    1117              : }
    1118              : 
    1119            0 : func newFileMetrics(newFiles []manifest.NewTableEntry) map[int]*LevelMetrics {
    1120            0 :         m := map[int]*LevelMetrics{}
    1121            0 :         for _, nf := range newFiles {
    1122            0 :                 lm := m[nf.Level]
    1123            0 :                 if lm == nil {
    1124            0 :                         lm = &LevelMetrics{}
    1125            0 :                         m[nf.Level] = lm
    1126            0 :                 }
    1127            0 :                 lm.NumFiles++
    1128            0 :                 lm.Size += int64(nf.Meta.Size)
    1129              :         }
    1130            0 :         return m
    1131              : }
        

Generated by: LCOV version 2.0-1