LCOV - code coverage report
Current view: top level - pebble - version_set.go (source / functions) Coverage Total Hit
Test: 2025-11-23 08:19Z d7ce913e - meta test only.lcov Lines: 83.2 % 637 530
Test Date: 2025-11-23 08:20:36 Functions: - 0 0

            Line data    Source code
       1              : // Copyright 2012 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2              : // of this source code is governed by a BSD-style license that can be found in
       3              : // the LICENSE file.
       4              : 
       5              : package pebble
       6              : 
       7              : import (
       8              :         "fmt"
       9              :         "slices"
      10              :         "sync"
      11              :         "sync/atomic"
      12              :         "time"
      13              : 
      14              :         "github.com/cockroachdb/crlib/crtime"
      15              :         "github.com/cockroachdb/errors"
      16              :         "github.com/cockroachdb/pebble/internal/base"
      17              :         "github.com/cockroachdb/pebble/internal/deletepacer"
      18              :         "github.com/cockroachdb/pebble/internal/invariants"
      19              :         "github.com/cockroachdb/pebble/internal/manifest"
      20              :         "github.com/cockroachdb/pebble/objstorage"
      21              :         "github.com/cockroachdb/pebble/record"
      22              :         "github.com/cockroachdb/pebble/vfs"
      23              :         "github.com/cockroachdb/pebble/vfs/atomicfs"
      24              : )
      25              : 
      26              : const numLevels = manifest.NumLevels
      27              : 
      28              : const manifestMarkerName = `manifest`
      29              : 
      30              : // versionSet manages a collection of immutable versions, and manages the
      31              : // creation of a new version from the most recent version. A new version is
      32              : // created from an existing version by applying a version edit which is just
      33              : // like it sounds: a delta from the previous version. Version edits are logged
      34              : // to the MANIFEST file, which is replayed at startup.
      35              : type versionSet struct {
      36              :         // Next seqNum to use for WAL writes.
      37              :         //
      38              :         // Note that unflushed WALs may contain sequence numbers less than, equal
      39              :         // to, or greater than logSeqNum. This is because every version edit
      40              :         // includes the current logSeqNum, so any version edits applied after a
      41              :         // write and before the flush of the write to a sstable will update
      42              :         // logSeqNum beyond the write's sequence number.
      43              :         logSeqNum base.AtomicSeqNum
      44              : 
      45              :         // The upper bound on sequence numbers that have been assigned so far. A
      46              :         // suffix of these sequence numbers may not have been written to a WAL. Both
      47              :         // logSeqNum and visibleSeqNum are atomically updated by the commitPipeline.
      48              :         // visibleSeqNum is <= logSeqNum.
      49              :         visibleSeqNum base.AtomicSeqNum
      50              : 
      51              :         // Number of bytes present in sstables being written by in-progress
      52              :         // compactions. This value will be zero if there are no in-progress
      53              :         // compactions. Updated and read atomically.
      54              :         atomicInProgressBytes atomic.Int64
      55              : 
      56              :         // Immutable fields.
      57              :         dirname  string
      58              :         provider objstorage.Provider
      59              :         // Set to DB.mu.
      60              :         mu   *sync.Mutex
      61              :         opts *Options
      62              :         fs   vfs.FS
      63              :         cmp  *base.Comparer
      64              :         // Dynamic base level allows the dynamic base level computation to be
      65              :         // disabled. Used by tests which want to create specific LSM structures.
      66              :         dynamicBaseLevel bool
      67              : 
      68              :         // Mutable fields.
      69              :         versions manifest.VersionList
      70              :         latest   *latestVersionState
      71              :         picker   compactionPicker
      72              :         // curCompactionConcurrency is updated whenever picker is updated.
      73              :         // INVARIANT: >= 1.
      74              :         curCompactionConcurrency atomic.Int32
      75              : 
      76              :         // metrics contains the subset of Metrics that are maintained by the version
      77              :         // set. The rest of the metrics are computed in DB.Metrics().
      78              :         metrics struct {
      79              :                 Levels  AllLevelMetrics
      80              :                 Compact CompactMetrics
      81              :                 Ingest  IngestMetrics
      82              :                 Flush   FlushMetrics
      83              :                 Keys    KeysMetrics
      84              :         }
      85              : 
      86              :         // A pointer to versionSet.addObsoleteLocked. Avoids allocating a new closure
      87              :         // on the creation of every version.
      88              :         obsoleteFn func(manifest.ObsoleteFiles)
      89              :         // obsolete{Tables,Blobs,Manifests,Options} are sorted by file number ascending.
      90              :         // TODO(jackson, radu): Investigate if we still need this intermediary step or
      91              :         // we can directly enqueue all deletions.
      92              :         obsoleteTables    []deletepacer.ObsoleteFile
      93              :         obsoleteBlobs     []deletepacer.ObsoleteFile
      94              :         obsoleteManifests []deletepacer.ObsoleteFile
      95              :         obsoleteOptions   []deletepacer.ObsoleteFile
      96              : 
      97              :         // Zombie tables which have been removed from the current version but are
      98              :         // still referenced by an inuse iterator.
      99              :         zombieTables zombieObjects
     100              :         // Zombie blobs which have been removed from the current version but are
     101              :         // still referenced by an inuse iterator.
     102              :         zombieBlobs zombieObjects
     103              : 
     104              :         // minUnflushedLogNum is the smallest WAL log file number corresponding to
     105              :         // mutations that have not been flushed to an sstable.
     106              :         minUnflushedLogNum base.DiskFileNum
     107              : 
     108              :         // The next file number. A single counter is used to assign file
     109              :         // numbers for the WAL, MANIFEST, sstable, and OPTIONS files.
     110              :         nextFileNum atomic.Uint64
     111              : 
     112              :         // The current manifest file number.
     113              :         manifestFileNum base.DiskFileNum
     114              :         manifestMarker  *atomicfs.Marker
     115              : 
     116              :         manifestFile          vfs.File
     117              :         manifest              *record.Writer
     118              :         getFormatMajorVersion func() FormatMajorVersion
     119              : 
     120              :         writing    bool
     121              :         writerCond sync.Cond
     122              :         // State for deciding when to write a snapshot. Protected by mu.
     123              :         rotationHelper record.RotationHelper
     124              : 
     125              :         pickedCompactionCache pickedCompactionCache
     126              : }
     127              : 
     128              : // latestVersionState maintains mutable state describing only the most recent
     129              : // version. Unlike a *Version, the state within this struct is updated as new
     130              : // version edits are applied.
     131              : type latestVersionState struct {
     132              :         l0Organizer *manifest.L0Organizer
     133              :         // blobFiles is the set of blob files referenced by the current version.
     134              :         // blobFiles is protected by the manifest logLock (not vs.mu).
     135              :         blobFiles manifest.CurrentBlobFileSet
     136              :         // virtualBackings contains information about the FileBackings which support
     137              :         // virtual sstables in the latest version. It is mainly used to determine when
     138              :         // a backing is no longer in use by the tables in the latest version; this is
     139              :         // not a trivial problem because compactions and other operations can happen
     140              :         // in parallel (and they can finish in unpredictable order).
     141              :         //
     142              :         // This mechanism is complementary to the backing Ref/Unref mechanism, which
     143              :         // determines when a backing is no longer used by *any* live version and can
     144              :         // be removed.
     145              :         //
     146              :         // In principle this should have been a copy-on-write structure, with each
     147              :         // Version having its own record of its virtual backings (similar to the
     148              :         // B-tree that holds the tables). However, in practice we only need it for the
     149              :         // latest version, so we use a simpler structure and store it in the
     150              :         // versionSet instead.
     151              :         //
     152              :         // virtualBackings is modified under DB.mu and the log lock. If it is accessed
     153              :         // under DB.mu and a version update is in progress, it reflects the state of
     154              :         // the next version.
     155              :         virtualBackings manifest.VirtualBackings
     156              : }
     157              : 
     158              : func (vs *versionSet) init(
     159              :         dirname string,
     160              :         provider objstorage.Provider,
     161              :         opts *Options,
     162              :         marker *atomicfs.Marker,
     163              :         getFMV func() FormatMajorVersion,
     164              :         mu *sync.Mutex,
     165            1 : ) {
     166            1 :         vs.dirname = dirname
     167            1 :         vs.provider = provider
     168            1 :         vs.mu = mu
     169            1 :         vs.writerCond.L = mu
     170            1 :         vs.opts = opts
     171            1 :         vs.fs = opts.FS
     172            1 :         vs.cmp = opts.Comparer
     173            1 :         vs.dynamicBaseLevel = true
     174            1 :         vs.versions.Init(mu)
     175            1 :         vs.obsoleteFn = vs.addObsoleteLocked
     176            1 :         vs.zombieTables = makeZombieObjects()
     177            1 :         vs.zombieBlobs = makeZombieObjects()
     178            1 :         vs.nextFileNum.Store(1)
     179            1 :         vs.logSeqNum.Store(base.SeqNumStart)
     180            1 :         vs.manifestMarker = marker
     181            1 :         vs.getFormatMajorVersion = getFMV
     182            1 : }
     183              : 
     184              : // initNewDB creates a version set for a fresh database, persisting an initial
     185              : // manifest file.
     186              : func (vs *versionSet) initNewDB(
     187              :         jobID JobID,
     188              :         dirname string,
     189              :         provider objstorage.Provider,
     190              :         opts *Options,
     191              :         marker *atomicfs.Marker,
     192              :         getFormatMajorVersion func() FormatMajorVersion,
     193              :         blobRewriteHeuristic manifest.BlobRewriteHeuristic,
     194              :         mu *sync.Mutex,
     195            1 : ) error {
     196            1 :         vs.init(dirname, provider, opts, marker, getFormatMajorVersion, mu)
     197            1 :         vs.latest = &latestVersionState{
     198            1 :                 l0Organizer:     manifest.NewL0Organizer(opts.Comparer, opts.FlushSplitBytes),
     199            1 :                 virtualBackings: manifest.MakeVirtualBackings(),
     200            1 :         }
     201            1 :         vs.latest.blobFiles.Init(nil, blobRewriteHeuristic)
     202            1 :         emptyVersion := manifest.NewInitialVersion(opts.Comparer)
     203            1 :         vs.append(emptyVersion)
     204            1 : 
     205            1 :         vs.setCompactionPicker(
     206            1 :                 newCompactionPickerByScore(emptyVersion, vs.latest, vs.opts, nil))
     207            1 :         // Note that a "snapshot" version edit is written to the manifest when it is
     208            1 :         // created.
     209            1 :         vs.manifestFileNum = vs.getNextDiskFileNum()
     210            1 :         err := vs.createManifest(vs.dirname, vs.manifestFileNum, vs.minUnflushedLogNum, vs.nextFileNum.Load(), nil /* virtualBackings */)
     211            1 :         if err == nil {
     212            1 :                 if err = vs.manifest.Flush(); err != nil {
     213            0 :                         vs.opts.Logger.Fatalf("MANIFEST flush failed: %v", err)
     214            0 :                 }
     215              :         }
     216            1 :         if err == nil {
     217            1 :                 if err = vs.manifestFile.Sync(); err != nil {
     218            0 :                         vs.opts.Logger.Fatalf("MANIFEST sync failed: %v", err)
     219            0 :                 }
     220              :         }
     221            1 :         if err == nil {
     222            1 :                 // NB: Move() is responsible for syncing the data directory.
     223            1 :                 if err = vs.manifestMarker.Move(base.MakeFilename(base.FileTypeManifest, vs.manifestFileNum)); err != nil {
     224            0 :                         vs.opts.Logger.Fatalf("MANIFEST set current failed: %v", err)
     225            0 :                 }
     226              :         }
     227              : 
     228            1 :         vs.opts.EventListener.ManifestCreated(ManifestCreateInfo{
     229            1 :                 JobID:   int(jobID),
     230            1 :                 Path:    base.MakeFilepath(vs.fs, vs.dirname, base.FileTypeManifest, vs.manifestFileNum),
     231            1 :                 FileNum: vs.manifestFileNum,
     232            1 :                 Err:     err,
     233            1 :         })
     234            1 :         if err != nil {
     235            0 :                 return err
     236            0 :         }
     237            1 :         return nil
     238              : }
     239              : 
     240              : // initRecoveredDB initializes the version set from the existing database state
     241              : // recovered from an existing manifest file.
     242              : func (vs *versionSet) initRecoveredDB(
     243              :         dirname string,
     244              :         provider objstorage.Provider,
     245              :         opts *Options,
     246              :         rv *recoveredVersion,
     247              :         marker *atomicfs.Marker,
     248              :         getFormatMajorVersion func() FormatMajorVersion,
     249              :         mu *sync.Mutex,
     250            1 : ) error {
     251            1 :         vs.init(dirname, provider, opts, marker, getFormatMajorVersion, mu)
     252            1 :         vs.manifestFileNum = rv.manifestFileNum
     253            1 :         vs.minUnflushedLogNum = rv.minUnflushedLogNum
     254            1 :         vs.nextFileNum.Store(uint64(rv.nextFileNum))
     255            1 :         vs.logSeqNum.Store(rv.logSeqNum)
     256            1 :         vs.latest = rv.latest
     257            1 :         setBasicLevelMetrics(&vs.metrics.Levels, rv.version)
     258            1 :         vs.append(rv.version)
     259            1 :         vs.setCompactionPicker(newCompactionPickerByScore(
     260            1 :                 rv.version, vs.latest, vs.opts, nil))
     261            1 :         return nil
     262            1 : }
     263              : 
     264            1 : func (vs *versionSet) close() error {
     265            1 :         if vs.manifestFile != nil {
     266            1 :                 if err := vs.manifestFile.Close(); err != nil {
     267            0 :                         return err
     268            0 :                 }
     269              :         }
     270            1 :         if vs.manifestMarker != nil {
     271            1 :                 if err := vs.manifestMarker.Close(); err != nil {
     272            0 :                         return err
     273            0 :                 }
     274              :         }
     275            1 :         return nil
     276              : }
     277              : 
     278              : // logLock locks the manifest for writing. The lock must be released by
     279              : // a call to logUnlock.
     280              : //
     281              : // DB.mu must be held when calling this method, but the mutex may be dropped and
     282              : // re-acquired during the course of this method.
     283            1 : func (vs *versionSet) logLock() {
     284            1 :         // Wait for any existing writing to the manifest to complete, then mark the
     285            1 :         // manifest as busy.
     286            1 :         for vs.writing {
     287            1 :                 // Note: writerCond.L is DB.mu, so we unlock it while we wait.
     288            1 :                 vs.writerCond.Wait()
     289            1 :         }
     290            1 :         vs.writing = true
     291              : }
     292              : 
     293              : // logUnlock releases the lock for manifest writing.
     294              : //
     295              : // DB.mu must be held when calling this method.
     296            1 : func (vs *versionSet) logUnlock() {
     297            1 :         if !vs.writing {
     298            0 :                 vs.opts.Logger.Fatalf("MANIFEST not locked for writing")
     299            0 :         }
     300            1 :         vs.writing = false
     301            1 :         vs.writerCond.Signal()
     302              : }
     303              : 
     304            1 : func (vs *versionSet) logUnlockAndInvalidatePickedCompactionCache() {
     305            1 :         vs.pickedCompactionCache.invalidate()
     306            1 :         vs.logUnlock()
     307            1 : }
     308              : 
     309              : // versionUpdate is returned by the function passed to UpdateVersionLocked.
     310              : //
     311              : // If VE is nil, there is no update to apply (but it is not an error).
     312              : type versionUpdate struct {
     313              :         VE      *manifest.VersionEdit
     314              :         JobID   JobID
     315              :         Metrics levelMetricsDelta
     316              :         // InProgressCompactionFn is called while DB.mu is held after the I/O part of
     317              :         // the update was performed. It should return any compactions that are
     318              :         // in-progress (excluding than the one that is being applied).
     319              :         InProgressCompactionsFn func() []compactionInfo
     320              :         ForceManifestRotation   bool
     321              : }
     322              : 
     323              : // UpdateVersionLocked is used to update the current version, returning the
     324              : // duration of the manifest update. If the manifest update is unsuccessful, the
     325              : // duration will be unset (0).
     326              : //
     327              : // DB.mu must be held. UpdateVersionLocked first waits for any other version
     328              : // update to complete, releasing and reacquiring DB.mu.
     329              : //
     330              : // UpdateVersionLocked then calls updateFn which builds a versionUpdate, while
     331              : // holding DB.mu. The updateFn can release and reacquire DB.mu (it should
     332              : // attempt to do as much work as possible outside of the lock).
     333              : //
     334              : // UpdateVersionLocked fills in the following fields of the VersionEdit:
     335              : // NextFileNum, LastSeqNum, RemovedBackingTables. The removed backing tables are
     336              : // those backings that are no longer used (in the new version) after applying
     337              : // the edit (as per vs.virtualBackings). Other than these fields, the
     338              : // VersionEdit must be complete.
     339              : //
     340              : // New table backing references (TableBacking.Ref) are taken as part of applying
     341              : // the version edit. The state of the virtual backings (vs.virtualBackings) is
     342              : // updated before logging to the manifest and installing the latest version;
     343              : // this is ok because any failure in those steps is fatal.
     344              : //
     345              : // If updateFn returns an error, no update is applied and that same error is returned.
     346              : // If versionUpdate.VE is nil, the no update is applied (and no error is returned).
     347              : func (vs *versionSet) UpdateVersionLocked(
     348              :         updateFn func() (versionUpdate, error),
     349            1 : ) (time.Duration, error) {
     350            1 :         vs.logLock()
     351            1 :         defer vs.logUnlockAndInvalidatePickedCompactionCache()
     352            1 : 
     353            1 :         vu, err := updateFn()
     354            1 :         if err != nil || vu.VE == nil {
     355            1 :                 return 0, err
     356            1 :         }
     357              : 
     358            1 :         if !vs.writing {
     359            0 :                 vs.opts.Logger.Fatalf("MANIFEST not locked for writing")
     360            0 :         }
     361              : 
     362            1 :         updateManifestStart := crtime.NowMono()
     363            1 :         ve := vu.VE
     364            1 :         if ve.MinUnflushedLogNum != 0 {
     365            1 :                 if ve.MinUnflushedLogNum < vs.minUnflushedLogNum ||
     366            1 :                         vs.nextFileNum.Load() <= uint64(ve.MinUnflushedLogNum) {
     367            0 :                         panic(fmt.Sprintf("pebble: inconsistent versionEdit minUnflushedLogNum %d",
     368            0 :                                 ve.MinUnflushedLogNum))
     369              :                 }
     370              :         }
     371              : 
     372              :         // This is the next manifest filenum, but if the current file is too big we
     373              :         // will write this ve to the next file which means what ve encodes is the
     374              :         // current filenum and not the next one.
     375              :         //
     376              :         // TODO(sbhola): figure out why this is correct and update comment.
     377            1 :         ve.NextFileNum = vs.nextFileNum.Load()
     378            1 : 
     379            1 :         // LastSeqNum is set to the current upper bound on the assigned sequence
     380            1 :         // numbers. Note that this is exactly the behavior of RocksDB. LastSeqNum is
     381            1 :         // used to initialize versionSet.logSeqNum and versionSet.visibleSeqNum on
     382            1 :         // replay. It must be higher than or equal to any than any sequence number
     383            1 :         // written to an sstable, including sequence numbers in ingested files.
     384            1 :         // Note that LastSeqNum is not (and cannot be) the minimum unflushed sequence
     385            1 :         // number. This is fallout from ingestion which allows a sequence number X to
     386            1 :         // be assigned to an ingested sstable even though sequence number X-1 resides
     387            1 :         // in an unflushed memtable. logSeqNum is the _next_ sequence number that
     388            1 :         // will be assigned, so subtract that by 1 to get the upper bound on the
     389            1 :         // last assigned sequence number.
     390            1 :         logSeqNum := vs.logSeqNum.Load()
     391            1 :         ve.LastSeqNum = logSeqNum - 1
     392            1 :         if logSeqNum == 0 {
     393            0 :                 // logSeqNum is initialized to 1 in Open() if there are no previous WAL
     394            0 :                 // or manifest records, so this case should never happen.
     395            0 :                 vs.opts.Logger.Fatalf("logSeqNum must be a positive integer: %d", logSeqNum)
     396            0 :         }
     397              : 
     398            1 :         currentVersion := vs.currentVersion()
     399            1 :         var newVersion *manifest.Version
     400            1 : 
     401            1 :         // Generate a new manifest if we don't currently have one, or forceRotation
     402            1 :         // is true, or the current one is too large.
     403            1 :         //
     404            1 :         // For largeness, we do not exclusively use MaxManifestFileSize size
     405            1 :         // threshold since we have had incidents where due to either large keys or
     406            1 :         // large numbers of files, each edit results in a snapshot + write of the
     407            1 :         // edit. This slows the system down since each flush or compaction is
     408            1 :         // writing a new manifest snapshot. The primary goal of the size-based
     409            1 :         // rollover logic is to ensure that when reopening a DB, the number of edits
     410            1 :         // that need to be replayed on top of the snapshot is "sane". Rolling over
     411            1 :         // to a new manifest after each edit is not relevant to that goal.
     412            1 :         //
     413            1 :         // Consider the following cases:
     414            1 :         // - The number of live files F in the DB is roughly stable: after writing
     415            1 :         //   the snapshot (with F files), say we require that there be enough edits
     416            1 :         //   such that the cumulative number of files in those edits, E, be greater
     417            1 :         //   than F. This will ensure that the total amount of time in
     418            1 :         //   UpdateVersionLocked that is spent in snapshot writing is ~50%.
     419            1 :         //
     420            1 :         // - The number of live files F in the DB is shrinking drastically, say from
     421            1 :         //   F to F/10: This can happen for various reasons, like wide range
     422            1 :         //   tombstones, or large numbers of smaller than usual files that are being
     423            1 :         //   merged together into larger files. And say the new files generated
     424            1 :         //   during this shrinkage is insignificant compared to F/10, and so for
     425            1 :         //   this example we will assume it is effectively 0. After this shrinking,
     426            1 :         //   E = 0.9F, and so if we used the previous snapshot file count, F, as the
     427            1 :         //   threshold that needs to be exceeded, we will further delay the snapshot
     428            1 :         //   writing. Which means on DB reopen we will need to replay 0.9F edits to
     429            1 :         //   get to a version with 0.1F files. It would be better to create a new
     430            1 :         //   snapshot when E exceeds the number of files in the current version.
     431            1 :         //
     432            1 :         // - The number of live files F in the DB is growing via perfect ingests
     433            1 :         //   into L6: Say we wrote the snapshot when there were F files and now we
     434            1 :         //   have 10F files, so E = 9F. We will further delay writing a new
     435            1 :         //   snapshot. This case can be critiqued as contrived, but we consider it
     436            1 :         //   nonetheless.
     437            1 :         //
     438            1 :         // The logic below uses the min of the last snapshot file count and the file
     439            1 :         // count in the current version.
     440            1 :         vs.rotationHelper.AddRecord(int64(len(ve.DeletedTables) + len(ve.NewTables)))
     441            1 :         sizeExceeded := vs.manifest.Size() >= vs.opts.MaxManifestFileSize
     442            1 :         requireRotation := vu.ForceManifestRotation || vs.manifest == nil
     443            1 : 
     444            1 :         var nextSnapshotFilecount int64
     445            1 :         for i := range vs.metrics.Levels {
     446            1 :                 nextSnapshotFilecount += int64(vs.metrics.Levels[i].Tables.Count)
     447            1 :         }
     448            1 :         if sizeExceeded && !requireRotation {
     449            1 :                 requireRotation = vs.rotationHelper.ShouldRotate(nextSnapshotFilecount)
     450            1 :         }
     451            1 :         var newManifestFileNum base.DiskFileNum
     452            1 :         var prevManifestFileSize uint64
     453            1 :         var newManifestVirtualBackings []*manifest.TableBacking
     454            1 :         if requireRotation {
     455            1 :                 newManifestFileNum = vs.getNextDiskFileNum()
     456            1 :                 prevManifestFileSize = uint64(vs.manifest.Size())
     457            1 : 
     458            1 :                 // We want the virtual backings *before* applying the version edit, because
     459            1 :                 // the new manifest will contain the pre-apply version plus the last version
     460            1 :                 // edit.
     461            1 :                 newManifestVirtualBackings = slices.Collect(vs.latest.virtualBackings.All())
     462            1 :         }
     463              : 
     464              :         // Grab certain values before releasing vs.mu, in case createManifest() needs
     465              :         // to be called.
     466            1 :         minUnflushedLogNum := vs.minUnflushedLogNum
     467            1 :         nextFileNum := vs.nextFileNum.Load()
     468            1 : 
     469            1 :         // Note: this call populates ve.RemovedBackingTables.
     470            1 :         zombieBackings, removedVirtualBackings :=
     471            1 :                 getZombieTablesAndUpdateVirtualBackings(ve, &vs.latest.virtualBackings, vs.provider)
     472            1 : 
     473            1 :         var l0Update manifest.L0PreparedUpdate
     474            1 :         if err := func() error {
     475            1 :                 vs.mu.Unlock()
     476            1 :                 defer vs.mu.Lock()
     477            1 : 
     478            1 :                 if vs.getFormatMajorVersion() < FormatVirtualSSTables && len(ve.CreatedBackingTables) > 0 {
     479            0 :                         return base.AssertionFailedf("MANIFEST cannot contain virtual sstable records due to format major version")
     480            0 :                 }
     481              : 
     482              :                 // Rotate the manifest if necessary. Rotating the manifest involves
     483              :                 // creating a new file and writing an initial version edit containing a
     484              :                 // snapshot of the current version. This initial version edit will
     485              :                 // reflect the Version prior to the pending version edit (`ve`). Once
     486              :                 // we've created the new manifest with the previous version state, we'll
     487              :                 // append the version edit `ve` to the tail of the new manifest.
     488            1 :                 if newManifestFileNum != 0 {
     489            1 :                         if err := vs.createManifest(vs.dirname, newManifestFileNum, minUnflushedLogNum, nextFileNum, newManifestVirtualBackings); err != nil {
     490            0 :                                 vs.opts.EventListener.ManifestCreated(ManifestCreateInfo{
     491            0 :                                         JobID:   int(vu.JobID),
     492            0 :                                         Path:    base.MakeFilepath(vs.fs, vs.dirname, base.FileTypeManifest, newManifestFileNum),
     493            0 :                                         FileNum: newManifestFileNum,
     494            0 :                                         Err:     err,
     495            0 :                                 })
     496            0 :                                 return errors.Wrap(err, "MANIFEST create failed")
     497            0 :                         }
     498              :                 }
     499              : 
     500              :                 // Call ApplyAndUpdateVersionEdit before accumulating the version edit.
     501              :                 // If any blob files are no longer referenced, the version edit will be
     502              :                 // updated to explicitly record the deletion of the blob files. This can
     503              :                 // happen here because vs.blobFiles is protected by the manifest logLock
     504              :                 // (NOT vs.mu). We only read or write vs.blobFiles while holding the
     505              :                 // manifest lock.
     506            1 :                 if err := vs.latest.blobFiles.ApplyAndUpdateVersionEdit(ve); err != nil {
     507            0 :                         return errors.Wrap(err, "MANIFEST blob files apply and update failed")
     508            0 :                 }
     509              : 
     510            1 :                 var bulkEdit manifest.BulkVersionEdit
     511            1 :                 err := bulkEdit.Accumulate(ve)
     512            1 :                 if err != nil {
     513            0 :                         return errors.Wrap(err, "MANIFEST accumulate failed")
     514            0 :                 }
     515            1 :                 newVersion, err = bulkEdit.Apply(currentVersion, vs.opts.Experimental.ReadCompactionRate)
     516            1 :                 if err != nil {
     517            0 :                         return errors.Wrap(err, "MANIFEST apply failed")
     518            0 :                 }
     519            1 :                 l0Update = vs.latest.l0Organizer.PrepareUpdate(&bulkEdit, newVersion)
     520            1 : 
     521            1 :                 w, err := vs.manifest.Next()
     522            1 :                 if err != nil {
     523            0 :                         return errors.Wrap(err, "MANIFEST next record write failed")
     524            0 :                 }
     525              : 
     526              :                 // NB: Any error from this point on is considered fatal as we don't know if
     527              :                 // the MANIFEST write occurred or not. Trying to determine that is
     528              :                 // fraught. Instead we rely on the standard recovery mechanism run when a
     529              :                 // database is open. In particular, that mechanism generates a new MANIFEST
     530              :                 // and ensures it is synced.
     531            1 :                 if err := ve.Encode(w); err != nil {
     532            0 :                         return errors.Wrap(err, "MANIFEST write failed")
     533            0 :                 }
     534            1 :                 if err := vs.manifest.Flush(); err != nil {
     535            0 :                         return errors.Wrap(err, "MANIFEST flush failed")
     536            0 :                 }
     537            1 :                 if err := vs.manifestFile.Sync(); err != nil {
     538            0 :                         return errors.Wrap(err, "MANIFEST sync failed")
     539            0 :                 }
     540            1 :                 if newManifestFileNum != 0 {
     541            1 :                         // NB: Move() is responsible for syncing the data directory.
     542            1 :                         if err := vs.manifestMarker.Move(base.MakeFilename(base.FileTypeManifest, newManifestFileNum)); err != nil {
     543            0 :                                 return errors.Wrap(err, "MANIFEST set current failed")
     544            0 :                         }
     545            1 :                         vs.opts.EventListener.ManifestCreated(ManifestCreateInfo{
     546            1 :                                 JobID:   int(vu.JobID),
     547            1 :                                 Path:    base.MakeFilepath(vs.fs, vs.dirname, base.FileTypeManifest, newManifestFileNum),
     548            1 :                                 FileNum: newManifestFileNum,
     549            1 :                         })
     550              :                 }
     551            1 :                 return nil
     552            0 :         }(); err != nil {
     553            0 :                 // Any error encountered during any of the operations in the previous
     554            0 :                 // closure are considered fatal. Treating such errors as fatal is preferred
     555            0 :                 // to attempting to unwind various file and b-tree reference counts, and
     556            0 :                 // re-generating L0 sublevel metadata. This may change in the future, if
     557            0 :                 // certain manifest / WAL operations become retryable. For more context, see
     558            0 :                 // #1159 and #1792.
     559            0 :                 vs.opts.Logger.Fatalf("%s", err)
     560            0 :                 return 0, err
     561            0 :         }
     562              : 
     563            1 :         if requireRotation {
     564            1 :                 // Successfully rotated.
     565            1 :                 vs.rotationHelper.Rotate(nextSnapshotFilecount)
     566            1 :         }
     567              :         // Now that DB.mu is held again, initialize compacting file info in
     568              :         // L0Sublevels.
     569            1 :         inProgress := vu.InProgressCompactionsFn()
     570            1 : 
     571            1 :         zombieBlobs := getZombieBlobFiles(ve, vs.provider)
     572            1 :         vs.latest.l0Organizer.PerformUpdate(l0Update, newVersion)
     573            1 :         vs.latest.l0Organizer.InitCompactingFileInfo(inProgressL0Compactions(inProgress))
     574            1 : 
     575            1 :         // Update the zombie objects sets first, as installation of the new version
     576            1 :         // will unref the previous version which could result in addObsoleteLocked
     577            1 :         // being called.
     578            1 :         for _, b := range zombieBackings {
     579            1 :                 vs.zombieTables.Add(objectInfo{
     580            1 :                         fileInfo: fileInfo{
     581            1 :                                 FileNum:  b.backing.DiskFileNum,
     582            1 :                                 FileSize: b.backing.Size,
     583            1 :                         },
     584            1 :                         placement: b.placement,
     585            1 :                 })
     586            1 :         }
     587            1 :         for _, zb := range zombieBlobs {
     588            1 :                 vs.zombieBlobs.Add(zb)
     589            1 :         }
     590              :         // Unref the removed backings and report those that already became obsolete.
     591              :         // Note that the only case where we report obsolete tables here is when
     592              :         // VirtualBackings.Protect/Unprotect was used to keep a backing alive without
     593              :         // it being used in the current version.
     594            1 :         var obsoleteVirtualBackings manifest.ObsoleteFiles
     595            1 :         for _, b := range removedVirtualBackings {
     596            1 :                 if b.backing.Unref() == 0 {
     597            1 :                         obsoleteVirtualBackings.TableBackings = append(obsoleteVirtualBackings.TableBackings, b.backing)
     598            1 :                 }
     599              :         }
     600            1 :         vs.addObsoleteLocked(obsoleteVirtualBackings)
     601            1 : 
     602            1 :         // Install the new version.
     603            1 :         vs.append(newVersion)
     604            1 : 
     605            1 :         if ve.MinUnflushedLogNum != 0 {
     606            1 :                 vs.minUnflushedLogNum = ve.MinUnflushedLogNum
     607            1 :         }
     608            1 :         if newManifestFileNum != 0 {
     609            1 :                 if vs.manifestFileNum != 0 {
     610            1 :                         vs.obsoleteManifests = append(vs.obsoleteManifests, deletepacer.ObsoleteFile{
     611            1 :                                 FileType:  base.FileTypeManifest,
     612            1 :                                 FS:        vs.fs,
     613            1 :                                 Path:      base.MakeFilepath(vs.fs, vs.dirname, base.FileTypeManifest, vs.manifestFileNum),
     614            1 :                                 FileNum:   vs.manifestFileNum,
     615            1 :                                 FileSize:  prevManifestFileSize,
     616            1 :                                 Placement: base.Local,
     617            1 :                         })
     618            1 :                 }
     619            1 :                 vs.manifestFileNum = newManifestFileNum
     620              :         }
     621              : 
     622            1 :         vs.metrics.Levels.update(vu.Metrics)
     623            1 :         setBasicLevelMetrics(&vs.metrics.Levels, newVersion)
     624            1 : 
     625            1 :         vs.setCompactionPicker(newCompactionPickerByScore(newVersion, vs.latest, vs.opts, inProgress))
     626            1 :         if !vs.dynamicBaseLevel {
     627            0 :                 vs.picker.forceBaseLevel1()
     628            0 :         }
     629              : 
     630            1 :         return updateManifestStart.Elapsed(), nil
     631              : }
     632              : 
     633            1 : func (vs *versionSet) setCompactionPicker(picker *compactionPickerByScore) {
     634            1 :         vs.picker = picker
     635            1 :         vs.curCompactionConcurrency.Store(int32(picker.getCompactionConcurrency()))
     636            1 : }
     637              : 
     638              : type tableBackingInfo struct {
     639              :         backing   *manifest.TableBacking
     640              :         placement base.Placement
     641              : }
     642              : 
     643              : // getZombieTablesAndUpdateVirtualBackings updates the virtual backings with the
     644              : // changes in the versionEdit and populates ve.RemovedBackingTables.
     645              : // Returns:
     646              : //   - zombieBackings: all backings (physical and virtual) that will no longer be
     647              : //     needed when we apply ve.
     648              : //   - removedVirtualBackings: the virtual backings that will be removed by the
     649              : //     VersionEdit and which must be Unref()ed by the caller. These backings
     650              : //     match ve.RemovedBackingTables.
     651              : func getZombieTablesAndUpdateVirtualBackings(
     652              :         ve *manifest.VersionEdit, virtualBackings *manifest.VirtualBackings, provider objstorage.Provider,
     653            1 : ) (zombieBackings, removedVirtualBackings []tableBackingInfo) {
     654            1 :         // First, deal with the physical tables.
     655            1 :         //
     656            1 :         // A physical backing has become unused if it is in DeletedFiles but not in
     657            1 :         // NewFiles or CreatedBackingTables.
     658            1 :         //
     659            1 :         // Note that for the common case where there are very few elements, the map
     660            1 :         // will stay on the stack.
     661            1 :         stillUsed := make(map[base.DiskFileNum]struct{})
     662            1 :         for _, nf := range ve.NewTables {
     663            1 :                 if !nf.Meta.Virtual {
     664            1 :                         stillUsed[nf.Meta.TableBacking.DiskFileNum] = struct{}{}
     665            1 :                 }
     666              :         }
     667            1 :         for _, b := range ve.CreatedBackingTables {
     668            1 :                 stillUsed[b.DiskFileNum] = struct{}{}
     669            1 :         }
     670            1 :         for _, m := range ve.DeletedTables {
     671            1 :                 if !m.Virtual {
     672            1 :                         if _, ok := stillUsed[m.TableBacking.DiskFileNum]; !ok {
     673            1 :                                 zombieBackings = append(zombieBackings, tableBackingInfo{
     674            1 :                                         backing:   m.TableBacking,
     675            1 :                                         placement: objstorage.Placement(provider, base.FileTypeTable, m.TableBacking.DiskFileNum),
     676            1 :                                 })
     677            1 :                         }
     678              :                 }
     679              :         }
     680              : 
     681              :         // Now deal with virtual tables.
     682              :         //
     683              :         // When a virtual table moves between levels we RemoveTable() then AddTable(),
     684              :         // which works out.
     685            1 :         for _, b := range ve.CreatedBackingTables {
     686            1 :                 placement := objstorage.Placement(provider, base.FileTypeTable, b.DiskFileNum)
     687            1 :                 virtualBackings.AddAndRef(b, placement)
     688            1 :         }
     689            1 :         for _, m := range ve.DeletedTables {
     690            1 :                 if m.Virtual {
     691            1 :                         virtualBackings.RemoveTable(m.TableBacking.DiskFileNum, m.TableNum)
     692            1 :                 }
     693              :         }
     694            1 :         for _, nf := range ve.NewTables {
     695            1 :                 if nf.Meta.Virtual {
     696            1 :                         virtualBackings.AddTable(nf.Meta, nf.Level)
     697            1 :                 }
     698              :         }
     699              : 
     700            1 :         if unused := virtualBackings.Unused(); len(unused) > 0 {
     701            1 :                 // Virtual backings that are no longer used are zombies and are also added
     702            1 :                 // to RemovedBackingTables (before the version edit is written to disk).
     703            1 :                 ve.RemovedBackingTables = make([]base.DiskFileNum, len(unused))
     704            1 :                 for i, b := range unused {
     705            1 :                         placement := objstorage.Placement(provider, base.FileTypeTable, b.DiskFileNum)
     706            1 :                         ve.RemovedBackingTables[i] = b.DiskFileNum
     707            1 :                         zombieBackings = append(zombieBackings, tableBackingInfo{
     708            1 :                                 backing:   b,
     709            1 :                                 placement: placement,
     710            1 :                         })
     711            1 :                         virtualBackings.Remove(b.DiskFileNum)
     712            1 :                 }
     713            1 :                 removedVirtualBackings = zombieBackings[len(zombieBackings)-len(unused):]
     714              :         }
     715            1 :         return zombieBackings, removedVirtualBackings
     716              : }
     717              : 
     718              : // getZombieBlobFiles constructs objectInfos for all zombie blob files.
     719              : func getZombieBlobFiles(
     720              :         ve *manifest.VersionEdit, provider objstorage.Provider,
     721            1 : ) (zombieBlobFiles []objectInfo) {
     722            1 :         zombieBlobFiles = make([]objectInfo, 0, len(ve.DeletedBlobFiles))
     723            1 :         for _, physical := range ve.DeletedBlobFiles {
     724            1 :                 placement := objstorage.Placement(provider, base.FileTypeBlob, physical.FileNum)
     725            1 :                 zombieBlobFiles = append(zombieBlobFiles, objectInfo{
     726            1 :                         fileInfo: fileInfo{
     727            1 :                                 FileNum:  physical.FileNum,
     728            1 :                                 FileSize: physical.Size,
     729            1 :                         },
     730            1 :                         placement: placement,
     731            1 :                 })
     732            1 :         }
     733            1 :         return zombieBlobFiles
     734              : }
     735              : 
     736              : func (vs *versionSet) incrementCompactions(
     737              :         kind compactionKind, extraLevels []*compactionLevel, bytesWritten int64, compactionErr error,
     738            1 : ) {
     739            1 :         if kind == compactionKindFlush || kind == compactionKindIngestedFlushable {
     740            1 :                 vs.metrics.Flush.Count++
     741            1 :         } else {
     742            1 :                 vs.metrics.Compact.Count++
     743            1 :                 if compactionErr != nil {
     744            1 :                         if errors.Is(compactionErr, ErrCancelledCompaction) {
     745            1 :                                 vs.metrics.Compact.CancelledCount++
     746            1 :                                 vs.metrics.Compact.CancelledBytes += bytesWritten
     747            1 :                         } else {
     748            0 :                                 vs.metrics.Compact.FailedCount++
     749            0 :                         }
     750              :                 }
     751              :         }
     752              : 
     753            1 :         switch kind {
     754            1 :         case compactionKindDefault:
     755            1 :                 vs.metrics.Compact.DefaultCount++
     756              : 
     757            1 :         case compactionKindFlush, compactionKindIngestedFlushable:
     758              : 
     759            1 :         case compactionKindMove:
     760            1 :                 vs.metrics.Compact.MoveCount++
     761              : 
     762            1 :         case compactionKindDeleteOnly:
     763            1 :                 vs.metrics.Compact.DeleteOnlyCount++
     764              : 
     765            1 :         case compactionKindElisionOnly:
     766            1 :                 vs.metrics.Compact.ElisionOnlyCount++
     767              : 
     768            0 :         case compactionKindRead:
     769            0 :                 vs.metrics.Compact.ReadCount++
     770              : 
     771            1 :         case compactionKindTombstoneDensity:
     772            1 :                 vs.metrics.Compact.TombstoneDensityCount++
     773              : 
     774            1 :         case compactionKindRewrite:
     775            1 :                 vs.metrics.Compact.RewriteCount++
     776              : 
     777            1 :         case compactionKindCopy:
     778            1 :                 vs.metrics.Compact.CopyCount++
     779              : 
     780            1 :         case compactionKindBlobFileRewrite:
     781            1 :                 vs.metrics.Compact.BlobFileRewriteCount++
     782              : 
     783            1 :         case compactionKindVirtualRewrite:
     784            1 :                 vs.metrics.Compact.VirtualRewriteCount++
     785              : 
     786            0 :         default:
     787            0 :                 if invariants.Enabled {
     788            0 :                         panic("unhandled compaction kind")
     789              :                 }
     790              : 
     791              :         }
     792            1 :         if len(extraLevels) > 0 {
     793            1 :                 vs.metrics.Compact.MultiLevelCount++
     794            1 :         }
     795              : }
     796              : 
     797            1 : func (vs *versionSet) incrementCompactionBytes(numBytes int64) {
     798            1 :         vs.atomicInProgressBytes.Add(numBytes)
     799            1 : }
     800              : 
     801              : // createManifest creates a manifest file that contains a snapshot of vs.
     802              : func (vs *versionSet) createManifest(
     803              :         dirname string,
     804              :         fileNum, minUnflushedLogNum base.DiskFileNum,
     805              :         nextFileNum uint64,
     806              :         virtualBackings []*manifest.TableBacking,
     807            1 : ) (err error) {
     808            1 :         var (
     809            1 :                 filename       = base.MakeFilepath(vs.fs, dirname, base.FileTypeManifest, fileNum)
     810            1 :                 manifestFile   vfs.File
     811            1 :                 manifestWriter *record.Writer
     812            1 :         )
     813            1 :         defer func() {
     814            1 :                 if manifestWriter != nil {
     815            0 :                         _ = manifestWriter.Close()
     816            0 :                 }
     817            1 :                 if manifestFile != nil {
     818            0 :                         _ = manifestFile.Close()
     819            0 :                 }
     820            1 :                 if err != nil {
     821            0 :                         _ = vs.fs.Remove(filename)
     822            0 :                 }
     823              :         }()
     824            1 :         manifestFile, err = vs.fs.Create(filename, "pebble-manifest")
     825            1 :         if err != nil {
     826            0 :                 return err
     827            0 :         }
     828            1 :         manifestWriter = record.NewWriter(manifestFile)
     829            1 : 
     830            1 :         snapshot := manifest.VersionEdit{
     831            1 :                 ComparerName: vs.cmp.Name,
     832            1 :                 // When creating a version snapshot for an existing DB, this snapshot
     833            1 :                 // VersionEdit will be immediately followed by another VersionEdit
     834            1 :                 // (being written in UpdateVersionLocked()). That VersionEdit always
     835            1 :                 // contains a LastSeqNum, so we don't need to include that in the
     836            1 :                 // snapshot.  But it does not necessarily include MinUnflushedLogNum,
     837            1 :                 // NextFileNum, so we initialize those using the corresponding fields in
     838            1 :                 // the versionSet (which came from the latest preceding VersionEdit that
     839            1 :                 // had those fields).
     840            1 :                 MinUnflushedLogNum:   minUnflushedLogNum,
     841            1 :                 NextFileNum:          nextFileNum,
     842            1 :                 CreatedBackingTables: virtualBackings,
     843            1 :                 NewBlobFiles:         vs.latest.blobFiles.Metadatas(),
     844            1 :         }
     845            1 : 
     846            1 :         // Add all extant sstables in the current version.
     847            1 :         for level, levelMetadata := range vs.currentVersion().Levels {
     848            1 :                 for meta := range levelMetadata.All() {
     849            1 :                         snapshot.NewTables = append(snapshot.NewTables, manifest.NewTableEntry{
     850            1 :                                 Level: level,
     851            1 :                                 Meta:  meta,
     852            1 :                         })
     853            1 :                 }
     854              :         }
     855              : 
     856              :         // Add all tables marked for compaction.
     857            1 :         if vs.getFormatMajorVersion() >= FormatMarkForCompactionInVersionEdit {
     858            1 :                 for meta, level := range vs.currentVersion().MarkedForCompaction.Ascending() {
     859            0 :                         snapshot.TablesMarkedForCompaction = append(snapshot.TablesMarkedForCompaction, manifest.TableMarkedForCompactionEntry{
     860            0 :                                 Level:    level,
     861            0 :                                 TableNum: meta.TableNum,
     862            0 :                         })
     863            0 :                 }
     864              :         }
     865              : 
     866            1 :         w, err1 := manifestWriter.Next()
     867            1 :         if err1 != nil {
     868            0 :                 return err1
     869            0 :         }
     870            1 :         if err := snapshot.Encode(w); err != nil {
     871            0 :                 return err
     872            0 :         }
     873              : 
     874            1 :         if vs.manifest != nil {
     875            1 :                 if err := vs.manifest.Close(); err != nil {
     876            0 :                         return err
     877            0 :                 }
     878            1 :                 vs.manifest = nil
     879              :         }
     880            1 :         if vs.manifestFile != nil {
     881            1 :                 if err := vs.manifestFile.Close(); err != nil {
     882            0 :                         return err
     883            0 :                 }
     884            1 :                 vs.manifestFile = nil
     885              :         }
     886              : 
     887            1 :         vs.manifest, manifestWriter = manifestWriter, nil
     888            1 :         vs.manifestFile, manifestFile = manifestFile, nil
     889            1 :         return nil
     890              : }
     891              : 
     892              : // NB: This method is not safe for concurrent use. It is only safe
     893              : // to be called when concurrent changes to nextFileNum are not expected.
     894            1 : func (vs *versionSet) markFileNumUsed(fileNum base.DiskFileNum) {
     895            1 :         if vs.nextFileNum.Load() <= uint64(fileNum) {
     896            1 :                 vs.nextFileNum.Store(uint64(fileNum + 1))
     897            1 :         }
     898              : }
     899              : 
     900              : // getNextTableNum returns a new table number.
     901              : //
     902              : // Can be called without the versionSet's mutex being held.
     903            1 : func (vs *versionSet) getNextTableNum() base.TableNum {
     904            1 :         x := vs.nextFileNum.Add(1) - 1
     905            1 :         return base.TableNum(x)
     906            1 : }
     907              : 
     908              : // Can be called without the versionSet's mutex being held.
     909            1 : func (vs *versionSet) getNextDiskFileNum() base.DiskFileNum {
     910            1 :         x := vs.nextFileNum.Add(1) - 1
     911            1 :         return base.DiskFileNum(x)
     912            1 : }
     913              : 
     914            1 : func (vs *versionSet) append(v *manifest.Version) {
     915            1 :         if v.Refs() != 0 {
     916            0 :                 panic("pebble: version should be unreferenced")
     917              :         }
     918            1 :         if !vs.versions.Empty() {
     919            1 :                 vs.versions.Back().UnrefLocked()
     920            1 :         }
     921            1 :         v.Deleted = vs.obsoleteFn
     922            1 :         v.Ref()
     923            1 :         vs.versions.PushBack(v)
     924            1 :         if invariants.Enabled {
     925            1 :                 // Verify that the virtualBackings contains all the backings referenced by
     926            1 :                 // the version.
     927            1 :                 for _, l := range v.Levels {
     928            1 :                         for f := range l.All() {
     929            1 :                                 if f.Virtual {
     930            1 :                                         if _, ok := vs.latest.virtualBackings.Get(f.TableBacking.DiskFileNum); !ok {
     931            0 :                                                 panic(fmt.Sprintf("%s is not in virtualBackings", f.TableBacking.DiskFileNum))
     932              :                                         }
     933              :                                 }
     934              :                         }
     935              :                 }
     936              :         }
     937              : }
     938              : 
     939            1 : func (vs *versionSet) currentVersion() *manifest.Version {
     940            1 :         return vs.versions.Back()
     941            1 : }
     942              : 
     943            1 : func (vs *versionSet) addLiveFileNums(m map[base.DiskFileNum]struct{}) {
     944            1 :         current := vs.currentVersion()
     945            1 :         for v := vs.versions.Front(); true; v = v.Next() {
     946            1 :                 for _, lm := range v.Levels {
     947            1 :                         for f := range lm.All() {
     948            1 :                                 m[f.TableBacking.DiskFileNum] = struct{}{}
     949            1 :                         }
     950              :                 }
     951            1 :                 for bf := range v.BlobFiles.All() {
     952            1 :                         m[bf.Physical.FileNum] = struct{}{}
     953            1 :                 }
     954            1 :                 if v == current {
     955            1 :                         break
     956              :                 }
     957              :         }
     958              :         // virtualBackings contains backings that are referenced by some virtual
     959              :         // tables in the latest version (which are handled above), and backings that
     960              :         // are not but are still alive because of the protection mechanism (see
     961              :         // manifset.VirtualBackings). This loop ensures the latter get added to the
     962              :         // map.
     963            1 :         for b := range vs.latest.virtualBackings.All() {
     964            1 :                 m[b.DiskFileNum] = struct{}{}
     965            1 :         }
     966              : }
     967              : 
     968              : // addObsoleteLocked will add the fileInfo associated with obsolete backing
     969              : // sstables to the obsolete tables list.
     970              : //
     971              : // The file backings in the obsolete list must not appear more than once.
     972              : //
     973              : // DB.mu must be held when addObsoleteLocked is called.
     974            1 : func (vs *versionSet) addObsoleteLocked(obsolete manifest.ObsoleteFiles) {
     975            1 :         if obsolete.Count() == 0 {
     976            1 :                 return
     977            1 :         }
     978              : 
     979              :         // Note that the zombie objects transition from zombie *to* obsolete, and
     980              :         // will no longer be considered zombie.
     981              : 
     982            1 :         newlyObsoleteTables := make([]deletepacer.ObsoleteFile, len(obsolete.TableBackings))
     983            1 :         for i, bs := range obsolete.TableBackings {
     984            1 :                 newlyObsoleteTables[i] = vs.zombieTables.Extract(bs.DiskFileNum).
     985            1 :                         asObsoleteFile(vs.fs, base.FileTypeTable, vs.dirname)
     986            1 :         }
     987            1 :         vs.obsoleteTables = mergeObsoleteFiles(vs.obsoleteTables, newlyObsoleteTables)
     988            1 : 
     989            1 :         newlyObsoleteBlobFiles := make([]deletepacer.ObsoleteFile, len(obsolete.BlobFiles))
     990            1 :         for i, bf := range obsolete.BlobFiles {
     991            1 :                 newlyObsoleteBlobFiles[i] = vs.zombieBlobs.Extract(bf.FileNum).
     992            1 :                         asObsoleteFile(vs.fs, base.FileTypeBlob, vs.dirname)
     993            1 :         }
     994            1 :         vs.obsoleteBlobs = mergeObsoleteFiles(vs.obsoleteBlobs, newlyObsoleteBlobFiles)
     995              : }
     996              : 
     997              : // addObsolete will acquire DB.mu, so DB.mu must not be held when this is
     998              : // called.
     999            1 : func (vs *versionSet) addObsolete(obsolete manifest.ObsoleteFiles) {
    1000            1 :         vs.mu.Lock()
    1001            1 :         defer vs.mu.Unlock()
    1002            1 :         vs.addObsoleteLocked(obsolete)
    1003            1 : }
    1004              : 
    1005              : // This method sets the following fields of m.Levels[*]:
    1006              : //   - [Virtual]Tables{Count,Size}
    1007              : //   - Sublevels
    1008              : //   - EstimatedReferencesSize
    1009            1 : func setBasicLevelMetrics(lm *AllLevelMetrics, newVersion *manifest.Version) {
    1010            1 :         for i := range lm {
    1011            1 :                 l := &lm[i]
    1012            1 :                 l.Tables.Count = uint64(newVersion.Levels[i].Len())
    1013            1 :                 l.Tables.Bytes = newVersion.Levels[i].TableSize()
    1014            1 :                 l.VirtualTables = newVersion.Levels[i].VirtualTables()
    1015            1 :                 l.EstimatedReferencesSize = newVersion.Levels[i].EstimatedReferenceSize()
    1016            1 :                 l.Sublevels = 0
    1017            1 :                 if l.Tables.Count > 0 {
    1018            1 :                         l.Sublevels = 1
    1019            1 :                 }
    1020            1 :                 if invariants.Enabled {
    1021            1 :                         levelFiles := newVersion.Levels[i].Slice()
    1022            1 :                         if size := levelFiles.TableSizeSum(); l.Tables.Bytes != size {
    1023            0 :                                 panic(fmt.Sprintf("versionSet metrics L%d Size = %d, actual size = %d", i, l.Tables.Bytes, size))
    1024              :                         }
    1025            1 :                         refSize := uint64(0)
    1026            1 :                         for f := range levelFiles.All() {
    1027            1 :                                 refSize += f.EstimatedReferenceSize()
    1028            1 :                         }
    1029            1 :                         if refSize != l.EstimatedReferencesSize {
    1030            0 :                                 panic(fmt.Sprintf(
    1031            0 :                                         "versionSet metrics L%d EstimatedReferencesSize = %d, recomputed size = %d",
    1032            0 :                                         i, l.EstimatedReferencesSize, refSize,
    1033            0 :                                 ))
    1034              :                         }
    1035              : 
    1036            1 :                         if nVirtual := levelFiles.NumVirtual(); nVirtual != l.VirtualTables.Count {
    1037            0 :                                 panic(fmt.Sprintf(
    1038            0 :                                         "versionSet metrics L%d NumVirtual = %d, actual NumVirtual = %d",
    1039            0 :                                         i, l.VirtualTables.Count, nVirtual,
    1040            0 :                                 ))
    1041              :                         }
    1042            1 :                         if vSize := levelFiles.VirtualTableSizeSum(); vSize != l.VirtualTables.Bytes {
    1043            0 :                                 panic(fmt.Sprintf(
    1044            0 :                                         "versionSet metrics L%d Virtual size = %d, actual size = %d",
    1045            0 :                                         i, l.VirtualTables.Bytes, vSize,
    1046            0 :                                 ))
    1047              :                         }
    1048              :                 }
    1049              :         }
    1050            1 :         lm[0].Sublevels = int32(len(newVersion.L0SublevelFiles))
    1051              : }
    1052              : 
    1053              : func findCurrentManifest(
    1054              :         fs vfs.FS, dirname string, ls []string,
    1055            1 : ) (marker *atomicfs.Marker, manifestNum base.DiskFileNum, exists bool, err error) {
    1056            1 :         // Locating a marker should succeed even if the marker has never been placed.
    1057            1 :         var filename string
    1058            1 :         marker, filename, err = atomicfs.LocateMarkerInListing(fs, dirname, manifestMarkerName, ls)
    1059            1 :         if err != nil {
    1060            0 :                 return nil, 0, false, err
    1061            0 :         }
    1062              : 
    1063            1 :         if filename == "" {
    1064            1 :                 // The marker hasn't been set yet. This database doesn't exist.
    1065            1 :                 return marker, 0, false, nil
    1066            1 :         }
    1067              : 
    1068            1 :         var ok bool
    1069            1 :         _, manifestNum, ok = base.ParseFilename(fs, filename)
    1070            1 :         if !ok {
    1071            0 :                 return marker, 0, false, base.CorruptionErrorf("pebble: MANIFEST name %q is malformed", errors.Safe(filename))
    1072            0 :         }
    1073            1 :         return marker, manifestNum, true, nil
    1074              : }
        

Generated by: LCOV version 2.0-1