LCOV - code coverage report
Current view: top level - pebble/internal/manifest - version_edit.go (source / functions) Hit Total Coverage
Test: 2023-11-10 08:16Z 96a8bc2d - tests + meta.lcov Lines: 568 724 78.5 %
Date: 2023-11-10 08:16:45 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2012 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package manifest
       6             : 
       7             : import (
       8             :         "bufio"
       9             :         "bytes"
      10             :         stdcmp "cmp"
      11             :         "encoding/binary"
      12             :         "fmt"
      13             :         "io"
      14             :         "slices"
      15             :         "time"
      16             : 
      17             :         "github.com/cockroachdb/errors"
      18             :         "github.com/cockroachdb/pebble/internal/base"
      19             :         "github.com/cockroachdb/pebble/internal/invariants"
      20             : )
      21             : 
// TODO(peter): describe the MANIFEST file format, independently of the C++
// project.

// errCorruptManifest is the generic corruption error returned by Decode when
// it encounters a top-level tag it does not recognize.
var errCorruptManifest = base.CorruptionErrorf("pebble: corrupt manifest")
      26             : 
// byteReader combines the two reader capabilities version edit decoding
// needs: io.ByteReader for uvarint decoding (binary.ReadUvarint) and
// io.Reader for reading length-prefixed byte slices. Decode wraps any plain
// io.Reader in a bufio.Reader to satisfy this interface.
type byteReader interface {
	io.ByteReader
	io.Reader
}
      31             : 
// Tags for the versionEdit disk format. Each record in an encoded version
// edit is a uvarint tag followed by a tag-specific payload; the values below
// are part of the on-disk format and must not change.
// Tag 8 is no longer used.
const (
	// LevelDB tags.
	tagComparator     = 1 // comparer name; decoded into ComparerName
	tagLogNumber      = 2 // decoded into MinUnflushedLogNum
	tagNextFileNumber = 3 // decoded into NextFileNum
	tagLastSequence   = 4 // decoded into LastSeqNum
	tagCompactPointer = 5 // read and discarded; RocksDB no longer uses compaction pointers
	tagDeletedFile    = 6 // (level, file number) pair; decoded into DeletedFiles
	tagNewFile        = 7 // original LevelDB new-file encoding (no sequence numbers)
	tagPrevLogNumber  = 9 // decoded into ObsoletePrevLogNum; historic artifact

	// RocksDB tags. The column-family tags are not supported; Decode returns
	// a corruption error if it encounters one.
	tagNewFile2         = 100 // tagNewFile plus smallest/largest sequence numbers
	tagNewFile3         = 102 // tagNewFile2 plus a pathID (unused in RocksDB)
	tagNewFile4         = 103 // tagNewFile2 plus a trailing list of custom tags
	tagColumnFamily     = 200
	tagColumnFamilyAdd  = 201
	tagColumnFamilyDrop = 202
	tagMaxColumnFamily  = 203

	// Pebble tags.
	tagNewFile5            = 104 // Range keys.
	tagCreatedBackingTable = 105 // decoded into CreatedBackingTables
	tagRemovedBackingTable = 106 // decoded into RemovedBackingTables

	// The custom tags sub-format used by tagNewFile4 and above. Each custom
	// record is a uvarint custom tag followed (except for terminate/virtual)
	// by a length-prefixed field.
	customTagTerminate         = 1      // ends the list of custom tags
	customTagNeedsCompaction   = 2      // 1-byte field; sets MarkedForCompaction
	customTagCreationTime      = 6      // uvarint field; sets CreationTime
	customTagPathID            = 65     // not supported; Decode returns an error
	customTagNonSafeIgnoreMask = 1 << 6 // unknown custom tags with this bit set cannot be safely ignored
	customTagVirtual           = 66     // uvarint backing file number; marks the file virtual
)
      67             : 
// DeletedFileEntry holds the state for a file deletion from a level. The file
// itself might still be referenced by another level. It is used as the key of
// VersionEdit.DeletedFiles.
type DeletedFileEntry struct {
	// Level is the LSM level the file is being deleted from.
	Level int
	// FileNum identifies the deleted file within that level.
	FileNum base.FileNum
}
      74             : 
// NewFileEntry holds the state for a new file or one moved from a different
// level.
type NewFileEntry struct {
	// Level is the LSM level the file is being added to.
	Level int
	// Meta is the file's metadata. For entries produced by Decode, Meta's
	// FileBacking is only initialized for physical (non-virtual) sstables.
	Meta *FileMetadata
	// BackingFileNum is only set during manifest replay, and only for virtual
	// sstables.
	BackingFileNum base.DiskFileNum
}
      84             : 
// VersionEdit holds the state for an edit to a Version along with other
// on-disk state (log numbers, next file number, and the last sequence number).
type VersionEdit struct {
	// ComparerName is the value of Options.Comparer.Name. This is only set in
	// the first VersionEdit in a manifest (either when the DB is created, or
	// when a new manifest is created) and is used to verify that the comparer
	// specified at Open matches the comparer that was previously used.
	ComparerName string

	// MinUnflushedLogNum is the smallest WAL log file number corresponding to
	// mutations that have not been flushed to an sstable.
	//
	// This is an optional field, and 0 represents it is not set.
	MinUnflushedLogNum base.DiskFileNum

	// ObsoletePrevLogNum is a historic artifact from LevelDB that is not used by
	// Pebble, RocksDB, or even LevelDB. Its use in LevelDB was deprecated in
	// 6/2011. We keep it around purely for informational purposes when
	// displaying MANIFEST contents.
	ObsoletePrevLogNum uint64

	// The next file number. A single counter is used to assign file numbers
	// for the WAL, MANIFEST, sstable, and OPTIONS files.
	NextFileNum uint64

	// LastSeqNum is an upper bound on the sequence numbers that have been
	// assigned in flushed WALs. Unflushed WALs (that will be replayed during
	// recovery) may contain sequence numbers greater than this value.
	LastSeqNum uint64

	// A file num may be present in both deleted files and new files when it
	// is moved from a lower level to a higher level (when the compaction
	// found that there was no overlapping file at the higher level).
	//
	// NB: entries produced by Decode carry a nil *FileMetadata value, since
	// the decoder does not have access to the deleted file's metadata.
	DeletedFiles map[DeletedFileEntry]*FileMetadata
	NewFiles     []NewFileEntry
	// CreatedBackingTables can be used to preserve the FileBacking associated
	// with a physical sstable. This is useful when virtual sstables in the
	// latest version are reconstructed during manifest replay, and we also need
	// to reconstruct the FileBacking which is required by these virtual
	// sstables.
	//
	// INVARIANT: The FileBacking associated with a physical sstable must only
	// be added as a backing file in the same version edit where the physical
	// sstable is first virtualized. This means that the physical sstable must
	// be present in DeletedFiles and that there must be at least one virtual
	// sstable with the same FileBacking as the physical sstable in NewFiles. A
	// file must be present in CreatedBackingTables in exactly one version edit.
	// The physical sstable associated with the FileBacking must also not be
	// present in NewFiles.
	CreatedBackingTables []*FileBacking
	// RemovedBackingTables is used to remove the FileBacking associated with a
	// virtual sstable. Note that a backing sstable can be removed as soon as
	// there are no virtual sstables in the latest version which are using the
	// backing sstable, but the backing sstable doesn't necessarily have to be
	// removed atomically with the version edit which removes the last virtual
	// sstable associated with the backing sstable. The removal can happen in a
	// future version edit.
	//
	// INVARIANT: A file must only be added to RemovedBackingTables if it was
	// added to CreatedBackingTables in a prior version edit. The same version
	// edit also cannot have the same file present in both CreatedBackingTables
	// and RemovedBackingTables. A file must be present in RemovedBackingTables
	// in exactly one version edit.
	RemovedBackingTables []base.DiskFileNum
}
     150             : 
// Decode decodes an edit from the specified reader.
//
// An encoded version edit is a sequence of (uvarint tag, payload) records;
// decoding continues until a clean io.EOF on a tag boundary. Unknown tags
// produce errCorruptManifest; column-family tags produce a corruption error.
//
// Note that the Decode step will not set the FileBacking for virtual sstables
// and the responsibility is left to the caller. However, the Decode step will
// populate the NewFileEntry.BackingFileNum in VersionEdit.NewFiles.
func (v *VersionEdit) Decode(r io.Reader) error {
	// Uvarint decoding requires byte-at-a-time reads; wrap the reader in a
	// bufio.Reader unless it already satisfies byteReader.
	br, ok := r.(byteReader)
	if !ok {
		br = bufio.NewReader(r)
	}
	d := versionEditDecoder{br}
	for {
		tag, err := binary.ReadUvarint(br)
		if err == io.EOF {
			// EOF at a record boundary terminates the edit cleanly.
			break
		}
		if err != nil {
			return err
		}
		switch tag {
		case tagComparator:
			s, err := d.readBytes()
			if err != nil {
				return err
			}
			v.ComparerName = string(s)

		case tagLogNumber:
			n, err := d.readUvarint()
			if err != nil {
				return err
			}
			v.MinUnflushedLogNum = base.DiskFileNum(n)

		case tagNextFileNumber:
			n, err := d.readUvarint()
			if err != nil {
				return err
			}
			v.NextFileNum = n

		case tagLastSequence:
			n, err := d.readUvarint()
			if err != nil {
				return err
			}
			v.LastSeqNum = n

		case tagCompactPointer:
			// Consume and discard the (level, key) payload.
			if _, err := d.readLevel(); err != nil {
				return err
			}
			if _, err := d.readBytes(); err != nil {
				return err
			}
			// NB: RocksDB does not use compaction pointers anymore.

		case tagRemovedBackingTable:
			n, err := d.readUvarint()
			if err != nil {
				return err
			}
			v.RemovedBackingTables = append(
				v.RemovedBackingTables, base.FileNum(n).DiskFileNum(),
			)
		case tagCreatedBackingTable:
			dfn, err := d.readUvarint()
			if err != nil {
				return err
			}
			size, err := d.readUvarint()
			if err != nil {
				return err
			}
			fileBacking := &FileBacking{
				DiskFileNum: base.FileNum(dfn).DiskFileNum(),
				Size:        size,
			}
			v.CreatedBackingTables = append(v.CreatedBackingTables, fileBacking)
		case tagDeletedFile:
			level, err := d.readLevel()
			if err != nil {
				return err
			}
			fileNum, err := d.readFileNum()
			if err != nil {
				return err
			}
			if v.DeletedFiles == nil {
				v.DeletedFiles = make(map[DeletedFileEntry]*FileMetadata)
			}
			// The decoder has no access to the deleted file's metadata, so the
			// map value is nil.
			v.DeletedFiles[DeletedFileEntry{level, fileNum}] = nil

		case tagNewFile, tagNewFile2, tagNewFile3, tagNewFile4, tagNewFile5:
			// All new-file variants start with (level, fileNum); later variants
			// append additional fields.
			level, err := d.readLevel()
			if err != nil {
				return err
			}
			fileNum, err := d.readFileNum()
			if err != nil {
				return err
			}
			if tag == tagNewFile3 {
				// The pathID field appears unused in RocksDB.
				_ /* pathID */, err := d.readUvarint()
				if err != nil {
					return err
				}
			}
			size, err := d.readUvarint()
			if err != nil {
				return err
			}
			// We read the smallest / largest key bounds differently depending on
			// whether we have point, range or both types of keys present in the
			// table.
			var (
				smallestPointKey, largestPointKey []byte
				smallestRangeKey, largestRangeKey []byte
				parsedPointBounds                 bool
				boundsMarker                      byte
			)
			if tag != tagNewFile5 {
				// Range keys not present in the table. Parse the point key bounds.
				smallestPointKey, err = d.readBytes()
				if err != nil {
					return err
				}
				largestPointKey, err = d.readBytes()
				if err != nil {
					return err
				}
			} else {
				// Range keys are present in the table. Determine whether we have point
				// keys to parse, in addition to the bounds. boundsMarker carries the
				// mask bits (maskContainsPointKeys/maskSmallest/maskLargest, defined
				// elsewhere in this package).
				boundsMarker, err = d.ReadByte()
				if err != nil {
					return err
				}
				// Parse point key bounds, if present.
				if boundsMarker&maskContainsPointKeys > 0 {
					smallestPointKey, err = d.readBytes()
					if err != nil {
						return err
					}
					largestPointKey, err = d.readBytes()
					if err != nil {
						return err
					}
					parsedPointBounds = true
				} else {
					// The table does not have point keys.
					// Sanity check: the bounds must be range keys.
					if boundsMarker&maskSmallest != 0 || boundsMarker&maskLargest != 0 {
						return base.CorruptionErrorf(
							"new-file-4-range-keys: table without point keys has point key bounds: marker=%x",
							boundsMarker,
						)
					}
				}
				// Parse range key bounds.
				smallestRangeKey, err = d.readBytes()
				if err != nil {
					return err
				}
				largestRangeKey, err = d.readBytes()
				if err != nil {
					return err
				}
			}
			// All variants newer than the original LevelDB tagNewFile encode the
			// smallest and largest sequence numbers.
			var smallestSeqNum uint64
			var largestSeqNum uint64
			if tag != tagNewFile {
				smallestSeqNum, err = d.readUvarint()
				if err != nil {
					return err
				}
				largestSeqNum, err = d.readUvarint()
				if err != nil {
					return err
				}
			}
			var markedForCompaction bool
			var creationTime uint64
			virtualState := struct {
				virtual        bool
				backingFileNum uint64
			}{}
			// tagNewFile4 and tagNewFile5 append a list of (customTag, field)
			// records terminated by customTagTerminate.
			if tag == tagNewFile4 || tag == tagNewFile5 {
				for {
					customTag, err := d.readUvarint()
					if err != nil {
						return err
					}
					if customTag == customTagTerminate {
						break
					} else if customTag == customTagVirtual {
						// customTagVirtual's payload is a bare uvarint (the backing
						// file number), not a length-prefixed field.
						virtualState.virtual = true
						n, err := d.readUvarint()
						if err != nil {
							return err
						}
						virtualState.backingFileNum = n
						continue
					}

					field, err := d.readBytes()
					if err != nil {
						return err
					}
					switch customTag {
					case customTagNeedsCompaction:
						if len(field) != 1 {
							return base.CorruptionErrorf("new-file4: need-compaction field wrong size")
						}
						markedForCompaction = (field[0] == 1)

					case customTagCreationTime:
						var n int
						creationTime, n = binary.Uvarint(field)
						if n != len(field) {
							return base.CorruptionErrorf("new-file4: invalid file creation time")
						}

					case customTagPathID:
						return base.CorruptionErrorf("new-file4: path-id field not supported")

					default:
						// Unknown custom tags are silently skipped unless the
						// non-safe-ignore bit is set.
						if (customTag & customTagNonSafeIgnoreMask) != 0 {
							return base.CorruptionErrorf("new-file4: custom field not supported: %d", customTag)
						}
					}
				}
			}
			m := &FileMetadata{
				FileNum:             fileNum,
				Size:                size,
				CreationTime:        int64(creationTime),
				SmallestSeqNum:      smallestSeqNum,
				LargestSeqNum:       largestSeqNum,
				MarkedForCompaction: markedForCompaction,
				Virtual:             virtualState.virtual,
			}
			if tag != tagNewFile5 { // no range keys present
				m.SmallestPointKey = base.DecodeInternalKey(smallestPointKey)
				m.LargestPointKey = base.DecodeInternalKey(largestPointKey)
				m.HasPointKeys = true
				m.Smallest, m.Largest = m.SmallestPointKey, m.LargestPointKey
				m.boundTypeSmallest, m.boundTypeLargest = boundTypePointKey, boundTypePointKey
			} else { // range keys present
				// Set point key bounds, if parsed.
				if parsedPointBounds {
					m.SmallestPointKey = base.DecodeInternalKey(smallestPointKey)
					m.LargestPointKey = base.DecodeInternalKey(largestPointKey)
					m.HasPointKeys = true
				}
				// Set range key bounds.
				m.SmallestRangeKey = base.DecodeInternalKey(smallestRangeKey)
				m.LargestRangeKey = base.DecodeInternalKey(largestRangeKey)
				m.HasRangeKeys = true
				// Set overall bounds (by default assume range keys).
				m.Smallest, m.Largest = m.SmallestRangeKey, m.LargestRangeKey
				m.boundTypeSmallest, m.boundTypeLargest = boundTypeRangeKey, boundTypeRangeKey
				// The marker bits override the defaults when a point key bound is
				// the overall smallest/largest.
				if boundsMarker&maskSmallest == maskSmallest {
					m.Smallest = m.SmallestPointKey
					m.boundTypeSmallest = boundTypePointKey
				}
				if boundsMarker&maskLargest == maskLargest {
					m.Largest = m.LargestPointKey
					m.boundTypeLargest = boundTypePointKey
				}
			}
			m.boundsSet = true
			// Virtual sstables get their FileBacking from the caller (see the
			// method comment); physical sstables back themselves.
			if !virtualState.virtual {
				m.InitPhysicalBacking()
			}

			nfe := NewFileEntry{
				Level: level,
				Meta:  m,
			}
			if virtualState.virtual {
				nfe.BackingFileNum = base.FileNum(virtualState.backingFileNum).DiskFileNum()
			}
			v.NewFiles = append(v.NewFiles, nfe)

		case tagPrevLogNumber:
			n, err := d.readUvarint()
			if err != nil {
				return err
			}
			v.ObsoletePrevLogNum = n

		case tagColumnFamily, tagColumnFamilyAdd, tagColumnFamilyDrop, tagMaxColumnFamily:
			return base.CorruptionErrorf("column families are not supported")

		default:
			return errCorruptManifest
		}
	}
	return nil
}
     453             : 
     454           1 : func (v *VersionEdit) string(verbose bool, fmtKey base.FormatKey) string {
     455           1 :         var buf bytes.Buffer
     456           1 :         if v.ComparerName != "" {
     457           1 :                 fmt.Fprintf(&buf, "  comparer:     %s", v.ComparerName)
     458           1 :         }
     459           1 :         if v.MinUnflushedLogNum != 0 {
     460           1 :                 fmt.Fprintf(&buf, "  log-num:       %d\n", v.MinUnflushedLogNum)
     461           1 :         }
     462           1 :         if v.ObsoletePrevLogNum != 0 {
     463           0 :                 fmt.Fprintf(&buf, "  prev-log-num:  %d\n", v.ObsoletePrevLogNum)
     464           0 :         }
     465           1 :         if v.NextFileNum != 0 {
     466           1 :                 fmt.Fprintf(&buf, "  next-file-num: %d\n", v.NextFileNum)
     467           1 :         }
     468           1 :         if v.LastSeqNum != 0 {
     469           1 :                 fmt.Fprintf(&buf, "  last-seq-num:  %d\n", v.LastSeqNum)
     470           1 :         }
     471           1 :         entries := make([]DeletedFileEntry, 0, len(v.DeletedFiles))
     472           1 :         for df := range v.DeletedFiles {
     473           1 :                 entries = append(entries, df)
     474           1 :         }
     475           1 :         slices.SortFunc(entries, func(a, b DeletedFileEntry) int {
     476           1 :                 if v := stdcmp.Compare(a.Level, b.Level); v != 0 {
     477           1 :                         return v
     478           1 :                 }
     479           0 :                 return stdcmp.Compare(a.FileNum, b.FileNum)
     480             :         })
     481           1 :         for _, df := range entries {
     482           1 :                 fmt.Fprintf(&buf, "  deleted:       L%d %s\n", df.Level, df.FileNum)
     483           1 :         }
     484           1 :         for _, nf := range v.NewFiles {
     485           1 :                 fmt.Fprintf(&buf, "  added:         L%d", nf.Level)
     486           1 :                 if verbose {
     487           1 :                         fmt.Fprintf(&buf, " %s", nf.Meta.DebugString(fmtKey, true /* verbose */))
     488           1 :                 } else {
     489           1 :                         fmt.Fprintf(&buf, " %s", nf.Meta.String())
     490           1 :                 }
     491           1 :                 if nf.Meta.CreationTime != 0 {
     492           1 :                         fmt.Fprintf(&buf, " (%s)",
     493           1 :                                 time.Unix(nf.Meta.CreationTime, 0).UTC().Format(time.RFC3339))
     494           1 :                 }
     495           1 :                 fmt.Fprintln(&buf)
     496             :         }
     497           1 :         return buf.String()
     498             : }
     499             : 
     500             : // DebugString is a more verbose version of String(). Use this in tests.
     501           1 : func (v *VersionEdit) DebugString(fmtKey base.FormatKey) string {
     502           1 :         return v.string(true /* verbose */, fmtKey)
     503           1 : }
     504             : 
     505             : // String implements fmt.Stringer for a VersionEdit.
     506           1 : func (v *VersionEdit) String() string {
     507           1 :         return v.string(false /* verbose */, base.DefaultFormatter)
     508           1 : }
     509             : 
// Encode encodes an edit to the specified writer.
//
// The edit is serialized as a sequence of (tag, payload) pairs using the
// tag constants defined at the top of this file. The entire edit is first
// buffered in memory and then written to w with a single Write call.
func (v *VersionEdit) Encode(w io.Writer) error {
	e := versionEditEncoder{new(bytes.Buffer)}

	if v.ComparerName != "" {
		e.writeUvarint(tagComparator)
		e.writeString(v.ComparerName)
	}
	if v.MinUnflushedLogNum != 0 {
		e.writeUvarint(tagLogNumber)
		e.writeUvarint(uint64(v.MinUnflushedLogNum))
	}
	if v.ObsoletePrevLogNum != 0 {
		e.writeUvarint(tagPrevLogNumber)
		e.writeUvarint(v.ObsoletePrevLogNum)
	}
	if v.NextFileNum != 0 {
		e.writeUvarint(tagNextFileNumber)
		e.writeUvarint(uint64(v.NextFileNum))
	}
	for _, dfn := range v.RemovedBackingTables {
		e.writeUvarint(tagRemovedBackingTable)
		e.writeUvarint(uint64(dfn.FileNum()))
	}
	for _, fileBacking := range v.CreatedBackingTables {
		e.writeUvarint(tagCreatedBackingTable)
		e.writeUvarint(uint64(fileBacking.DiskFileNum.FileNum()))
		e.writeUvarint(fileBacking.Size)
	}
	// RocksDB requires LastSeqNum to be encoded for the first MANIFEST entry,
	// even though its value is zero. We detect this by encoding LastSeqNum when
	// ComparerName is set.
	if v.LastSeqNum != 0 || v.ComparerName != "" {
		e.writeUvarint(tagLastSequence)
		e.writeUvarint(v.LastSeqNum)
	}
	// NOTE: DeletedFiles is a map, so deletion records are emitted in
	// randomized iteration order; only the (level, file number) pair is
	// encoded, not the metadata.
	for x := range v.DeletedFiles {
		e.writeUvarint(tagDeletedFile)
		e.writeUvarint(uint64(x.Level))
		e.writeUvarint(uint64(x.FileNum))
	}
	for _, x := range v.NewFiles {
		// Pick the newest tag that can represent this file: tagNewFile5 when
		// range keys are present, tagNewFile4 when custom fields (creation
		// time, marked-for-compaction, virtual) are needed, otherwise the
		// legacy tagNewFile2 layout.
		customFields := x.Meta.MarkedForCompaction || x.Meta.CreationTime != 0 || x.Meta.Virtual
		var tag uint64
		switch {
		case x.Meta.HasRangeKeys:
			tag = tagNewFile5
		case customFields:
			tag = tagNewFile4
		default:
			tag = tagNewFile2
		}
		e.writeUvarint(tag)
		e.writeUvarint(uint64(x.Level))
		e.writeUvarint(uint64(x.Meta.FileNum))
		e.writeUvarint(x.Meta.Size)
		if !x.Meta.HasRangeKeys {
			// If we have no range keys, preserve the original format and write the
			// smallest and largest point keys.
			e.writeKey(x.Meta.SmallestPointKey)
			e.writeKey(x.Meta.LargestPointKey)
		} else {
			// When range keys are present, we first write a marker byte that
			// indicates if the table also contains point keys, in addition to how the
			// overall bounds for the table should be reconstructed. This byte is
			// followed by the keys themselves.
			b, err := x.Meta.boundsMarker()
			if err != nil {
				return err
			}
			if err = e.WriteByte(b); err != nil {
				return err
			}
			// Write point key bounds (if present).
			if x.Meta.HasPointKeys {
				e.writeKey(x.Meta.SmallestPointKey)
				e.writeKey(x.Meta.LargestPointKey)
			}
			// Write range key bounds.
			e.writeKey(x.Meta.SmallestRangeKey)
			e.writeKey(x.Meta.LargestRangeKey)
		}
		e.writeUvarint(x.Meta.SmallestSeqNum)
		e.writeUvarint(x.Meta.LargestSeqNum)
		if customFields {
			// Custom fields are encoded as (custom tag, length-prefixed
			// payload) pairs, terminated by customTagTerminate.
			if x.Meta.CreationTime != 0 {
				e.writeUvarint(customTagCreationTime)
				var buf [binary.MaxVarintLen64]byte
				n := binary.PutUvarint(buf[:], uint64(x.Meta.CreationTime))
				e.writeBytes(buf[:n])
			}
			if x.Meta.MarkedForCompaction {
				e.writeUvarint(customTagNeedsCompaction)
				e.writeBytes([]byte{1})
			}
			if x.Meta.Virtual {
				e.writeUvarint(customTagVirtual)
				e.writeUvarint(uint64(x.Meta.FileBacking.DiskFileNum.FileNum()))
			}
			e.writeUvarint(customTagTerminate)
		}
	}
	_, err := w.Write(e.Bytes())
	return err
}
     615             : 
// versionEditDecoder should be used to decode version edits. It embeds the
// underlying byteReader so the helper methods below can consume varints and
// raw bytes directly from the stream.
type versionEditDecoder struct {
	byteReader
}
     620             : 
     621           2 : func (d versionEditDecoder) readBytes() ([]byte, error) {
     622           2 :         n, err := d.readUvarint()
     623           2 :         if err != nil {
     624           0 :                 return nil, err
     625           0 :         }
     626           2 :         s := make([]byte, n)
     627           2 :         _, err = io.ReadFull(d, s)
     628           2 :         if err != nil {
     629           0 :                 if err == io.ErrUnexpectedEOF {
     630           0 :                         return nil, errCorruptManifest
     631           0 :                 }
     632           0 :                 return nil, err
     633             :         }
     634           2 :         return s, nil
     635             : }
     636             : 
     637           2 : func (d versionEditDecoder) readLevel() (int, error) {
     638           2 :         u, err := d.readUvarint()
     639           2 :         if err != nil {
     640           0 :                 return 0, err
     641           0 :         }
     642           2 :         if u >= NumLevels {
     643           0 :                 return 0, errCorruptManifest
     644           0 :         }
     645           2 :         return int(u), nil
     646             : }
     647             : 
     648           2 : func (d versionEditDecoder) readFileNum() (base.FileNum, error) {
     649           2 :         u, err := d.readUvarint()
     650           2 :         if err != nil {
     651           0 :                 return 0, err
     652           0 :         }
     653           2 :         return base.FileNum(u), nil
     654             : }
     655             : 
     656           2 : func (d versionEditDecoder) readUvarint() (uint64, error) {
     657           2 :         u, err := binary.ReadUvarint(d)
     658           2 :         if err != nil {
     659           0 :                 if err == io.EOF {
     660           0 :                         return 0, errCorruptManifest
     661           0 :                 }
     662           0 :                 return 0, err
     663             :         }
     664           2 :         return u, nil
     665             : }
     666             : 
// versionEditEncoder accumulates the encoded wire format of a version edit
// in an in-memory buffer; Encode flushes it to the writer in one call.
type versionEditEncoder struct {
	*bytes.Buffer
}
     670             : 
     671           2 : func (e versionEditEncoder) writeBytes(p []byte) {
     672           2 :         e.writeUvarint(uint64(len(p)))
     673           2 :         e.Write(p)
     674           2 : }
     675             : 
     676           2 : func (e versionEditEncoder) writeKey(k InternalKey) {
     677           2 :         e.writeUvarint(uint64(k.Size()))
     678           2 :         e.Write(k.UserKey)
     679           2 :         buf := k.EncodeTrailer()
     680           2 :         e.Write(buf[:])
     681           2 : }
     682             : 
     683           2 : func (e versionEditEncoder) writeString(s string) {
     684           2 :         e.writeUvarint(uint64(len(s)))
     685           2 :         e.WriteString(s)
     686           2 : }
     687             : 
     688           2 : func (e versionEditEncoder) writeUvarint(u uint64) {
     689           2 :         var buf [binary.MaxVarintLen64]byte
     690           2 :         n := binary.PutUvarint(buf[:], u)
     691           2 :         e.Write(buf[:n])
     692           2 : }
     693             : 
// BulkVersionEdit summarizes the files added and deleted from a set of version
// edits.
//
// INVARIANTS:
// No file can be added to a level more than once. This is true globally, and
// also true for all of the calls to Accumulate for a single bulk version edit.
//
// No file can be removed from a level more than once. This is true globally,
// and also true for all of the calls to Accumulate for a single bulk version
// edit.
//
// A file must not be added and removed from a given level in the same version
// edit.
//
// A file that is being removed from a level must have been added to that level
// before (in a prior version edit). Note that a given file can be deleted from
// a level and added to another level in a single version edit.
type BulkVersionEdit struct {
	// Added and Deleted accumulate, per level, the files added to and
	// deleted from that level across all accumulated edits, keyed by file
	// number. Maps are allocated lazily by Accumulate.
	Added   [NumLevels]map[base.FileNum]*FileMetadata
	Deleted [NumLevels]map[base.FileNum]*FileMetadata

	// AddedFileBacking is a map to support lookup so that we can populate the
	// FileBacking of virtual sstables during manifest replay.
	AddedFileBacking   map[base.DiskFileNum]*FileBacking
	// RemovedFileBacking accumulates the backing tables removed by the
	// accumulated edits; entries are appended without de-duplication (a
	// backing can be removed in exactly one version edit).
	RemovedFileBacking []base.DiskFileNum

	// AddedByFileNum maps file number to file metadata for all added files
	// from accumulated version edits. AddedByFileNum is only populated if set
	// to non-nil by a caller. It must be set to non-nil when replaying
	// version edits read from a MANIFEST (as opposed to VersionEdits
	// constructed in-memory).  While replaying a MANIFEST file,
	// VersionEdit.DeletedFiles map entries have nil values, because the
	// on-disk deletion record encodes only the file number. Accumulate
	// uses AddedByFileNum to correctly populate the BulkVersionEdit's Deleted
	// field with non-nil *FileMetadata.
	AddedByFileNum map[base.FileNum]*FileMetadata

	// MarkedForCompactionCountDiff holds the aggregated count of files
	// marked for compaction added or removed.
	MarkedForCompactionCountDiff int
}
     735             : 
// Accumulate adds the file addition and deletions in the specified version
// edit to the bulk edit's internal state.
//
// INVARIANTS:
// If a file is added to a given level in a call to Accumulate and then removed
// from that level in a subsequent call, the file will not be present in the
// resulting BulkVersionEdit.Deleted for that level.
//
// After accumulation of version edits, the bulk version edit may have
// information about a file which has been deleted from a level, but it may
// not have information about the same file added to the same level. The add
// could've occurred as part of a previous bulk version edit. In this case,
// the deleted file must be present in BulkVersionEdit.Deleted, at the end
// of the accumulation, because we need to decrease the refcount of the
// deleted file in Apply.
func (b *BulkVersionEdit) Accumulate(ve *VersionEdit) error {
	// Process deletions first so that a file added earlier (by a previous
	// Accumulate call) and deleted here cancels out of b.Added below.
	for df, m := range ve.DeletedFiles {
		dmap := b.Deleted[df.Level]
		if dmap == nil {
			dmap = make(map[base.FileNum]*FileMetadata)
			b.Deleted[df.Level] = dmap
		}

		if m == nil {
			// m is nil only when replaying a MANIFEST.
			if b.AddedByFileNum == nil {
				return errors.Errorf("deleted file L%d.%s's metadata is absent and bve.AddedByFileNum is nil", df.Level, df.FileNum)
			}
			m = b.AddedByFileNum[df.FileNum]
			if m == nil {
				return base.CorruptionErrorf("pebble: file deleted L%d.%s before it was inserted", df.Level, df.FileNum)
			}
		}
		if m.MarkedForCompaction {
			b.MarkedForCompactionCountDiff--
		}
		if _, ok := b.Added[df.Level][df.FileNum]; !ok {
			dmap[df.FileNum] = m
		} else {
			// Present in b.Added for the same level.
			delete(b.Added[df.Level], df.FileNum)
		}
	}

	// Generate state for Added backing files. Note that these must be generated
	// before we loop through the NewFiles, because we need to populate the
	// FileBackings which might be used by the NewFiles loop.
	if b.AddedFileBacking == nil {
		b.AddedFileBacking = make(map[base.DiskFileNum]*FileBacking)
	}
	for _, fb := range ve.CreatedBackingTables {
		if _, ok := b.AddedFileBacking[fb.DiskFileNum]; ok {
			// There is already a FileBacking associated with fb.DiskFileNum.
			// This should never happen. There must always be only one FileBacking
			// associated with a backing sstable.
			panic(fmt.Sprintf("pebble: duplicate file backing %s", fb.DiskFileNum.String()))
		}
		b.AddedFileBacking[fb.DiskFileNum] = fb
	}

	for _, nf := range ve.NewFiles {
		// A new file should not have been deleted in this or a preceding
		// VersionEdit at the same level (though files can move across levels).
		if dmap := b.Deleted[nf.Level]; dmap != nil {
			if _, ok := dmap[nf.Meta.FileNum]; ok {
				return base.CorruptionErrorf("pebble: file deleted L%d.%s before it was inserted", nf.Level, nf.Meta.FileNum)
			}
		}
		if nf.Meta.Virtual && nf.Meta.FileBacking == nil {
			// FileBacking for a virtual sstable must only be nil if we're performing
			// manifest replay.
			nf.Meta.FileBacking = b.AddedFileBacking[nf.BackingFileNum]
			if nf.Meta.FileBacking == nil {
				return errors.Errorf("FileBacking for virtual sstable must not be nil")
			}
		} else if nf.Meta.FileBacking == nil {
			return errors.Errorf("Added file L%d.%s's has no FileBacking", nf.Level, nf.Meta.FileNum)
		}

		if b.Added[nf.Level] == nil {
			b.Added[nf.Level] = make(map[base.FileNum]*FileMetadata)
		}
		b.Added[nf.Level][nf.Meta.FileNum] = nf.Meta
		if b.AddedByFileNum != nil {
			b.AddedByFileNum[nf.Meta.FileNum] = nf.Meta
		}
		if nf.Meta.MarkedForCompaction {
			b.MarkedForCompactionCountDiff++
		}
	}

	// Since a file can be removed from backing files in exactly one version
	// edit it is safe to just append without any de-duplication.
	b.RemovedFileBacking = append(b.RemovedFileBacking, ve.RemovedBackingTables...)

	return nil
}
     833             : 
// AccumulateIncompleteAndApplySingleVE should be called if a single version edit
// is to be applied to the provided curr Version and if the caller needs to
// update the versionSet.zombieTables map. This function exists separately from
// BulkVersionEdit.Apply because it is easier to reason about properties
// regarding BulkVersionedit.Accumulate/Apply and zombie table generation, if we
// know that exactly one version edit is being accumulated.
//
// Note that the version edit passed into this function may be incomplete
// because compactions don't have the ref counting information necessary to
// populate VersionEdit.RemovedBackingTables. This function will complete such a
// version edit by populating RemovedBackingTables.
//
// Invariant: Any file being deleted through ve must belong to the curr Version.
// We can't have a delete for some arbitrary file which does not exist in curr.
func AccumulateIncompleteAndApplySingleVE(
	ve *VersionEdit,
	curr *Version,
	cmp Compare,
	formatKey base.FormatKey,
	flushSplitBytes int64,
	readCompactionRate int64,
	backingStateMap map[base.DiskFileNum]*FileBacking,
	addBackingFunc func(*FileBacking),
	removeBackingFunc func(base.DiskFileNum),
) (_ *Version, zombies map[base.DiskFileNum]uint64, _ error) {
	if len(ve.RemovedBackingTables) != 0 {
		// An "incomplete" edit must not have RemovedBackingTables populated;
		// this function computes that field itself below.
		panic("pebble: invalid incomplete version edit")
	}
	// Accumulate exactly one edit and apply it to curr; Apply populates
	// zombies with the deleted files no longer referenced by the new version.
	var b BulkVersionEdit
	err := b.Accumulate(ve)
	if err != nil {
		return nil, nil, err
	}
	zombies = make(map[base.DiskFileNum]uint64)
	v, err := b.Apply(
		curr, cmp, formatKey, flushSplitBytes, readCompactionRate, zombies,
	)
	if err != nil {
		return nil, nil, err
	}

	for _, s := range b.AddedFileBacking {
		addBackingFunc(s)
	}

	for fileNum := range zombies {
		if _, ok := backingStateMap[fileNum]; ok {
			// This table was backing some virtual sstable in the latest version,
			// but is now a zombie. We add RemovedBackingTables entries for
			// these, before the version edit is written to disk.
			ve.RemovedBackingTables = append(
				ve.RemovedBackingTables, fileNum,
			)
			removeBackingFunc(fileNum)
		}
	}
	return v, zombies, nil
}
     892             : 
     893             : // Apply applies the delta b to the current version to produce a new
     894             : // version. The new version is consistent with respect to the comparer cmp.
     895             : //
     896             : // curr may be nil, which is equivalent to a pointer to a zero version.
     897             : //
     898             : // On success, if a non-nil zombies map is provided to Apply, the map is updated
     899             : // with file numbers and files sizes of deleted files. These files are
     900             : // considered zombies because they are no longer referenced by the returned
     901             : // Version, but cannot be deleted from disk as they are still in use by the
     902             : // incoming Version.
     903             : func (b *BulkVersionEdit) Apply(
     904             :         curr *Version,
     905             :         cmp Compare,
     906             :         formatKey base.FormatKey,
     907             :         flushSplitBytes int64,
     908             :         readCompactionRate int64,
     909             :         zombies map[base.DiskFileNum]uint64,
     910           2 : ) (*Version, error) {
     911           2 :         addZombie := func(state *FileBacking) {
     912           2 :                 if zombies != nil {
     913           2 :                         zombies[state.DiskFileNum] = state.Size
     914           2 :                 }
     915             :         }
     916           2 :         removeZombie := func(state *FileBacking) {
     917           2 :                 if zombies != nil {
     918           2 :                         delete(zombies, state.DiskFileNum)
     919           2 :                 }
     920             :         }
     921             : 
     922           2 :         v := new(Version)
     923           2 : 
     924           2 :         // Adjust the count of files marked for compaction.
     925           2 :         if curr != nil {
     926           2 :                 v.Stats.MarkedForCompaction = curr.Stats.MarkedForCompaction
     927           2 :         }
     928           2 :         v.Stats.MarkedForCompaction += b.MarkedForCompactionCountDiff
     929           2 :         if v.Stats.MarkedForCompaction < 0 {
     930           0 :                 return nil, base.CorruptionErrorf("pebble: version marked for compaction count negative")
     931           0 :         }
     932             : 
     933           2 :         for level := range v.Levels {
     934           2 :                 if curr == nil || curr.Levels[level].tree.root == nil {
     935           2 :                         v.Levels[level] = makeLevelMetadata(cmp, level, nil /* files */)
     936           2 :                 } else {
     937           2 :                         v.Levels[level] = curr.Levels[level].clone()
     938           2 :                 }
     939           2 :                 if curr == nil || curr.RangeKeyLevels[level].tree.root == nil {
     940           2 :                         v.RangeKeyLevels[level] = makeLevelMetadata(cmp, level, nil /* files */)
     941           2 :                 } else {
     942           2 :                         v.RangeKeyLevels[level] = curr.RangeKeyLevels[level].clone()
     943           2 :                 }
     944             : 
     945           2 :                 if len(b.Added[level]) == 0 && len(b.Deleted[level]) == 0 {
     946           2 :                         // There are no edits on this level.
     947           2 :                         if level == 0 {
     948           2 :                                 // Initialize L0Sublevels.
     949           2 :                                 if curr == nil || curr.L0Sublevels == nil {
     950           2 :                                         if err := v.InitL0Sublevels(cmp, formatKey, flushSplitBytes); err != nil {
     951           0 :                                                 return nil, errors.Wrap(err, "pebble: internal error")
     952           0 :                                         }
     953           2 :                                 } else {
     954           2 :                                         v.L0Sublevels = curr.L0Sublevels
     955           2 :                                         v.L0SublevelFiles = v.L0Sublevels.Levels
     956           2 :                                 }
     957             :                         }
     958           2 :                         continue
     959             :                 }
     960             : 
     961             :                 // Some edits on this level.
     962           2 :                 lm := &v.Levels[level]
     963           2 :                 lmRange := &v.RangeKeyLevels[level]
     964           2 : 
     965           2 :                 addedFilesMap := b.Added[level]
     966           2 :                 deletedFilesMap := b.Deleted[level]
     967           2 :                 if n := v.Levels[level].Len() + len(addedFilesMap); n == 0 {
     968           1 :                         return nil, base.CorruptionErrorf(
     969           1 :                                 "pebble: internal error: No current or added files but have deleted files: %d",
     970           1 :                                 errors.Safe(len(deletedFilesMap)))
     971           1 :                 }
     972             : 
     973             :                 // NB: addedFilesMap may be empty. If a file is present in addedFilesMap
     974             :                 // for a level, it won't be present in deletedFilesMap for the same
     975             :                 // level.
     976             : 
     977           2 :                 for _, f := range deletedFilesMap {
     978           2 :                         if obsolete := v.Levels[level].remove(f); obsolete {
     979           0 :                                 // Deleting a file from the B-Tree may decrement its
     980           0 :                                 // reference count. However, because we cloned the
     981           0 :                                 // previous level's B-Tree, this should never result in a
     982           0 :                                 // file's reference count dropping to zero.
     983           0 :                                 err := errors.Errorf("pebble: internal error: file L%d.%s obsolete during B-Tree removal", level, f.FileNum)
     984           0 :                                 return nil, err
     985           0 :                         }
     986           2 :                         if f.HasRangeKeys {
     987           2 :                                 if obsolete := v.RangeKeyLevels[level].remove(f); obsolete {
     988           0 :                                         // Deleting a file from the B-Tree may decrement its
     989           0 :                                         // reference count. However, because we cloned the
     990           0 :                                         // previous level's B-Tree, this should never result in a
     991           0 :                                         // file's reference count dropping to zero.
     992           0 :                                         err := errors.Errorf("pebble: internal error: file L%d.%s obsolete during range-key B-Tree removal", level, f.FileNum)
     993           0 :                                         return nil, err
     994           0 :                                 }
     995             :                         }
     996             : 
     997             :                         // Note that a backing sst will only become a zombie if the
     998             :                         // references to it in the latest version is 0. We will remove the
     999             :                         // backing sst from the zombie list in the next loop if one of the
    1000             :                         // addedFiles in any of the levels is referencing the backing sst.
    1001             :                         // This is possible if a physical sstable is virtualized, or if it
    1002             :                         // is moved.
    1003           2 :                         latestRefCount := f.LatestRefs()
    1004           2 :                         if latestRefCount <= 0 {
    1005           0 :                                 // If a file is present in deletedFilesMap for a level, then it
    1006           0 :                                 // must have already been added to the level previously, which
    1007           0 :                                 // means that its latest ref count cannot be 0.
    1008           0 :                                 err := errors.Errorf("pebble: internal error: incorrect latestRefs reference counting for file", f.FileNum)
    1009           0 :                                 return nil, err
    1010           2 :                         } else if f.LatestUnref() == 0 {
    1011           2 :                                 addZombie(f.FileBacking)
    1012           2 :                         }
    1013             :                 }
    1014             : 
    1015           2 :                 addedFiles := make([]*FileMetadata, 0, len(addedFilesMap))
    1016           2 :                 for _, f := range addedFilesMap {
    1017           2 :                         addedFiles = append(addedFiles, f)
    1018           2 :                 }
    1019             :                 // Sort addedFiles by file number. This isn't necessary, but tests which
    1020             :                 // replay invalid manifests check the error output, and the error output
    1021             :                 // depends on the order in which files are added to the btree.
    1022           2 :                 slices.SortFunc(addedFiles, func(a, b *FileMetadata) int {
    1023           2 :                         return stdcmp.Compare(a.FileNum, b.FileNum)
    1024           2 :                 })
    1025             : 
    1026           2 :                 var sm, la *FileMetadata
    1027           2 :                 for _, f := range addedFiles {
    1028           2 :                         // NB: allowedSeeks is used for read triggered compactions. It is set using
    1029           2 :                         // Options.Experimental.ReadCompactionRate which defaults to 32KB.
    1030           2 :                         var allowedSeeks int64
    1031           2 :                         if readCompactionRate != 0 {
    1032           2 :                                 allowedSeeks = int64(f.Size) / readCompactionRate
    1033           2 :                         }
    1034           2 :                         if allowedSeeks < 100 {
    1035           2 :                                 allowedSeeks = 100
    1036           2 :                         }
    1037           2 :                         f.AllowedSeeks.Store(allowedSeeks)
    1038           2 :                         f.InitAllowedSeeks = allowedSeeks
    1039           2 : 
    1040           2 :                         err := lm.insert(f)
    1041           2 :                         // We're adding this file to the new version, so increment the
    1042           2 :                         // latest refs count.
    1043           2 :                         f.LatestRef()
    1044           2 :                         if err != nil {
    1045           1 :                                 return nil, errors.Wrap(err, "pebble")
    1046           1 :                         }
    1047           2 :                         if f.HasRangeKeys {
    1048           2 :                                 err = lmRange.insert(f)
    1049           2 :                                 if err != nil {
    1050           0 :                                         return nil, errors.Wrap(err, "pebble")
    1051           0 :                                 }
    1052             :                         }
    1053           2 :                         removeZombie(f.FileBacking)
    1054           2 :                         // Track the keys with the smallest and largest keys, so that we can
    1055           2 :                         // check consistency of the modified span.
    1056           2 :                         if sm == nil || base.InternalCompare(cmp, sm.Smallest, f.Smallest) > 0 {
    1057           2 :                                 sm = f
    1058           2 :                         }
    1059           2 :                         if la == nil || base.InternalCompare(cmp, la.Largest, f.Largest) < 0 {
    1060           2 :                                 la = f
    1061           2 :                         }
    1062             :                 }
    1063             : 
    1064           2 :                 if level == 0 {
    1065           2 :                         if curr != nil && curr.L0Sublevels != nil && len(deletedFilesMap) == 0 {
    1066           2 :                                 // Flushes and ingestions that do not delete any L0 files do not require
    1067           2 :                                 // a regeneration of L0Sublevels from scratch. We can instead generate
    1068           2 :                                 // it incrementally.
    1069           2 :                                 var err error
    1070           2 :                                 // AddL0Files requires addedFiles to be sorted in seqnum order.
    1071           2 :                                 SortBySeqNum(addedFiles)
    1072           2 :                                 v.L0Sublevels, err = curr.L0Sublevels.AddL0Files(addedFiles, flushSplitBytes, &v.Levels[0])
    1073           2 :                                 if errors.Is(err, errInvalidL0SublevelsOpt) {
    1074           2 :                                         err = v.InitL0Sublevels(cmp, formatKey, flushSplitBytes)
    1075           2 :                                 } else if invariants.Enabled && err == nil {
    1076           2 :                                         copyOfSublevels, err := NewL0Sublevels(&v.Levels[0], cmp, formatKey, flushSplitBytes)
    1077           2 :                                         if err != nil {
    1078           0 :                                                 panic(fmt.Sprintf("error when regenerating sublevels: %s", err))
    1079             :                                         }
    1080           2 :                                         s1 := describeSublevels(base.DefaultFormatter, false /* verbose */, copyOfSublevels.Levels)
    1081           2 :                                         s2 := describeSublevels(base.DefaultFormatter, false /* verbose */, v.L0Sublevels.Levels)
    1082           2 :                                         if s1 != s2 {
    1083           0 :                                                 panic(fmt.Sprintf("incremental L0 sublevel generation produced different output than regeneration: %s != %s", s1, s2))
    1084             :                                         }
    1085             :                                 }
    1086           2 :                                 if err != nil {
    1087           0 :                                         return nil, errors.Wrap(err, "pebble: internal error")
    1088           0 :                                 }
    1089           2 :                                 v.L0SublevelFiles = v.L0Sublevels.Levels
    1090           2 :                         } else if err := v.InitL0Sublevels(cmp, formatKey, flushSplitBytes); err != nil {
    1091           0 :                                 return nil, errors.Wrap(err, "pebble: internal error")
    1092           0 :                         }
    1093           2 :                         if err := CheckOrdering(cmp, formatKey, Level(0), v.Levels[level].Iter()); err != nil {
    1094           0 :                                 return nil, errors.Wrap(err, "pebble: internal error")
    1095           0 :                         }
    1096           2 :                         continue
    1097             :                 }
    1098             : 
    1099             :                 // Check consistency of the level in the vicinity of our edits.
    1100           2 :                 if sm != nil && la != nil {
    1101           2 :                         overlap := overlaps(v.Levels[level].Iter(), cmp, sm.Smallest.UserKey,
    1102           2 :                                 la.Largest.UserKey, la.Largest.IsExclusiveSentinel())
    1103           2 :                         // overlap contains all of the added files. We want to ensure that
    1104           2 :                         // the added files are consistent with neighboring existing files
    1105           2 :                         // too, so reslice the overlap to pull in a neighbor on each side.
    1106           2 :                         check := overlap.Reslice(func(start, end *LevelIterator) {
    1107           2 :                                 if m := start.Prev(); m == nil {
    1108           2 :                                         start.Next()
    1109           2 :                                 }
    1110           2 :                                 if m := end.Next(); m == nil {
    1111           2 :                                         end.Prev()
    1112           2 :                                 }
    1113             :                         })
    1114           2 :                         if err := CheckOrdering(cmp, formatKey, Level(level), check.Iter()); err != nil {
    1115           1 :                                 return nil, errors.Wrap(err, "pebble: internal error")
    1116           1 :                         }
    1117             :                 }
    1118             :         }
    1119           2 :         return v, nil
    1120             : }

Generated by: LCOV version 1.14