LCOV - code coverage report
Current view: top level - pebble/internal/manifest - version_edit.go (source / functions) Hit Total Coverage
Test: 2023-10-19 08:16Z f6cde3fc - meta test only.lcov Lines: 494 724 68.2 %
Date: 2023-10-19 08:17:42 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2012 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package manifest
       6             : 
       7             : import (
       8             :         "bufio"
       9             :         "bytes"
      10             :         "encoding/binary"
      11             :         "fmt"
      12             :         "io"
      13             :         "sort"
      14             :         "time"
      15             : 
      16             :         "github.com/cockroachdb/errors"
      17             :         "github.com/cockroachdb/pebble/internal/base"
      18             :         "github.com/cockroachdb/pebble/internal/invariants"
      19             : )
      20             : 
      21             : // TODO(peter): describe the MANIFEST file format, independently of the C++
      22             : // project.
      23             : 
      24             : var errCorruptManifest = base.CorruptionErrorf("pebble: corrupt manifest")
      25             : 
      26             : type byteReader interface {
      27             :         io.ByteReader
      28             :         io.Reader
      29             : }
      30             : 
      31             : // Tags for the versionEdit disk format.
      32             : // Tag 8 is no longer used.
      33             : const (
      34             :         // LevelDB tags.
      35             :         tagComparator     = 1
      36             :         tagLogNumber      = 2
      37             :         tagNextFileNumber = 3
      38             :         tagLastSequence   = 4
      39             :         tagCompactPointer = 5
      40             :         tagDeletedFile    = 6
      41             :         tagNewFile        = 7
      42             :         tagPrevLogNumber  = 9
      43             : 
      44             :         // RocksDB tags.
      45             :         tagNewFile2         = 100
      46             :         tagNewFile3         = 102
      47             :         tagNewFile4         = 103
      48             :         tagColumnFamily     = 200
      49             :         tagColumnFamilyAdd  = 201
      50             :         tagColumnFamilyDrop = 202
      51             :         tagMaxColumnFamily  = 203
      52             : 
      53             :         // Pebble tags.
      54             :         tagNewFile5            = 104 // Range keys.
      55             :         tagCreatedBackingTable = 105
      56             :         tagRemovedBackingTable = 106
      57             : 
      58             :         // The custom tags sub-format used by tagNewFile4 and above.
      59             :         customTagTerminate         = 1
      60             :         customTagNeedsCompaction   = 2
      61             :         customTagCreationTime      = 6
      62             :         customTagPathID            = 65
      63             :         customTagNonSafeIgnoreMask = 1 << 6
      64             :         customTagVirtual           = 66
      65             : )
      66             : 
      67             : // DeletedFileEntry holds the state for a file deletion from a level. The file
      68             : // itself might still be referenced by another level.
      69             : type DeletedFileEntry struct {
      70             :         Level   int
      71             :         FileNum base.FileNum
      72             : }
      73             : 
      74             : // NewFileEntry holds the state for a new file or one moved from a different
      75             : // level.
      76             : type NewFileEntry struct {
      77             :         Level int
      78             :         Meta  *FileMetadata
      79             :         // BackingFileNum is only set during manifest replay, and only for virtual
      80             :         // sstables.
      81             :         BackingFileNum base.DiskFileNum
      82             : }
      83             : 
      84             : // VersionEdit holds the state for an edit to a Version along with other
      85             : // on-disk state (log numbers, next file number, and the last sequence number).
      86             : type VersionEdit struct {
      87             :         // ComparerName is the value of Options.Comparer.Name. This is only set in
      88             :         // the first VersionEdit in a manifest (either when the DB is created, or
      89             :         // when a new manifest is created) and is used to verify that the comparer
      90             :         // specified at Open matches the comparer that was previously used.
      91             :         ComparerName string
      92             : 
      93             :         // MinUnflushedLogNum is the smallest WAL log file number corresponding to
      94             :         // mutations that have not been flushed to an sstable.
      95             :         //
      96             :         // This is an optional field, and 0 represents it is not set.
      97             :         MinUnflushedLogNum base.DiskFileNum
      98             : 
      99             :         // ObsoletePrevLogNum is a historic artifact from LevelDB that is not used by
     100             :         // Pebble, RocksDB, or even LevelDB. Its use in LevelDB was deprecated in
     101             :         // 6/2011. We keep it around purely for informational purposes when
     102             :         // displaying MANIFEST contents.
     103             :         ObsoletePrevLogNum uint64
     104             : 
     105             :         // The next file number. A single counter is used to assign file numbers
     106             :         // for the WAL, MANIFEST, sstable, and OPTIONS files.
     107             :         NextFileNum uint64
     108             : 
     109             :         // LastSeqNum is an upper bound on the sequence numbers that have been
     110             :         // assigned in flushed WALs. Unflushed WALs (that will be replayed during
     111             :         // recovery) may contain sequence numbers greater than this value.
     112             :         LastSeqNum uint64
     113             : 
     114             :         // A file num may be present in both deleted files and new files when it
     115             :         // is moved from a lower level to a higher level (when the compaction
     116             :         // found that there was no overlapping file at the higher level).
     117             :         DeletedFiles map[DeletedFileEntry]*FileMetadata
     118             :         NewFiles     []NewFileEntry
     119             :         // CreatedBackingTables can be used to preserve the FileBacking associated
     120             :         // with a physical sstable. This is useful when virtual sstables in the
     121             :         // latest version are reconstructed during manifest replay, and we also need
     122             :         // to reconstruct the FileBacking which is required by these virtual
     123             :         // sstables.
     124             :         //
     125             :         // INVARIANT: The FileBacking associated with a physical sstable must only
     126             :         // be added as a backing file in the same version edit where the physical
     127             :         // sstable is first virtualized. This means that the physical sstable must
     128             :         // be present in DeletedFiles and that there must be at least one virtual
     129             :         // sstable with the same FileBacking as the physical sstable in NewFiles. A
     130             :         // file must be present in CreatedBackingTables in exactly one version edit.
     131             :         // The physical sstable associated with the FileBacking must also not be
     132             :         // present in NewFiles.
     133             :         CreatedBackingTables []*FileBacking
     134             :         // RemovedBackingTables is used to remove the FileBacking associated with a
     135             :         // virtual sstable. Note that a backing sstable can be removed as soon as
     136             :         // there are no virtual sstables in the latest version which are using the
     137             :         // backing sstable, but the backing sstable doesn't necessarily have to be
     138             :         // removed atomically with the version edit which removes the last virtual
     139             :         // sstable associated with the backing sstable. The removal can happen in a
     140             :         // future version edit.
     141             :         //
     142             :         // INVARIANT: A file must only be added to RemovedBackingTables if it was
     143             :         // added to CreateBackingTables in a prior version edit. The same version
     144             :         // edit also cannot have the same file present in both CreateBackingTables
     145             :         // and RemovedBackingTables. A file must be present in RemovedBackingTables
     146             :         // in exactly one version edit.
     147             :         RemovedBackingTables []base.DiskFileNum
     148             : }
     149             : 
     150             : // Decode decodes an edit from the specified reader.
     151             : //
     152             : // Note that the Decode step will not set the FileBacking for virtual sstables
     153             : // and the responsibility is left to the caller. However, the Decode step will
     154             : // populate the NewFileEntry.BackingFileNum in VersionEdit.NewFiles.
     155           1 : func (v *VersionEdit) Decode(r io.Reader) error {
     156           1 :         br, ok := r.(byteReader)
     157           1 :         if !ok {
     158           1 :                 br = bufio.NewReader(r)
     159           1 :         }
     160           1 :         d := versionEditDecoder{br}
     161           1 :         for {
     162           1 :                 tag, err := binary.ReadUvarint(br)
     163           1 :                 if err == io.EOF {
     164           1 :                         break
     165             :                 }
     166           1 :                 if err != nil {
     167           0 :                         return err
     168           0 :                 }
     169           1 :                 switch tag {
     170           1 :                 case tagComparator:
     171           1 :                         s, err := d.readBytes()
     172           1 :                         if err != nil {
     173           0 :                                 return err
     174           0 :                         }
     175           1 :                         v.ComparerName = string(s)
     176             : 
     177           1 :                 case tagLogNumber:
     178           1 :                         n, err := d.readUvarint()
     179           1 :                         if err != nil {
     180           0 :                                 return err
     181           0 :                         }
     182           1 :                         v.MinUnflushedLogNum = base.DiskFileNum(n)
     183             : 
     184           1 :                 case tagNextFileNumber:
     185           1 :                         n, err := d.readUvarint()
     186           1 :                         if err != nil {
     187           0 :                                 return err
     188           0 :                         }
     189           1 :                         v.NextFileNum = n
     190             : 
     191           1 :                 case tagLastSequence:
     192           1 :                         n, err := d.readUvarint()
     193           1 :                         if err != nil {
     194           0 :                                 return err
     195           0 :                         }
     196           1 :                         v.LastSeqNum = n
     197             : 
     198           0 :                 case tagCompactPointer:
     199           0 :                         if _, err := d.readLevel(); err != nil {
     200           0 :                                 return err
     201           0 :                         }
     202           0 :                         if _, err := d.readBytes(); err != nil {
     203           0 :                                 return err
     204           0 :                         }
     205             :                         // NB: RocksDB does not use compaction pointers anymore.
     206             : 
     207           1 :                 case tagRemovedBackingTable:
     208           1 :                         n, err := d.readUvarint()
     209           1 :                         if err != nil {
     210           0 :                                 return err
     211           0 :                         }
     212           1 :                         v.RemovedBackingTables = append(
     213           1 :                                 v.RemovedBackingTables, base.FileNum(n).DiskFileNum(),
     214           1 :                         )
     215           1 :                 case tagCreatedBackingTable:
     216           1 :                         dfn, err := d.readUvarint()
     217           1 :                         if err != nil {
     218           0 :                                 return err
     219           0 :                         }
     220           1 :                         size, err := d.readUvarint()
     221           1 :                         if err != nil {
     222           0 :                                 return err
     223           0 :                         }
     224           1 :                         fileBacking := &FileBacking{
     225           1 :                                 DiskFileNum: base.FileNum(dfn).DiskFileNum(),
     226           1 :                                 Size:        size,
     227           1 :                         }
     228           1 :                         v.CreatedBackingTables = append(v.CreatedBackingTables, fileBacking)
     229           1 :                 case tagDeletedFile:
     230           1 :                         level, err := d.readLevel()
     231           1 :                         if err != nil {
     232           0 :                                 return err
     233           0 :                         }
     234           1 :                         fileNum, err := d.readFileNum()
     235           1 :                         if err != nil {
     236           0 :                                 return err
     237           0 :                         }
     238           1 :                         if v.DeletedFiles == nil {
     239           1 :                                 v.DeletedFiles = make(map[DeletedFileEntry]*FileMetadata)
     240           1 :                         }
     241           1 :                         v.DeletedFiles[DeletedFileEntry{level, fileNum}] = nil
     242             : 
     243           1 :                 case tagNewFile, tagNewFile2, tagNewFile3, tagNewFile4, tagNewFile5:
     244           1 :                         level, err := d.readLevel()
     245           1 :                         if err != nil {
     246           0 :                                 return err
     247           0 :                         }
     248           1 :                         fileNum, err := d.readFileNum()
     249           1 :                         if err != nil {
     250           0 :                                 return err
     251           0 :                         }
     252           1 :                         if tag == tagNewFile3 {
     253           0 :                                 // The pathID field appears unused in RocksDB.
     254           0 :                                 _ /* pathID */, err := d.readUvarint()
     255           0 :                                 if err != nil {
     256           0 :                                         return err
     257           0 :                                 }
     258             :                         }
     259           1 :                         size, err := d.readUvarint()
     260           1 :                         if err != nil {
     261           0 :                                 return err
     262           0 :                         }
     263             :                         // We read the smallest / largest key bounds differently depending on
     264             :                         // whether we have point, range or both types of keys present in the
     265             :                         // table.
     266           1 :                         var (
     267           1 :                                 smallestPointKey, largestPointKey []byte
     268           1 :                                 smallestRangeKey, largestRangeKey []byte
     269           1 :                                 parsedPointBounds                 bool
     270           1 :                                 boundsMarker                      byte
     271           1 :                         )
     272           1 :                         if tag != tagNewFile5 {
     273           1 :                                 // Range keys not present in the table. Parse the point key bounds.
     274           1 :                                 smallestPointKey, err = d.readBytes()
     275           1 :                                 if err != nil {
     276           0 :                                         return err
     277           0 :                                 }
     278           1 :                                 largestPointKey, err = d.readBytes()
     279           1 :                                 if err != nil {
     280           0 :                                         return err
     281           0 :                                 }
     282           1 :                         } else {
     283           1 :                                 // Range keys are present in the table. Determine whether we have point
     284           1 :                                 // keys to parse, in addition to the bounds.
     285           1 :                                 boundsMarker, err = d.ReadByte()
     286           1 :                                 if err != nil {
     287           0 :                                         return err
     288           0 :                                 }
     289             :                                 // Parse point key bounds, if present.
     290           1 :                                 if boundsMarker&maskContainsPointKeys > 0 {
     291           1 :                                         smallestPointKey, err = d.readBytes()
     292           1 :                                         if err != nil {
     293           0 :                                                 return err
     294           0 :                                         }
     295           1 :                                         largestPointKey, err = d.readBytes()
     296           1 :                                         if err != nil {
     297           0 :                                                 return err
     298           0 :                                         }
     299           1 :                                         parsedPointBounds = true
     300           1 :                                 } else {
     301           1 :                                         // The table does not have point keys.
     302           1 :                                         // Sanity check: the bounds must be range keys.
     303           1 :                                         if boundsMarker&maskSmallest != 0 || boundsMarker&maskLargest != 0 {
     304           0 :                                                 return base.CorruptionErrorf(
     305           0 :                                                         "new-file-4-range-keys: table without point keys has point key bounds: marker=%x",
     306           0 :                                                         boundsMarker,
     307           0 :                                                 )
     308           0 :                                         }
     309             :                                 }
     310             :                                 // Parse range key bounds.
     311           1 :                                 smallestRangeKey, err = d.readBytes()
     312           1 :                                 if err != nil {
     313           0 :                                         return err
     314           0 :                                 }
     315           1 :                                 largestRangeKey, err = d.readBytes()
     316           1 :                                 if err != nil {
     317           0 :                                         return err
     318           0 :                                 }
     319             :                         }
     320           1 :                         var smallestSeqNum uint64
     321           1 :                         var largestSeqNum uint64
     322           1 :                         if tag != tagNewFile {
     323           1 :                                 smallestSeqNum, err = d.readUvarint()
     324           1 :                                 if err != nil {
     325           0 :                                         return err
     326           0 :                                 }
     327           1 :                                 largestSeqNum, err = d.readUvarint()
     328           1 :                                 if err != nil {
     329           0 :                                         return err
     330           0 :                                 }
     331             :                         }
     332           1 :                         var markedForCompaction bool
     333           1 :                         var creationTime uint64
     334           1 :                         virtualState := struct {
     335           1 :                                 virtual        bool
     336           1 :                                 backingFileNum uint64
     337           1 :                         }{}
     338           1 :                         if tag == tagNewFile4 || tag == tagNewFile5 {
     339           1 :                                 for {
     340           1 :                                         customTag, err := d.readUvarint()
     341           1 :                                         if err != nil {
     342           0 :                                                 return err
     343           0 :                                         }
     344           1 :                                         if customTag == customTagTerminate {
     345           1 :                                                 break
     346           1 :                                         } else if customTag == customTagVirtual {
     347           1 :                                                 virtualState.virtual = true
     348           1 :                                                 n, err := d.readUvarint()
     349           1 :                                                 if err != nil {
     350           0 :                                                         return err
     351           0 :                                                 }
     352           1 :                                                 virtualState.backingFileNum = n
     353           1 :                                                 continue
     354             :                                         }
     355             : 
     356           1 :                                         field, err := d.readBytes()
     357           1 :                                         if err != nil {
     358           0 :                                                 return err
     359           0 :                                         }
     360           1 :                                         switch customTag {
     361           0 :                                         case customTagNeedsCompaction:
     362           0 :                                                 if len(field) != 1 {
     363           0 :                                                         return base.CorruptionErrorf("new-file4: need-compaction field wrong size")
     364           0 :                                                 }
     365           0 :                                                 markedForCompaction = (field[0] == 1)
     366             : 
     367           1 :                                         case customTagCreationTime:
     368           1 :                                                 var n int
     369           1 :                                                 creationTime, n = binary.Uvarint(field)
     370           1 :                                                 if n != len(field) {
     371           0 :                                                         return base.CorruptionErrorf("new-file4: invalid file creation time")
     372           0 :                                                 }
     373             : 
     374           0 :                                         case customTagPathID:
     375           0 :                                                 return base.CorruptionErrorf("new-file4: path-id field not supported")
     376             : 
     377           0 :                                         default:
     378           0 :                                                 if (customTag & customTagNonSafeIgnoreMask) != 0 {
     379           0 :                                                         return base.CorruptionErrorf("new-file4: custom field not supported: %d", customTag)
     380           0 :                                                 }
     381             :                                         }
     382             :                                 }
     383             :                         }
     384           1 :                         m := &FileMetadata{
     385           1 :                                 FileNum:             fileNum,
     386           1 :                                 Size:                size,
     387           1 :                                 CreationTime:        int64(creationTime),
     388           1 :                                 SmallestSeqNum:      smallestSeqNum,
     389           1 :                                 LargestSeqNum:       largestSeqNum,
     390           1 :                                 MarkedForCompaction: markedForCompaction,
     391           1 :                                 Virtual:             virtualState.virtual,
     392           1 :                         }
     393           1 :                         if tag != tagNewFile5 { // no range keys present
     394           1 :                                 m.SmallestPointKey = base.DecodeInternalKey(smallestPointKey)
     395           1 :                                 m.LargestPointKey = base.DecodeInternalKey(largestPointKey)
     396           1 :                                 m.HasPointKeys = true
     397           1 :                                 m.Smallest, m.Largest = m.SmallestPointKey, m.LargestPointKey
     398           1 :                                 m.boundTypeSmallest, m.boundTypeLargest = boundTypePointKey, boundTypePointKey
     399           1 :                         } else { // range keys present
     400           1 :                                 // Set point key bounds, if parsed.
     401           1 :                                 if parsedPointBounds {
     402           1 :                                         m.SmallestPointKey = base.DecodeInternalKey(smallestPointKey)
     403           1 :                                         m.LargestPointKey = base.DecodeInternalKey(largestPointKey)
     404           1 :                                         m.HasPointKeys = true
     405           1 :                                 }
     406             :                                 // Set range key bounds.
     407           1 :                                 m.SmallestRangeKey = base.DecodeInternalKey(smallestRangeKey)
     408           1 :                                 m.LargestRangeKey = base.DecodeInternalKey(largestRangeKey)
     409           1 :                                 m.HasRangeKeys = true
     410           1 :                                 // Set overall bounds (by default assume range keys).
     411           1 :                                 m.Smallest, m.Largest = m.SmallestRangeKey, m.LargestRangeKey
     412           1 :                                 m.boundTypeSmallest, m.boundTypeLargest = boundTypeRangeKey, boundTypeRangeKey
     413           1 :                                 if boundsMarker&maskSmallest == maskSmallest {
     414           1 :                                         m.Smallest = m.SmallestPointKey
     415           1 :                                         m.boundTypeSmallest = boundTypePointKey
     416           1 :                                 }
     417           1 :                                 if boundsMarker&maskLargest == maskLargest {
     418           1 :                                         m.Largest = m.LargestPointKey
     419           1 :                                         m.boundTypeLargest = boundTypePointKey
     420           1 :                                 }
     421             :                         }
     422           1 :                         m.boundsSet = true
     423           1 :                         if !virtualState.virtual {
     424           1 :                                 m.InitPhysicalBacking()
     425           1 :                         }
     426             : 
     427           1 :                         nfe := NewFileEntry{
     428           1 :                                 Level: level,
     429           1 :                                 Meta:  m,
     430           1 :                         }
     431           1 :                         if virtualState.virtual {
     432           1 :                                 nfe.BackingFileNum = base.FileNum(virtualState.backingFileNum).DiskFileNum()
     433           1 :                         }
     434           1 :                         v.NewFiles = append(v.NewFiles, nfe)
     435             : 
     436           0 :                 case tagPrevLogNumber:
     437           0 :                         n, err := d.readUvarint()
     438           0 :                         if err != nil {
     439           0 :                                 return err
     440           0 :                         }
     441           0 :                         v.ObsoletePrevLogNum = n
     442             : 
     443           0 :                 case tagColumnFamily, tagColumnFamilyAdd, tagColumnFamilyDrop, tagMaxColumnFamily:
     444           0 :                         return base.CorruptionErrorf("column families are not supported")
     445             : 
     446           0 :                 default:
     447           0 :                         return errCorruptManifest
     448             :                 }
     449             :         }
     450           1 :         return nil
     451             : }
     452             : 
     453           0 : func (v *VersionEdit) string(verbose bool, fmtKey base.FormatKey) string {
     454           0 :         var buf bytes.Buffer
     455           0 :         if v.ComparerName != "" {
     456           0 :                 fmt.Fprintf(&buf, "  comparer:     %s", v.ComparerName)
     457           0 :         }
     458           0 :         if v.MinUnflushedLogNum != 0 {
     459           0 :                 fmt.Fprintf(&buf, "  log-num:       %d\n", v.MinUnflushedLogNum)
     460           0 :         }
     461           0 :         if v.ObsoletePrevLogNum != 0 {
     462           0 :                 fmt.Fprintf(&buf, "  prev-log-num:  %d\n", v.ObsoletePrevLogNum)
     463           0 :         }
     464           0 :         if v.NextFileNum != 0 {
     465           0 :                 fmt.Fprintf(&buf, "  next-file-num: %d\n", v.NextFileNum)
     466           0 :         }
     467           0 :         if v.LastSeqNum != 0 {
     468           0 :                 fmt.Fprintf(&buf, "  last-seq-num:  %d\n", v.LastSeqNum)
     469           0 :         }
     470           0 :         entries := make([]DeletedFileEntry, 0, len(v.DeletedFiles))
     471           0 :         for df := range v.DeletedFiles {
     472           0 :                 entries = append(entries, df)
     473           0 :         }
     474           0 :         sort.Slice(entries, func(i, j int) bool {
     475           0 :                 if entries[i].Level != entries[j].Level {
     476           0 :                         return entries[i].Level < entries[j].Level
     477           0 :                 }
     478           0 :                 return entries[i].FileNum < entries[j].FileNum
     479             :         })
     480           0 :         for _, df := range entries {
     481           0 :                 fmt.Fprintf(&buf, "  deleted:       L%d %s\n", df.Level, df.FileNum)
     482           0 :         }
     483           0 :         for _, nf := range v.NewFiles {
     484           0 :                 fmt.Fprintf(&buf, "  added:         L%d", nf.Level)
     485           0 :                 if verbose {
     486           0 :                         fmt.Fprintf(&buf, " %s", nf.Meta.DebugString(fmtKey, true /* verbose */))
     487           0 :                 } else {
     488           0 :                         fmt.Fprintf(&buf, " %s", nf.Meta.String())
     489           0 :                 }
     490           0 :                 if nf.Meta.CreationTime != 0 {
     491           0 :                         fmt.Fprintf(&buf, " (%s)",
     492           0 :                                 time.Unix(nf.Meta.CreationTime, 0).UTC().Format(time.RFC3339))
     493           0 :                 }
     494           0 :                 fmt.Fprintln(&buf)
     495             :         }
     496           0 :         return buf.String()
     497             : }
     498             : 
     499             : // DebugString is a more verbose version of String(). Use this in tests.
     500           0 : func (v *VersionEdit) DebugString(fmtKey base.FormatKey) string {
     501           0 :         return v.string(true /* verbose */, fmtKey)
     502           0 : }
     503             : 
     504             : // String implements fmt.Stringer for a VersionEdit.
     505           0 : func (v *VersionEdit) String() string {
     506           0 :         return v.string(false /* verbose */, base.DefaultFormatter)
     507           0 : }
     508             : 
     509             : // Encode encodes an edit to the specified writer.
     510           1 : func (v *VersionEdit) Encode(w io.Writer) error {
     511           1 :         e := versionEditEncoder{new(bytes.Buffer)}
     512           1 : 
     513           1 :         if v.ComparerName != "" {
     514           1 :                 e.writeUvarint(tagComparator)
     515           1 :                 e.writeString(v.ComparerName)
     516           1 :         }
     517           1 :         if v.MinUnflushedLogNum != 0 {
     518           1 :                 e.writeUvarint(tagLogNumber)
     519           1 :                 e.writeUvarint(uint64(v.MinUnflushedLogNum))
     520           1 :         }
     521           1 :         if v.ObsoletePrevLogNum != 0 {
     522           0 :                 e.writeUvarint(tagPrevLogNumber)
     523           0 :                 e.writeUvarint(v.ObsoletePrevLogNum)
     524           0 :         }
     525           1 :         if v.NextFileNum != 0 {
     526           1 :                 e.writeUvarint(tagNextFileNumber)
     527           1 :                 e.writeUvarint(uint64(v.NextFileNum))
     528           1 :         }
     529           1 :         for _, dfn := range v.RemovedBackingTables {
     530           1 :                 e.writeUvarint(tagRemovedBackingTable)
     531           1 :                 e.writeUvarint(uint64(dfn.FileNum()))
     532           1 :         }
     533           1 :         for _, fileBacking := range v.CreatedBackingTables {
     534           1 :                 e.writeUvarint(tagCreatedBackingTable)
     535           1 :                 e.writeUvarint(uint64(fileBacking.DiskFileNum.FileNum()))
     536           1 :                 e.writeUvarint(fileBacking.Size)
     537           1 :         }
     538             :         // RocksDB requires LastSeqNum to be encoded for the first MANIFEST entry,
     539             :         // even though its value is zero. We detect this by encoding LastSeqNum when
     540             :         // ComparerName is set.
     541           1 :         if v.LastSeqNum != 0 || v.ComparerName != "" {
     542           1 :                 e.writeUvarint(tagLastSequence)
     543           1 :                 e.writeUvarint(v.LastSeqNum)
     544           1 :         }
     545           1 :         for x := range v.DeletedFiles {
     546           1 :                 e.writeUvarint(tagDeletedFile)
     547           1 :                 e.writeUvarint(uint64(x.Level))
     548           1 :                 e.writeUvarint(uint64(x.FileNum))
     549           1 :         }
     550           1 :         for _, x := range v.NewFiles {
     551           1 :                 customFields := x.Meta.MarkedForCompaction || x.Meta.CreationTime != 0 || x.Meta.Virtual
     552           1 :                 var tag uint64
     553           1 :                 switch {
     554           1 :                 case x.Meta.HasRangeKeys:
     555           1 :                         tag = tagNewFile5
     556           1 :                 case customFields:
     557           1 :                         tag = tagNewFile4
     558           0 :                 default:
     559           0 :                         tag = tagNewFile2
     560             :                 }
     561           1 :                 e.writeUvarint(tag)
     562           1 :                 e.writeUvarint(uint64(x.Level))
     563           1 :                 e.writeUvarint(uint64(x.Meta.FileNum))
     564           1 :                 e.writeUvarint(x.Meta.Size)
     565           1 :                 if !x.Meta.HasRangeKeys {
     566           1 :                         // If we have no range keys, preserve the original format and write the
     567           1 :                         // smallest and largest point keys.
     568           1 :                         e.writeKey(x.Meta.SmallestPointKey)
     569           1 :                         e.writeKey(x.Meta.LargestPointKey)
     570           1 :                 } else {
     571           1 :                         // When range keys are present, we first write a marker byte that
     572           1 :                         // indicates if the table also contains point keys, in addition to how the
     573           1 :                         // overall bounds for the table should be reconstructed. This byte is
     574           1 :                         // followed by the keys themselves.
     575           1 :                         b, err := x.Meta.boundsMarker()
     576           1 :                         if err != nil {
     577           0 :                                 return err
     578           0 :                         }
     579           1 :                         if err = e.WriteByte(b); err != nil {
     580           0 :                                 return err
     581           0 :                         }
     582             :                         // Write point key bounds (if present).
     583           1 :                         if x.Meta.HasPointKeys {
     584           1 :                                 e.writeKey(x.Meta.SmallestPointKey)
     585           1 :                                 e.writeKey(x.Meta.LargestPointKey)
     586           1 :                         }
     587             :                         // Write range key bounds.
     588           1 :                         e.writeKey(x.Meta.SmallestRangeKey)
     589           1 :                         e.writeKey(x.Meta.LargestRangeKey)
     590             :                 }
     591           1 :                 e.writeUvarint(x.Meta.SmallestSeqNum)
     592           1 :                 e.writeUvarint(x.Meta.LargestSeqNum)
     593           1 :                 if customFields {
     594           1 :                         if x.Meta.CreationTime != 0 {
     595           1 :                                 e.writeUvarint(customTagCreationTime)
     596           1 :                                 var buf [binary.MaxVarintLen64]byte
     597           1 :                                 n := binary.PutUvarint(buf[:], uint64(x.Meta.CreationTime))
     598           1 :                                 e.writeBytes(buf[:n])
     599           1 :                         }
     600           1 :                         if x.Meta.MarkedForCompaction {
     601           0 :                                 e.writeUvarint(customTagNeedsCompaction)
     602           0 :                                 e.writeBytes([]byte{1})
     603           0 :                         }
     604           1 :                         if x.Meta.Virtual {
     605           1 :                                 e.writeUvarint(customTagVirtual)
     606           1 :                                 e.writeUvarint(uint64(x.Meta.FileBacking.DiskFileNum.FileNum()))
     607           1 :                         }
     608           1 :                         e.writeUvarint(customTagTerminate)
     609             :                 }
     610             :         }
     611           1 :         _, err := w.Write(e.Bytes())
     612           1 :         return err
     613             : }
     614             : 
     615             : // versionEditDecoder should be used to decode version edits.
     616             : type versionEditDecoder struct {
     617             :         byteReader
     618             : }
     619             : 
     620           1 : func (d versionEditDecoder) readBytes() ([]byte, error) {
     621           1 :         n, err := d.readUvarint()
     622           1 :         if err != nil {
     623           0 :                 return nil, err
     624           0 :         }
     625           1 :         s := make([]byte, n)
     626           1 :         _, err = io.ReadFull(d, s)
     627           1 :         if err != nil {
     628           0 :                 if err == io.ErrUnexpectedEOF {
     629           0 :                         return nil, errCorruptManifest
     630           0 :                 }
     631           0 :                 return nil, err
     632             :         }
     633           1 :         return s, nil
     634             : }
     635             : 
     636           1 : func (d versionEditDecoder) readLevel() (int, error) {
     637           1 :         u, err := d.readUvarint()
     638           1 :         if err != nil {
     639           0 :                 return 0, err
     640           0 :         }
     641           1 :         if u >= NumLevels {
     642           0 :                 return 0, errCorruptManifest
     643           0 :         }
     644           1 :         return int(u), nil
     645             : }
     646             : 
     647           1 : func (d versionEditDecoder) readFileNum() (base.FileNum, error) {
     648           1 :         u, err := d.readUvarint()
     649           1 :         if err != nil {
     650           0 :                 return 0, err
     651           0 :         }
     652           1 :         return base.FileNum(u), nil
     653             : }
     654             : 
     655           1 : func (d versionEditDecoder) readUvarint() (uint64, error) {
     656           1 :         u, err := binary.ReadUvarint(d)
     657           1 :         if err != nil {
     658           0 :                 if err == io.EOF {
     659           0 :                         return 0, errCorruptManifest
     660           0 :                 }
     661           0 :                 return 0, err
     662             :         }
     663           1 :         return u, nil
     664             : }
     665             : 
     666             : type versionEditEncoder struct {
     667             :         *bytes.Buffer
     668             : }
     669             : 
     670           1 : func (e versionEditEncoder) writeBytes(p []byte) {
     671           1 :         e.writeUvarint(uint64(len(p)))
     672           1 :         e.Write(p)
     673           1 : }
     674             : 
     675           1 : func (e versionEditEncoder) writeKey(k InternalKey) {
     676           1 :         e.writeUvarint(uint64(k.Size()))
     677           1 :         e.Write(k.UserKey)
     678           1 :         buf := k.EncodeTrailer()
     679           1 :         e.Write(buf[:])
     680           1 : }
     681             : 
     682           1 : func (e versionEditEncoder) writeString(s string) {
     683           1 :         e.writeUvarint(uint64(len(s)))
     684           1 :         e.WriteString(s)
     685           1 : }
     686             : 
     687           1 : func (e versionEditEncoder) writeUvarint(u uint64) {
     688           1 :         var buf [binary.MaxVarintLen64]byte
     689           1 :         n := binary.PutUvarint(buf[:], u)
     690           1 :         e.Write(buf[:n])
     691           1 : }
     692             : 
     693             : // BulkVersionEdit summarizes the files added and deleted from a set of version
     694             : // edits.
     695             : //
     696             : // INVARIANTS:
     697             : // No file can be added to a level more than once. This is true globally, and
     698             : // also true for all of the calls to Accumulate for a single bulk version edit.
     699             : //
     700             : // No file can be removed from a level more than once. This is true globally,
     701             : // and also true for all of the calls to Accumulate for a single bulk version
     702             : // edit.
     703             : //
     704             : // A file must not be added and removed from a given level in the same version
     705             : // edit.
     706             : //
     707             : // A file that is being removed from a level must have been added to that level
     708             : // before (in a prior version edit). Note that a given file can be deleted from
     709             : // a level and added to another level in a single version edit
     710             : type BulkVersionEdit struct {
     711             :         Added   [NumLevels]map[base.FileNum]*FileMetadata
     712             :         Deleted [NumLevels]map[base.FileNum]*FileMetadata
     713             : 
     714             :         // AddedFileBacking is a map to support lookup so that we can populate the
     715             :         // FileBacking of virtual sstables during manifest replay.
     716             :         AddedFileBacking   map[base.DiskFileNum]*FileBacking
     717             :         RemovedFileBacking []base.DiskFileNum
     718             : 
     719             :         // AddedByFileNum maps file number to file metadata for all added files
     720             :         // from accumulated version edits. AddedByFileNum is only populated if set
     721             :         // to non-nil by a caller. It must be set to non-nil when replaying
     722             :         // version edits read from a MANIFEST (as opposed to VersionEdits
     723             :         // constructed in-memory).  While replaying a MANIFEST file,
     724             :         // VersionEdit.DeletedFiles map entries have nil values, because the
     725             :         // on-disk deletion record encodes only the file number. Accumulate
     726             :         // uses AddedByFileNum to correctly populate the BulkVersionEdit's Deleted
     727             :         // field with non-nil *FileMetadata.
     728             :         AddedByFileNum map[base.FileNum]*FileMetadata
     729             : 
     730             :         // MarkedForCompactionCountDiff holds the aggregated count of files
     731             :         // marked for compaction added or removed.
     732             :         MarkedForCompactionCountDiff int
     733             : }
     734             : 
     735             : // Accumulate adds the file addition and deletions in the specified version
     736             : // edit to the bulk edit's internal state.
     737             : //
     738             : // INVARIANTS:
     739             : // If a file is added to a given level in a call to Accumulate and then removed
     740             : // from that level in a subsequent call, the file will not be present in the
     741             : // resulting BulkVersionEdit.Deleted for that level.
     742             : //
     743             : // After accumulation of version edits, the bulk version edit may have
     744             : // information about a file which has been deleted from a level, but it may
     745             : // not have information about the same file added to the same level. The add
     746             : // could've occurred as part of a previous bulk version edit. In this case,
     747             : // the deleted file must be present in BulkVersionEdit.Deleted, at the end
     748             : // of the accumulation, because we need to decrease the refcount of the
     749             : // deleted file in Apply.
     750           1 : func (b *BulkVersionEdit) Accumulate(ve *VersionEdit) error {
     751           1 :         for df, m := range ve.DeletedFiles {
     752           1 :                 dmap := b.Deleted[df.Level]
     753           1 :                 if dmap == nil {
     754           1 :                         dmap = make(map[base.FileNum]*FileMetadata)
     755           1 :                         b.Deleted[df.Level] = dmap
     756           1 :                 }
     757             : 
     758           1 :                 if m == nil {
     759           1 :                         // m is nil only when replaying a MANIFEST.
     760           1 :                         if b.AddedByFileNum == nil {
     761           0 :                                 return errors.Errorf("deleted file L%d.%s's metadata is absent and bve.AddedByFileNum is nil", df.Level, df.FileNum)
     762           0 :                         }
     763           1 :                         m = b.AddedByFileNum[df.FileNum]
     764           1 :                         if m == nil {
     765           0 :                                 return base.CorruptionErrorf("pebble: file deleted L%d.%s before it was inserted", df.Level, df.FileNum)
     766           0 :                         }
     767             :                 }
     768           1 :                 if m.MarkedForCompaction {
     769           0 :                         b.MarkedForCompactionCountDiff--
     770           0 :                 }
     771           1 :                 if _, ok := b.Added[df.Level][df.FileNum]; !ok {
     772           1 :                         dmap[df.FileNum] = m
     773           1 :                 } else {
     774           1 :                         // Present in b.Added for the same level.
     775           1 :                         delete(b.Added[df.Level], df.FileNum)
     776           1 :                 }
     777             :         }
     778             : 
     779             :         // Generate state for Added backing files. Note that these must be generated
     780             :         // before we loop through the NewFiles, because we need to populate the
     781             :         // FileBackings which might be used by the NewFiles loop.
     782           1 :         if b.AddedFileBacking == nil {
     783           1 :                 b.AddedFileBacking = make(map[base.DiskFileNum]*FileBacking)
     784           1 :         }
     785           1 :         for _, fb := range ve.CreatedBackingTables {
     786           1 :                 if _, ok := b.AddedFileBacking[fb.DiskFileNum]; ok {
     787           0 :                         // There is already a FileBacking associated with fb.DiskFileNum.
     788           0 :                         // This should never happen. There must always be only one FileBacking
     789           0 :                         // associated with a backing sstable.
     790           0 :                         panic(fmt.Sprintf("pebble: duplicate file backing %s", fb.DiskFileNum.String()))
     791             :                 }
     792           1 :                 b.AddedFileBacking[fb.DiskFileNum] = fb
     793             :         }
     794             : 
     795           1 :         for _, nf := range ve.NewFiles {
     796           1 :                 // A new file should not have been deleted in this or a preceding
     797           1 :                 // VersionEdit at the same level (though files can move across levels).
     798           1 :                 if dmap := b.Deleted[nf.Level]; dmap != nil {
     799           1 :                         if _, ok := dmap[nf.Meta.FileNum]; ok {
     800           0 :                                 return base.CorruptionErrorf("pebble: file deleted L%d.%s before it was inserted", nf.Level, nf.Meta.FileNum)
     801           0 :                         }
     802             :                 }
     803           1 :                 if nf.Meta.Virtual && nf.Meta.FileBacking == nil {
     804           1 :                         // FileBacking for a virtual sstable must only be nil if we're performing
     805           1 :                         // manifest replay.
     806           1 :                         nf.Meta.FileBacking = b.AddedFileBacking[nf.BackingFileNum]
     807           1 :                         if nf.Meta.FileBacking == nil {
     808           0 :                                 return errors.Errorf("FileBacking for virtual sstable must not be nil")
     809           0 :                         }
     810           1 :                 } else if nf.Meta.FileBacking == nil {
     811           0 :                         return errors.Errorf("Added file L%d.%s's has no FileBacking", nf.Level, nf.Meta.FileNum)
     812           0 :                 }
     813             : 
     814           1 :                 if b.Added[nf.Level] == nil {
     815           1 :                         b.Added[nf.Level] = make(map[base.FileNum]*FileMetadata)
     816           1 :                 }
     817           1 :                 b.Added[nf.Level][nf.Meta.FileNum] = nf.Meta
     818           1 :                 if b.AddedByFileNum != nil {
     819           1 :                         b.AddedByFileNum[nf.Meta.FileNum] = nf.Meta
     820           1 :                 }
     821           1 :                 if nf.Meta.MarkedForCompaction {
     822           0 :                         b.MarkedForCompactionCountDiff++
     823           0 :                 }
     824             :         }
     825             : 
     826             :         // Since a file can be removed from backing files in exactly one version
     827             :         // edit it is safe to just append without any de-duplication.
     828           1 :         b.RemovedFileBacking = append(b.RemovedFileBacking, ve.RemovedBackingTables...)
     829           1 : 
     830           1 :         return nil
     831             : }
     832             : 
     833             : // AccumulateIncompleteAndApplySingleVE should be called if a single version edit
     834             : // is to be applied to the provided curr Version and if the caller needs to
     835             : // update the versionSet.zombieTables map. This function exists separately from
     836             : // BulkVersionEdit.Apply because it is easier to reason about properties
     837             : // regarding BulkVersionedit.Accumulate/Apply and zombie table generation, if we
     838             : // know that exactly one version edit is being accumulated.
     839             : //
     840             : // Note that the version edit passed into this function may be incomplete
     841             : // because compactions don't have the ref counting information necessary to
     842             : // populate VersionEdit.RemovedBackingTables. This function will complete such a
     843             : // version edit by populating RemovedBackingTables.
     844             : //
     845             : // Invariant: Any file being deleted through ve must belong to the curr Version.
     846             : // We can't have a delete for some arbitrary file which does not exist in curr.
     847             : func AccumulateIncompleteAndApplySingleVE(
     848             :         ve *VersionEdit,
     849             :         curr *Version,
     850             :         cmp Compare,
     851             :         formatKey base.FormatKey,
     852             :         flushSplitBytes int64,
     853             :         readCompactionRate int64,
     854             :         backingStateMap map[base.DiskFileNum]*FileBacking,
     855             :         addBackingFunc func(*FileBacking),
     856             :         removeBackingFunc func(base.DiskFileNum),
     857           1 : ) (_ *Version, zombies map[base.DiskFileNum]uint64, _ error) {
     858           1 :         if len(ve.RemovedBackingTables) != 0 {
     859           0 :                 panic("pebble: invalid incomplete version edit")
     860             :         }
     861           1 :         var b BulkVersionEdit
     862           1 :         err := b.Accumulate(ve)
     863           1 :         if err != nil {
     864           0 :                 return nil, nil, err
     865           0 :         }
     866           1 :         zombies = make(map[base.DiskFileNum]uint64)
     867           1 :         v, err := b.Apply(
     868           1 :                 curr, cmp, formatKey, flushSplitBytes, readCompactionRate, zombies,
     869           1 :         )
     870           1 :         if err != nil {
     871           0 :                 return nil, nil, err
     872           0 :         }
     873             : 
     874           1 :         for _, s := range b.AddedFileBacking {
     875           1 :                 addBackingFunc(s)
     876           1 :         }
     877             : 
     878           1 :         for fileNum := range zombies {
     879           1 :                 if _, ok := backingStateMap[fileNum]; ok {
     880           1 :                         // This table was backing some virtual sstable in the latest version,
     881           1 :                         // but is now a zombie. We add RemovedBackingTables entries for
     882           1 :                         // these, before the version edit is written to disk.
     883           1 :                         ve.RemovedBackingTables = append(
     884           1 :                                 ve.RemovedBackingTables, fileNum,
     885           1 :                         )
     886           1 :                         removeBackingFunc(fileNum)
     887           1 :                 }
     888             :         }
     889           1 :         return v, zombies, nil
     890             : }
     891             : 
     892             : // Apply applies the delta b to the current version to produce a new
     893             : // version. The new version is consistent with respect to the comparer cmp.
     894             : //
     895             : // curr may be nil, which is equivalent to a pointer to a zero version.
     896             : //
     897             : // On success, if a non-nil zombies map is provided to Apply, the map is updated
     898             : // with file numbers and files sizes of deleted files. These files are
     899             : // considered zombies because they are no longer referenced by the returned
     900             : // Version, but cannot be deleted from disk as they are still in use by the
     901             : // incoming Version.
     902             : func (b *BulkVersionEdit) Apply(
     903             :         curr *Version,
     904             :         cmp Compare,
     905             :         formatKey base.FormatKey,
     906             :         flushSplitBytes int64,
     907             :         readCompactionRate int64,
     908             :         zombies map[base.DiskFileNum]uint64,
     909           1 : ) (*Version, error) {
     910           1 :         addZombie := func(state *FileBacking) {
     911           1 :                 if zombies != nil {
     912           1 :                         zombies[state.DiskFileNum] = state.Size
     913           1 :                 }
     914             :         }
     915           1 :         removeZombie := func(state *FileBacking) {
     916           1 :                 if zombies != nil {
     917           1 :                         delete(zombies, state.DiskFileNum)
     918           1 :                 }
     919             :         }
     920             : 
     921           1 :         v := new(Version)
     922           1 : 
     923           1 :         // Adjust the count of files marked for compaction.
     924           1 :         if curr != nil {
     925           1 :                 v.Stats.MarkedForCompaction = curr.Stats.MarkedForCompaction
     926           1 :         }
     927           1 :         v.Stats.MarkedForCompaction += b.MarkedForCompactionCountDiff
     928           1 :         if v.Stats.MarkedForCompaction < 0 {
     929           0 :                 return nil, base.CorruptionErrorf("pebble: version marked for compaction count negative")
     930           0 :         }
     931             : 
     932           1 :         for level := range v.Levels {
     933           1 :                 if curr == nil || curr.Levels[level].tree.root == nil {
     934           1 :                         v.Levels[level] = makeLevelMetadata(cmp, level, nil /* files */)
     935           1 :                 } else {
     936           1 :                         v.Levels[level] = curr.Levels[level].clone()
     937           1 :                 }
     938           1 :                 if curr == nil || curr.RangeKeyLevels[level].tree.root == nil {
     939           1 :                         v.RangeKeyLevels[level] = makeLevelMetadata(cmp, level, nil /* files */)
     940           1 :                 } else {
     941           1 :                         v.RangeKeyLevels[level] = curr.RangeKeyLevels[level].clone()
     942           1 :                 }
     943             : 
     944           1 :                 if len(b.Added[level]) == 0 && len(b.Deleted[level]) == 0 {
     945           1 :                         // There are no edits on this level.
     946           1 :                         if level == 0 {
     947           1 :                                 // Initialize L0Sublevels.
     948           1 :                                 if curr == nil || curr.L0Sublevels == nil {
     949           1 :                                         if err := v.InitL0Sublevels(cmp, formatKey, flushSplitBytes); err != nil {
     950           0 :                                                 return nil, errors.Wrap(err, "pebble: internal error")
     951           0 :                                         }
     952           1 :                                 } else {
     953           1 :                                         v.L0Sublevels = curr.L0Sublevels
     954           1 :                                         v.L0SublevelFiles = v.L0Sublevels.Levels
     955           1 :                                 }
     956             :                         }
     957           1 :                         continue
     958             :                 }
     959             : 
     960             :                 // Some edits on this level.
     961           1 :                 lm := &v.Levels[level]
     962           1 :                 lmRange := &v.RangeKeyLevels[level]
     963           1 : 
     964           1 :                 addedFilesMap := b.Added[level]
     965           1 :                 deletedFilesMap := b.Deleted[level]
     966           1 :                 if n := v.Levels[level].Len() + len(addedFilesMap); n == 0 {
     967           0 :                         return nil, base.CorruptionErrorf(
     968           0 :                                 "pebble: internal error: No current or added files but have deleted files: %d",
     969           0 :                                 errors.Safe(len(deletedFilesMap)))
     970           0 :                 }
     971             : 
     972             :                 // NB: addedFilesMap may be empty. If a file is present in addedFilesMap
     973             :                 // for a level, it won't be present in deletedFilesMap for the same
     974             :                 // level.
     975             : 
     976           1 :                 for _, f := range deletedFilesMap {
     977           1 :                         if obsolete := v.Levels[level].remove(f); obsolete {
     978           0 :                                 // Deleting a file from the B-Tree may decrement its
     979           0 :                                 // reference count. However, because we cloned the
     980           0 :                                 // previous level's B-Tree, this should never result in a
     981           0 :                                 // file's reference count dropping to zero.
     982           0 :                                 err := errors.Errorf("pebble: internal error: file L%d.%s obsolete during B-Tree removal", level, f.FileNum)
     983           0 :                                 return nil, err
     984           0 :                         }
     985           1 :                         if f.HasRangeKeys {
     986           1 :                                 if obsolete := v.RangeKeyLevels[level].remove(f); obsolete {
     987           0 :                                         // Deleting a file from the B-Tree may decrement its
     988           0 :                                         // reference count. However, because we cloned the
     989           0 :                                         // previous level's B-Tree, this should never result in a
     990           0 :                                         // file's reference count dropping to zero.
     991           0 :                                         err := errors.Errorf("pebble: internal error: file L%d.%s obsolete during range-key B-Tree removal", level, f.FileNum)
     992           0 :                                         return nil, err
     993           0 :                                 }
     994             :                         }
     995             : 
     996             :                         // Note that a backing sst will only become a zombie if the
     997             :                         // references to it in the latest version is 0. We will remove the
     998             :                         // backing sst from the zombie list in the next loop if one of the
     999             :                         // addedFiles in any of the levels is referencing the backing sst.
    1000             :                         // This is possible if a physical sstable is virtualized, or if it
    1001             :                         // is moved.
    1002           1 :                         latestRefCount := f.LatestRefs()
    1003           1 :                         if latestRefCount <= 0 {
    1004           0 :                                 // If a file is present in deletedFilesMap for a level, then it
    1005           0 :                                 // must have already been added to the level previously, which
    1006           0 :                                 // means that its latest ref count cannot be 0.
    1007           0 :                                 err := errors.Errorf("pebble: internal error: incorrect latestRefs reference counting for file", f.FileNum)
    1008           0 :                                 return nil, err
    1009           1 :                         } else if f.LatestUnref() == 0 {
    1010           1 :                                 addZombie(f.FileBacking)
    1011           1 :                         }
    1012             :                 }
    1013             : 
    1014           1 :                 addedFiles := make([]*FileMetadata, 0, len(addedFilesMap))
    1015           1 :                 for _, f := range addedFilesMap {
    1016           1 :                         addedFiles = append(addedFiles, f)
    1017           1 :                 }
    1018             :                 // Sort addedFiles by file number. This isn't necessary, but tests which
    1019             :                 // replay invalid manifests check the error output, and the error output
    1020             :                 // depends on the order in which files are added to the btree.
    1021           1 :                 sort.Slice(addedFiles, func(i, j int) bool {
    1022           1 :                         return addedFiles[i].FileNum < addedFiles[j].FileNum
    1023           1 :                 })
    1024             : 
    1025           1 :                 var sm, la *FileMetadata
    1026           1 :                 for _, f := range addedFiles {
    1027           1 :                         // NB: allowedSeeks is used for read triggered compactions. It is set using
    1028           1 :                         // Options.Experimental.ReadCompactionRate which defaults to 32KB.
    1029           1 :                         var allowedSeeks int64
    1030           1 :                         if readCompactionRate != 0 {
    1031           1 :                                 allowedSeeks = int64(f.Size) / readCompactionRate
    1032           1 :                         }
    1033           1 :                         if allowedSeeks < 100 {
    1034           1 :                                 allowedSeeks = 100
    1035           1 :                         }
    1036           1 :                         f.AllowedSeeks.Store(allowedSeeks)
    1037           1 :                         f.InitAllowedSeeks = allowedSeeks
    1038           1 : 
    1039           1 :                         err := lm.insert(f)
    1040           1 :                         // We're adding this file to the new version, so increment the
    1041           1 :                         // latest refs count.
    1042           1 :                         f.LatestRef()
    1043           1 :                         if err != nil {
    1044           0 :                                 return nil, errors.Wrap(err, "pebble")
    1045           0 :                         }
    1046           1 :                         if f.HasRangeKeys {
    1047           1 :                                 err = lmRange.insert(f)
    1048           1 :                                 if err != nil {
    1049           0 :                                         return nil, errors.Wrap(err, "pebble")
    1050           0 :                                 }
    1051             :                         }
    1052           1 :                         removeZombie(f.FileBacking)
    1053           1 :                         // Track the keys with the smallest and largest keys, so that we can
    1054           1 :                         // check consistency of the modified span.
    1055           1 :                         if sm == nil || base.InternalCompare(cmp, sm.Smallest, f.Smallest) > 0 {
    1056           1 :                                 sm = f
    1057           1 :                         }
    1058           1 :                         if la == nil || base.InternalCompare(cmp, la.Largest, f.Largest) < 0 {
    1059           1 :                                 la = f
    1060           1 :                         }
    1061             :                 }
    1062             : 
    1063           1 :                 if level == 0 {
    1064           1 :                         if curr != nil && curr.L0Sublevels != nil && len(deletedFilesMap) == 0 {
    1065           1 :                                 // Flushes and ingestions that do not delete any L0 files do not require
    1066           1 :                                 // a regeneration of L0Sublevels from scratch. We can instead generate
    1067           1 :                                 // it incrementally.
    1068           1 :                                 var err error
    1069           1 :                                 // AddL0Files requires addedFiles to be sorted in seqnum order.
    1070           1 :                                 SortBySeqNum(addedFiles)
    1071           1 :                                 v.L0Sublevels, err = curr.L0Sublevels.AddL0Files(addedFiles, flushSplitBytes, &v.Levels[0])
    1072           1 :                                 if errors.Is(err, errInvalidL0SublevelsOpt) {
    1073           1 :                                         err = v.InitL0Sublevels(cmp, formatKey, flushSplitBytes)
    1074           1 :                                 } else if invariants.Enabled && err == nil {
    1075           1 :                                         copyOfSublevels, err := NewL0Sublevels(&v.Levels[0], cmp, formatKey, flushSplitBytes)
    1076           1 :                                         if err != nil {
    1077           0 :                                                 panic(fmt.Sprintf("error when regenerating sublevels: %s", err))
    1078             :                                         }
    1079           1 :                                         s1 := describeSublevels(base.DefaultFormatter, false /* verbose */, copyOfSublevels.Levels)
    1080           1 :                                         s2 := describeSublevels(base.DefaultFormatter, false /* verbose */, v.L0Sublevels.Levels)
    1081           1 :                                         if s1 != s2 {
    1082           0 :                                                 panic(fmt.Sprintf("incremental L0 sublevel generation produced different output than regeneration: %s != %s", s1, s2))
    1083             :                                         }
    1084             :                                 }
    1085           1 :                                 if err != nil {
    1086           0 :                                         return nil, errors.Wrap(err, "pebble: internal error")
    1087           0 :                                 }
    1088           1 :                                 v.L0SublevelFiles = v.L0Sublevels.Levels
    1089           1 :                         } else if err := v.InitL0Sublevels(cmp, formatKey, flushSplitBytes); err != nil {
    1090           0 :                                 return nil, errors.Wrap(err, "pebble: internal error")
    1091           0 :                         }
    1092           1 :                         if err := CheckOrdering(cmp, formatKey, Level(0), v.Levels[level].Iter()); err != nil {
    1093           0 :                                 return nil, errors.Wrap(err, "pebble: internal error")
    1094           0 :                         }
    1095           1 :                         continue
    1096             :                 }
    1097             : 
    1098             :                 // Check consistency of the level in the vicinity of our edits.
    1099           1 :                 if sm != nil && la != nil {
    1100           1 :                         overlap := overlaps(v.Levels[level].Iter(), cmp, sm.Smallest.UserKey,
    1101           1 :                                 la.Largest.UserKey, la.Largest.IsExclusiveSentinel())
    1102           1 :                         // overlap contains all of the added files. We want to ensure that
    1103           1 :                         // the added files are consistent with neighboring existing files
    1104           1 :                         // too, so reslice the overlap to pull in a neighbor on each side.
    1105           1 :                         check := overlap.Reslice(func(start, end *LevelIterator) {
    1106           1 :                                 if m := start.Prev(); m == nil {
    1107           1 :                                         start.Next()
    1108           1 :                                 }
    1109           1 :                                 if m := end.Next(); m == nil {
    1110           1 :                                         end.Prev()
    1111           1 :                                 }
    1112             :                         })
    1113           1 :                         if err := CheckOrdering(cmp, formatKey, Level(level), check.Iter()); err != nil {
    1114           0 :                                 return nil, errors.Wrap(err, "pebble: internal error")
    1115           0 :                         }
    1116             :                 }
    1117             :         }
    1118           1 :         return v, nil
    1119             : }

Generated by: LCOV version 1.14