LCOV - code coverage report
Current view: top level - pebble/internal/manifest - version_edit.go (source / functions)
Test: tests + meta.lcov (2024-03-19 08:15Z, 13bbeea1)
Date: 2024-03-19 08:17:12
Coverage:   Lines: 593 hit / 758 total (78.2 %)   Functions: 0 hit / 0 total (-)

          Line data    Source code
       1             : // Copyright 2012 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package manifest
       6             : 
       7             : import (
       8             :         "bufio"
       9             :         "bytes"
      10             :         stdcmp "cmp"
      11             :         "encoding/binary"
      12             :         "fmt"
      13             :         "io"
      14             :         "slices"
      15             :         "strings"
      16             :         "time"
      17             : 
      18             :         "github.com/cockroachdb/errors"
      19             :         "github.com/cockroachdb/pebble/internal/base"
      20             :         "github.com/cockroachdb/pebble/internal/invariants"
      21             :         "github.com/cockroachdb/pebble/sstable"
      22             : )
      23             : 
      24             : // TODO(peter): describe the MANIFEST file format, independently of the C++
      25             : // project.
      26             : 
      27             : var errCorruptManifest = base.CorruptionErrorf("pebble: corrupt manifest")
      28             : 
      29             : type byteReader interface {
      30             :         io.ByteReader
      31             :         io.Reader
      32             : }
      33             : 
      34             : // Tags for the versionEdit disk format.
      35             : // Tag 8 is no longer used.
      36             : const (
      37             :         // LevelDB tags.
      38             :         tagComparator     = 1
      39             :         tagLogNumber      = 2
      40             :         tagNextFileNumber = 3
      41             :         tagLastSequence   = 4
      42             :         tagCompactPointer = 5
      43             :         tagDeletedFile    = 6
      44             :         tagNewFile        = 7
      45             :         tagPrevLogNumber  = 9
      46             : 
      47             :         // RocksDB tags.
      48             :         tagNewFile2         = 100
      49             :         tagNewFile3         = 102
      50             :         tagNewFile4         = 103
      51             :         tagColumnFamily     = 200
      52             :         tagColumnFamilyAdd  = 201
      53             :         tagColumnFamilyDrop = 202
      54             :         tagMaxColumnFamily  = 203
      55             : 
      56             :         // Pebble tags.
      57             :         tagNewFile5            = 104 // Range keys.
      58             :         tagCreatedBackingTable = 105
      59             :         tagRemovedBackingTable = 106
      60             : 
      61             :         // The custom tags sub-format used by tagNewFile4 and above. All tags less
      62             :         // than customTagNonSafeIgnoreMask are safe to ignore and their format must be
      63             :         // a single bytes field.
      64             :         customTagTerminate         = 1
      65             :         customTagNeedsCompaction   = 2
      66             :         customTagCreationTime      = 6
      67             :         customTagPathID            = 65
      68             :         customTagNonSafeIgnoreMask = 1 << 6
      69             :         customTagVirtual           = 66
      70             :         customTagSyntheticPrefix   = 67
      71             :         customTagSyntheticSuffix   = 68
      72             : )
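
// customTagSafeToIgnore is an illustrative helper (hypothetical, not used
// elsewhere in this package) that captures the rule stated above: an unknown
// custom tag may be skipped only when it is below customTagNonSafeIgnoreMask,
// in which case its payload is a single length-prefixed bytes field (see the
// default case in Decode).
func customTagSafeToIgnore(customTag uint64) bool {
        return customTag&customTagNonSafeIgnoreMask == 0
}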
      73             : 
      74             : // DeletedFileEntry holds the state for a file deletion from a level. The file
      75             : // itself might still be referenced by another level.
      76             : type DeletedFileEntry struct {
      77             :         Level   int
      78             :         FileNum base.FileNum
      79             : }
      80             : 
      81             : // NewFileEntry holds the state for a new file or one moved from a different
      82             : // level.
      83             : type NewFileEntry struct {
      84             :         Level int
      85             :         Meta  *FileMetadata
      86             :         // BackingFileNum is only set during manifest replay, and only for virtual
      87             :         // sstables.
      88             :         BackingFileNum base.DiskFileNum
      89             : }
      90             : 
      91             : // VersionEdit holds the state for an edit to a Version along with other
      92             : // on-disk state (log numbers, next file number, and the last sequence number).
      93             : type VersionEdit struct {
      94             :         // ComparerName is the value of Options.Comparer.Name. This is only set in
      95             :         // the first VersionEdit in a manifest (either when the DB is created, or
      96             :         // when a new manifest is created) and is used to verify that the comparer
      97             :         // specified at Open matches the comparer that was previously used.
      98             :         ComparerName string
      99             : 
     100             :         // MinUnflushedLogNum is the smallest WAL log file number corresponding to
     101             :         // mutations that have not been flushed to an sstable.
     102             :         //
       103             :         // This is an optional field; a value of 0 means it is not set.
     104             :         MinUnflushedLogNum base.DiskFileNum
     105             : 
     106             :         // ObsoletePrevLogNum is a historic artifact from LevelDB that is not used by
     107             :         // Pebble, RocksDB, or even LevelDB. Its use in LevelDB was deprecated in
     108             :         // 6/2011. We keep it around purely for informational purposes when
     109             :         // displaying MANIFEST contents.
     110             :         ObsoletePrevLogNum uint64
     111             : 
     112             :         // The next file number. A single counter is used to assign file numbers
     113             :         // for the WAL, MANIFEST, sstable, and OPTIONS files.
     114             :         NextFileNum uint64
     115             : 
     116             :         // LastSeqNum is an upper bound on the sequence numbers that have been
     117             :         // assigned in flushed WALs. Unflushed WALs (that will be replayed during
     118             :         // recovery) may contain sequence numbers greater than this value.
     119             :         LastSeqNum uint64
     120             : 
     121             :         // A file num may be present in both deleted files and new files when it
     122             :         // is moved from a lower level to a higher level (when the compaction
     123             :         // found that there was no overlapping file at the higher level).
     124             :         DeletedFiles map[DeletedFileEntry]*FileMetadata
     125             :         NewFiles     []NewFileEntry
     126             :         // CreatedBackingTables can be used to preserve the FileBacking associated
     127             :         // with a physical sstable. This is useful when virtual sstables in the
     128             :         // latest version are reconstructed during manifest replay, and we also need
     129             :         // to reconstruct the FileBacking which is required by these virtual
     130             :         // sstables.
     131             :         //
     132             :         // INVARIANT: The FileBacking associated with a physical sstable must only
     133             :         // be added as a backing file in the same version edit where the physical
     134             :         // sstable is first virtualized. This means that the physical sstable must
     135             :         // be present in DeletedFiles and that there must be at least one virtual
     136             :         // sstable with the same FileBacking as the physical sstable in NewFiles. A
     137             :         // file must be present in CreatedBackingTables in exactly one version edit.
     138             :         // The physical sstable associated with the FileBacking must also not be
     139             :         // present in NewFiles.
     140             :         CreatedBackingTables []*FileBacking
     141             :         // RemovedBackingTables is used to remove the FileBacking associated with a
     142             :         // virtual sstable. Note that a backing sstable can be removed as soon as
     143             :         // there are no virtual sstables in the latest version which are using the
     144             :         // backing sstable, but the backing sstable doesn't necessarily have to be
     145             :         // removed atomically with the version edit which removes the last virtual
     146             :         // sstable associated with the backing sstable. The removal can happen in a
     147             :         // future version edit.
     148             :         //
     149             :         // INVARIANT: A file must only be added to RemovedBackingTables if it was
      150             :         // added to CreatedBackingTables in a prior version edit. The same version
      151             :         // edit also cannot have the same file present in both CreatedBackingTables
     152             :         // and RemovedBackingTables. A file must be present in RemovedBackingTables
     153             :         // in exactly one version edit.
     154             :         RemovedBackingTables []base.DiskFileNum
     155             : }
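
// A minimal sketch of the "move" case described in the DeletedFiles comment
// above: the same file number appears in both DeletedFiles and NewFiles when
// a table is moved between levels. Here meta stands in for the table's
// existing *FileMetadata (the surrounding edit is hypothetical):
//
//      ve := VersionEdit{
//              DeletedFiles: map[DeletedFileEntry]*FileMetadata{
//                      {Level: 4, FileNum: meta.FileNum}: meta,
//              },
//              NewFiles: []NewFileEntry{{Level: 5, Meta: meta}},
//      }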
     156             : 
     157             : // Decode decodes an edit from the specified reader.
     158             : //
      159             : // Note that Decode does not set the FileBacking for virtual sstables; that
      160             : // responsibility is left to the caller. Decode does, however, populate
      161             : // NewFileEntry.BackingFileNum in VersionEdit.NewFiles.
     162           2 : func (v *VersionEdit) Decode(r io.Reader) error {
     163           2 :         br, ok := r.(byteReader)
     164           2 :         if !ok {
     165           2 :                 br = bufio.NewReader(r)
     166           2 :         }
     167           2 :         d := versionEditDecoder{br}
     168           2 :         for {
     169           2 :                 tag, err := binary.ReadUvarint(br)
     170           2 :                 if err == io.EOF {
     171           2 :                         break
     172             :                 }
     173           2 :                 if err != nil {
     174           0 :                         return err
     175           0 :                 }
     176           2 :                 switch tag {
     177           2 :                 case tagComparator:
     178           2 :                         s, err := d.readBytes()
     179           2 :                         if err != nil {
     180           0 :                                 return err
     181           0 :                         }
     182           2 :                         v.ComparerName = string(s)
     183             : 
     184           2 :                 case tagLogNumber:
     185           2 :                         n, err := d.readUvarint()
     186           2 :                         if err != nil {
     187           0 :                                 return err
     188           0 :                         }
     189           2 :                         v.MinUnflushedLogNum = base.DiskFileNum(n)
     190             : 
     191           2 :                 case tagNextFileNumber:
     192           2 :                         n, err := d.readUvarint()
     193           2 :                         if err != nil {
     194           0 :                                 return err
     195           0 :                         }
     196           2 :                         v.NextFileNum = n
     197             : 
     198           2 :                 case tagLastSequence:
     199           2 :                         n, err := d.readUvarint()
     200           2 :                         if err != nil {
     201           0 :                                 return err
     202           0 :                         }
     203           2 :                         v.LastSeqNum = n
     204             : 
     205           0 :                 case tagCompactPointer:
     206           0 :                         if _, err := d.readLevel(); err != nil {
     207           0 :                                 return err
     208           0 :                         }
     209           0 :                         if _, err := d.readBytes(); err != nil {
     210           0 :                                 return err
     211           0 :                         }
     212             :                         // NB: RocksDB does not use compaction pointers anymore.
     213             : 
     214           2 :                 case tagRemovedBackingTable:
     215           2 :                         n, err := d.readUvarint()
     216           2 :                         if err != nil {
     217           0 :                                 return err
     218           0 :                         }
     219           2 :                         v.RemovedBackingTables = append(
     220           2 :                                 v.RemovedBackingTables, base.DiskFileNum(n),
     221           2 :                         )
     222           2 :                 case tagCreatedBackingTable:
     223           2 :                         dfn, err := d.readUvarint()
     224           2 :                         if err != nil {
     225           0 :                                 return err
     226           0 :                         }
     227           2 :                         size, err := d.readUvarint()
     228           2 :                         if err != nil {
     229           0 :                                 return err
     230           0 :                         }
     231           2 :                         fileBacking := &FileBacking{
     232           2 :                                 DiskFileNum: base.DiskFileNum(dfn),
     233           2 :                                 Size:        size,
     234           2 :                         }
     235           2 :                         v.CreatedBackingTables = append(v.CreatedBackingTables, fileBacking)
     236           2 :                 case tagDeletedFile:
     237           2 :                         level, err := d.readLevel()
     238           2 :                         if err != nil {
     239           0 :                                 return err
     240           0 :                         }
     241           2 :                         fileNum, err := d.readFileNum()
     242           2 :                         if err != nil {
     243           0 :                                 return err
     244           0 :                         }
     245           2 :                         if v.DeletedFiles == nil {
     246           2 :                                 v.DeletedFiles = make(map[DeletedFileEntry]*FileMetadata)
     247           2 :                         }
     248           2 :                         v.DeletedFiles[DeletedFileEntry{level, fileNum}] = nil
     249             : 
     250           2 :                 case tagNewFile, tagNewFile2, tagNewFile3, tagNewFile4, tagNewFile5:
     251           2 :                         level, err := d.readLevel()
     252           2 :                         if err != nil {
     253           0 :                                 return err
     254           0 :                         }
     255           2 :                         fileNum, err := d.readFileNum()
     256           2 :                         if err != nil {
     257           0 :                                 return err
     258           0 :                         }
     259           2 :                         if tag == tagNewFile3 {
     260           0 :                                 // The pathID field appears unused in RocksDB.
     261           0 :                                 _ /* pathID */, err := d.readUvarint()
     262           0 :                                 if err != nil {
     263           0 :                                         return err
     264           0 :                                 }
     265             :                         }
     266           2 :                         size, err := d.readUvarint()
     267           2 :                         if err != nil {
     268           0 :                                 return err
     269           0 :                         }
     270             :                         // We read the smallest / largest key bounds differently depending on
     271             :                         // whether we have point, range or both types of keys present in the
     272             :                         // table.
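                        // For example (illustrative): a table holding both point and range keys
                        // whose overall smallest bound comes from a point key and whose overall
                        // largest bound comes from a range key would carry a marker with
                        // maskContainsPointKeys and maskSmallest set and maskLargest unset; the
                        // bounds-reconstruction logic below follows these bits.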
     273           2 :                         var (
     274           2 :                                 smallestPointKey, largestPointKey []byte
     275           2 :                                 smallestRangeKey, largestRangeKey []byte
     276           2 :                                 parsedPointBounds                 bool
     277           2 :                                 boundsMarker                      byte
     278           2 :                         )
     279           2 :                         if tag != tagNewFile5 {
     280           2 :                                 // Range keys not present in the table. Parse the point key bounds.
     281           2 :                                 smallestPointKey, err = d.readBytes()
     282           2 :                                 if err != nil {
     283           0 :                                         return err
     284           0 :                                 }
     285           2 :                                 largestPointKey, err = d.readBytes()
     286           2 :                                 if err != nil {
     287           0 :                                         return err
     288           0 :                                 }
     289           2 :                         } else {
     290           2 :                                 // Range keys are present in the table. Determine whether we have point
     291           2 :                                 // keys to parse, in addition to the bounds.
     292           2 :                                 boundsMarker, err = d.ReadByte()
     293           2 :                                 if err != nil {
     294           0 :                                         return err
     295           0 :                                 }
     296             :                                 // Parse point key bounds, if present.
     297           2 :                                 if boundsMarker&maskContainsPointKeys > 0 {
     298           2 :                                         smallestPointKey, err = d.readBytes()
     299           2 :                                         if err != nil {
     300           0 :                                                 return err
     301           0 :                                         }
     302           2 :                                         largestPointKey, err = d.readBytes()
     303           2 :                                         if err != nil {
     304           0 :                                                 return err
     305           0 :                                         }
     306           2 :                                         parsedPointBounds = true
     307           2 :                                 } else {
     308           2 :                                         // The table does not have point keys.
     309           2 :                                         // Sanity check: the bounds must be range keys.
     310           2 :                                         if boundsMarker&maskSmallest != 0 || boundsMarker&maskLargest != 0 {
     311           0 :                                                 return base.CorruptionErrorf(
     312           0 :                                                         "new-file-4-range-keys: table without point keys has point key bounds: marker=%x",
     313           0 :                                                         boundsMarker,
     314           0 :                                                 )
     315           0 :                                         }
     316             :                                 }
     317             :                                 // Parse range key bounds.
     318           2 :                                 smallestRangeKey, err = d.readBytes()
     319           2 :                                 if err != nil {
     320           0 :                                         return err
     321           0 :                                 }
     322           2 :                                 largestRangeKey, err = d.readBytes()
     323           2 :                                 if err != nil {
     324           0 :                                         return err
     325           0 :                                 }
     326             :                         }
     327           2 :                         var smallestSeqNum uint64
     328           2 :                         var largestSeqNum uint64
     329           2 :                         if tag != tagNewFile {
     330           2 :                                 smallestSeqNum, err = d.readUvarint()
     331           2 :                                 if err != nil {
     332           0 :                                         return err
     333           0 :                                 }
     334           2 :                                 largestSeqNum, err = d.readUvarint()
     335           2 :                                 if err != nil {
     336           0 :                                         return err
     337           0 :                                 }
     338             :                         }
     339           2 :                         var markedForCompaction bool
     340           2 :                         var creationTime uint64
     341           2 :                         virtualState := struct {
     342           2 :                                 virtual        bool
     343           2 :                                 backingFileNum uint64
     344           2 :                         }{}
     345           2 :                         var syntheticPrefix sstable.SyntheticPrefix
     346           2 :                         var syntheticSuffix sstable.SyntheticSuffix
     347           2 :                         if tag == tagNewFile4 || tag == tagNewFile5 {
     348           2 :                                 for {
     349           2 :                                         customTag, err := d.readUvarint()
     350           2 :                                         if err != nil {
     351           0 :                                                 return err
     352           0 :                                         }
     353           2 :                                         if customTag == customTagTerminate {
     354           2 :                                                 break
     355             :                                         }
     356           2 :                                         switch customTag {
     357           1 :                                         case customTagNeedsCompaction:
     358           1 :                                                 field, err := d.readBytes()
     359           1 :                                                 if err != nil {
     360           0 :                                                         return err
     361           0 :                                                 }
     362           1 :                                                 if len(field) != 1 {
     363           0 :                                                         return base.CorruptionErrorf("new-file4: need-compaction field wrong size")
     364           0 :                                                 }
     365           1 :                                                 markedForCompaction = (field[0] == 1)
     366             : 
     367           2 :                                         case customTagCreationTime:
     368           2 :                                                 field, err := d.readBytes()
     369           2 :                                                 if err != nil {
     370           0 :                                                         return err
     371           0 :                                                 }
     372           2 :                                                 var n int
     373           2 :                                                 creationTime, n = binary.Uvarint(field)
     374           2 :                                                 if n != len(field) {
     375           0 :                                                         return base.CorruptionErrorf("new-file4: invalid file creation time")
     376           0 :                                                 }
     377             : 
     378           0 :                                         case customTagPathID:
     379           0 :                                                 return base.CorruptionErrorf("new-file4: path-id field not supported")
     380             : 
     381           2 :                                         case customTagVirtual:
     382           2 :                                                 virtualState.virtual = true
     383           2 :                                                 if virtualState.backingFileNum, err = d.readUvarint(); err != nil {
     384           0 :                                                         return err
     385           0 :                                                 }
     386             : 
     387           2 :                                         case customTagSyntheticPrefix:
     388           2 :                                                 synthetic, err := d.readBytes()
     389           2 :                                                 if err != nil {
     390           0 :                                                         return err
     391           0 :                                                 }
     392           2 :                                                 syntheticPrefix = synthetic
     393             : 
     394           2 :                                         case customTagSyntheticSuffix:
     395           2 :                                                 if syntheticSuffix, err = d.readBytes(); err != nil {
     396           0 :                                                         return err
     397           0 :                                                 }
     398             : 
     399           0 :                                         default:
     400           0 :                                                 if (customTag & customTagNonSafeIgnoreMask) != 0 {
     401           0 :                                                         return base.CorruptionErrorf("new-file4: custom field not supported: %d", customTag)
     402           0 :                                                 }
     403           0 :                                                 if _, err := d.readBytes(); err != nil {
     404           0 :                                                         return err
     405           0 :                                                 }
     406             :                                         }
     407             :                                 }
     408             :                         }
     409           2 :                         m := &FileMetadata{
     410           2 :                                 FileNum:             fileNum,
     411           2 :                                 Size:                size,
     412           2 :                                 CreationTime:        int64(creationTime),
     413           2 :                                 SmallestSeqNum:      smallestSeqNum,
     414           2 :                                 LargestSeqNum:       largestSeqNum,
     415           2 :                                 MarkedForCompaction: markedForCompaction,
     416           2 :                                 Virtual:             virtualState.virtual,
     417           2 :                                 SyntheticPrefix:     syntheticPrefix,
     418           2 :                                 SyntheticSuffix:     syntheticSuffix,
     419           2 :                         }
     420           2 :                         if tag != tagNewFile5 { // no range keys present
     421           2 :                                 m.SmallestPointKey = base.DecodeInternalKey(smallestPointKey)
     422           2 :                                 m.LargestPointKey = base.DecodeInternalKey(largestPointKey)
     423           2 :                                 m.HasPointKeys = true
     424           2 :                                 m.Smallest, m.Largest = m.SmallestPointKey, m.LargestPointKey
     425           2 :                                 m.boundTypeSmallest, m.boundTypeLargest = boundTypePointKey, boundTypePointKey
     426           2 :                         } else { // range keys present
     427           2 :                                 // Set point key bounds, if parsed.
     428           2 :                                 if parsedPointBounds {
     429           2 :                                         m.SmallestPointKey = base.DecodeInternalKey(smallestPointKey)
     430           2 :                                         m.LargestPointKey = base.DecodeInternalKey(largestPointKey)
     431           2 :                                         m.HasPointKeys = true
     432           2 :                                 }
     433             :                                 // Set range key bounds.
     434           2 :                                 m.SmallestRangeKey = base.DecodeInternalKey(smallestRangeKey)
     435           2 :                                 m.LargestRangeKey = base.DecodeInternalKey(largestRangeKey)
     436           2 :                                 m.HasRangeKeys = true
     437           2 :                                 // Set overall bounds (by default assume range keys).
     438           2 :                                 m.Smallest, m.Largest = m.SmallestRangeKey, m.LargestRangeKey
     439           2 :                                 m.boundTypeSmallest, m.boundTypeLargest = boundTypeRangeKey, boundTypeRangeKey
     440           2 :                                 if boundsMarker&maskSmallest == maskSmallest {
     441           2 :                                         m.Smallest = m.SmallestPointKey
     442           2 :                                         m.boundTypeSmallest = boundTypePointKey
     443           2 :                                 }
     444           2 :                                 if boundsMarker&maskLargest == maskLargest {
     445           2 :                                         m.Largest = m.LargestPointKey
     446           2 :                                         m.boundTypeLargest = boundTypePointKey
     447           2 :                                 }
     448             :                         }
     449           2 :                         m.boundsSet = true
     450           2 :                         if !virtualState.virtual {
     451           2 :                                 m.InitPhysicalBacking()
     452           2 :                         }
     453             : 
     454           2 :                         nfe := NewFileEntry{
     455           2 :                                 Level: level,
     456           2 :                                 Meta:  m,
     457           2 :                         }
     458           2 :                         if virtualState.virtual {
     459           2 :                                 nfe.BackingFileNum = base.DiskFileNum(virtualState.backingFileNum)
     460           2 :                         }
     461           2 :                         v.NewFiles = append(v.NewFiles, nfe)
     462             : 
     463           1 :                 case tagPrevLogNumber:
     464           1 :                         n, err := d.readUvarint()
     465           1 :                         if err != nil {
     466           0 :                                 return err
     467           0 :                         }
     468           1 :                         v.ObsoletePrevLogNum = n
     469             : 
     470           0 :                 case tagColumnFamily, tagColumnFamilyAdd, tagColumnFamilyDrop, tagMaxColumnFamily:
     471           0 :                         return base.CorruptionErrorf("column families are not supported")
     472             : 
     473           0 :                 default:
     474           0 :                         return errCorruptManifest
     475             :                 }
     476             :         }
     477           2 :         return nil
     478             : }
     479             : 
     480           1 : func (v *VersionEdit) string(verbose bool, fmtKey base.FormatKey) string {
     481           1 :         var buf bytes.Buffer
     482           1 :         if v.ComparerName != "" {
      483           1 :                 fmt.Fprintf(&buf, "  comparer:     %s\n", v.ComparerName)
     484           1 :         }
     485           1 :         if v.MinUnflushedLogNum != 0 {
     486           1 :                 fmt.Fprintf(&buf, "  log-num:       %d\n", v.MinUnflushedLogNum)
     487           1 :         }
     488           1 :         if v.ObsoletePrevLogNum != 0 {
     489           0 :                 fmt.Fprintf(&buf, "  prev-log-num:  %d\n", v.ObsoletePrevLogNum)
     490           0 :         }
     491           1 :         if v.NextFileNum != 0 {
     492           1 :                 fmt.Fprintf(&buf, "  next-file-num: %d\n", v.NextFileNum)
     493           1 :         }
     494           1 :         if v.LastSeqNum != 0 {
     495           1 :                 fmt.Fprintf(&buf, "  last-seq-num:  %d\n", v.LastSeqNum)
     496           1 :         }
     497           1 :         entries := make([]DeletedFileEntry, 0, len(v.DeletedFiles))
     498           1 :         for df := range v.DeletedFiles {
     499           1 :                 entries = append(entries, df)
     500           1 :         }
     501           1 :         slices.SortFunc(entries, func(a, b DeletedFileEntry) int {
     502           1 :                 if v := stdcmp.Compare(a.Level, b.Level); v != 0 {
     503           1 :                         return v
     504           1 :                 }
     505           1 :                 return stdcmp.Compare(a.FileNum, b.FileNum)
     506             :         })
     507           1 :         for _, df := range entries {
     508           1 :                 fmt.Fprintf(&buf, "  del-table:     L%d %s\n", df.Level, df.FileNum)
     509           1 :         }
     510           1 :         for _, nf := range v.NewFiles {
     511           1 :                 fmt.Fprintf(&buf, "  add-table:     L%d", nf.Level)
     512           1 :                 fmt.Fprintf(&buf, " %s", nf.Meta.DebugString(fmtKey, verbose))
     513           1 :                 if nf.Meta.CreationTime != 0 {
     514           1 :                         fmt.Fprintf(&buf, " (%s)",
     515           1 :                                 time.Unix(nf.Meta.CreationTime, 0).UTC().Format(time.RFC3339))
     516           1 :                 }
     517           1 :                 fmt.Fprintln(&buf)
     518             :         }
     519             : 
     520           1 :         for _, b := range v.CreatedBackingTables {
     521           1 :                 fmt.Fprintf(&buf, "  add-backing:   %s\n", b.DiskFileNum)
     522           1 :         }
     523           1 :         for _, n := range v.RemovedBackingTables {
     524           1 :                 fmt.Fprintf(&buf, "  del-backing:   %s\n", n)
     525           1 :         }
     526           1 :         return buf.String()
     527             : }
     528             : 
     529             : // DebugString is a more verbose version of String(). Use this in tests.
     530           1 : func (v *VersionEdit) DebugString(fmtKey base.FormatKey) string {
     531           1 :         return v.string(true /* verbose */, fmtKey)
     532           1 : }
     533             : 
     534             : // String implements fmt.Stringer for a VersionEdit.
     535           1 : func (v *VersionEdit) String() string {
     536           1 :         return v.string(false /* verbose */, base.DefaultFormatter)
     537           1 : }
     538             : 
     539             : // ParseVersionEditDebug parses a VersionEdit from its DebugString
     540             : // implementation.
     541             : //
     542             : // It doesn't recognize all fields; this implementation can be filled in as
     543             : // needed.
     544           1 : func ParseVersionEditDebug(s string) (_ *VersionEdit, err error) {
     545           1 :         defer func() {
     546           1 :                 err = errors.CombineErrors(err, maybeRecover())
     547           1 :         }()
     548             : 
     549           1 :         var ve VersionEdit
     550           1 :         for _, l := range strings.Split(s, "\n") {
     551           1 :                 l = strings.TrimSpace(l)
     552           1 :                 if l == "" {
     553           1 :                         continue
     554             :                 }
     555           1 :                 field, value, ok := strings.Cut(l, ":")
     556           1 :                 if !ok {
     557           0 :                         return nil, errors.Errorf("malformed line %q", l)
     558           0 :                 }
     559           1 :                 field = strings.TrimSpace(field)
     560           1 :                 p := makeDebugParser(value)
     561           1 :                 switch field {
     562           1 :                 case "add-table":
     563           1 :                         level := p.Level()
     564           1 :                         meta, err := ParseFileMetadataDebug(p.Remaining())
     565           1 :                         if err != nil {
     566           0 :                                 return nil, err
     567           0 :                         }
     568           1 :                         ve.NewFiles = append(ve.NewFiles, NewFileEntry{
     569           1 :                                 Level: level,
     570           1 :                                 Meta:  meta,
     571           1 :                         })
     572             : 
     573           1 :                 case "del-table":
     574           1 :                         level := p.Level()
     575           1 :                         num := p.FileNum()
     576           1 :                         if ve.DeletedFiles == nil {
     577           1 :                                 ve.DeletedFiles = make(map[DeletedFileEntry]*FileMetadata)
     578           1 :                         }
     579           1 :                         ve.DeletedFiles[DeletedFileEntry{
     580           1 :                                 Level:   level,
     581           1 :                                 FileNum: num,
     582           1 :                         }] = nil
     583             : 
     584           1 :                 case "add-backing":
     585           1 :                         n := p.DiskFileNum()
     586           1 :                         ve.CreatedBackingTables = append(ve.CreatedBackingTables, &FileBacking{
     587           1 :                                 DiskFileNum: n,
     588           1 :                                 Size:        100,
     589           1 :                         })
     590             : 
     591           1 :                 case "del-backing":
     592           1 :                         n := p.DiskFileNum()
     593           1 :                         ve.RemovedBackingTables = append(ve.RemovedBackingTables, n)
     594             : 
     595           0 :                 default:
     596           0 :                         return nil, errors.Errorf("field %q not implemented", field)
     597             :                 }
     598             :         }
     599           1 :         return &ve, nil
     600             : }
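
// An illustrative sketch: for edits that contain only the fields
// ParseVersionEditDebug recognizes (add-table, del-table, add-backing,
// del-backing), the function acts as a rough inverse of DebugString.
//
//      s := ve.DebugString(base.DefaultFormatter)
//      parsed, err := ParseVersionEditDebug(s)
//      if err != nil {
//              return err
//      }
//      // parsed holds the same table and backing entries as ve (backing
//      // sizes are stubbed to 100 by the parser).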
     601             : 
     602             : // Encode encodes an edit to the specified writer.
     603           2 : func (v *VersionEdit) Encode(w io.Writer) error {
     604           2 :         e := versionEditEncoder{new(bytes.Buffer)}
     605           2 : 
     606           2 :         if v.ComparerName != "" {
     607           2 :                 e.writeUvarint(tagComparator)
     608           2 :                 e.writeString(v.ComparerName)
     609           2 :         }
     610           2 :         if v.MinUnflushedLogNum != 0 {
     611           2 :                 e.writeUvarint(tagLogNumber)
     612           2 :                 e.writeUvarint(uint64(v.MinUnflushedLogNum))
     613           2 :         }
     614           2 :         if v.ObsoletePrevLogNum != 0 {
     615           1 :                 e.writeUvarint(tagPrevLogNumber)
     616           1 :                 e.writeUvarint(v.ObsoletePrevLogNum)
     617           1 :         }
     618           2 :         if v.NextFileNum != 0 {
     619           2 :                 e.writeUvarint(tagNextFileNumber)
     620           2 :                 e.writeUvarint(uint64(v.NextFileNum))
     621           2 :         }
     622           2 :         for _, dfn := range v.RemovedBackingTables {
     623           2 :                 e.writeUvarint(tagRemovedBackingTable)
     624           2 :                 e.writeUvarint(uint64(dfn))
     625           2 :         }
     626           2 :         for _, fileBacking := range v.CreatedBackingTables {
     627           2 :                 e.writeUvarint(tagCreatedBackingTable)
     628           2 :                 e.writeUvarint(uint64(fileBacking.DiskFileNum))
     629           2 :                 e.writeUvarint(fileBacking.Size)
     630           2 :         }
      631             :         // RocksDB requires LastSeqNum to be encoded for the first MANIFEST entry,
      632             :         // even though its value is zero. We approximate "first entry" by encoding
      633             :         // LastSeqNum whenever ComparerName is set, as only the first edit sets it.
     634           2 :         if v.LastSeqNum != 0 || v.ComparerName != "" {
     635           2 :                 e.writeUvarint(tagLastSequence)
     636           2 :                 e.writeUvarint(v.LastSeqNum)
     637           2 :         }
     638           2 :         for x := range v.DeletedFiles {
     639           2 :                 e.writeUvarint(tagDeletedFile)
     640           2 :                 e.writeUvarint(uint64(x.Level))
     641           2 :                 e.writeUvarint(uint64(x.FileNum))
     642           2 :         }
     643           2 :         for _, x := range v.NewFiles {
     644           2 :                 customFields := x.Meta.MarkedForCompaction || x.Meta.CreationTime != 0 || x.Meta.Virtual
     645           2 :                 var tag uint64
     646           2 :                 switch {
     647           2 :                 case x.Meta.HasRangeKeys:
     648           2 :                         tag = tagNewFile5
     649           2 :                 case customFields:
     650           2 :                         tag = tagNewFile4
     651           1 :                 default:
     652           1 :                         tag = tagNewFile2
     653             :                 }
     654           2 :                 e.writeUvarint(tag)
     655           2 :                 e.writeUvarint(uint64(x.Level))
     656           2 :                 e.writeUvarint(uint64(x.Meta.FileNum))
     657           2 :                 e.writeUvarint(x.Meta.Size)
     658           2 :                 if !x.Meta.HasRangeKeys {
     659           2 :                         // If we have no range keys, preserve the original format and write the
     660           2 :                         // smallest and largest point keys.
     661           2 :                         e.writeKey(x.Meta.SmallestPointKey)
     662           2 :                         e.writeKey(x.Meta.LargestPointKey)
     663           2 :                 } else {
     664           2 :                         // When range keys are present, we first write a marker byte that
     665           2 :                         // indicates if the table also contains point keys, in addition to how the
     666           2 :                         // overall bounds for the table should be reconstructed. This byte is
     667           2 :                         // followed by the keys themselves.
     668           2 :                         b, err := x.Meta.boundsMarker()
     669           2 :                         if err != nil {
     670           0 :                                 return err
     671           0 :                         }
     672           2 :                         if err = e.WriteByte(b); err != nil {
     673           0 :                                 return err
     674           0 :                         }
     675             :                         // Write point key bounds (if present).
     676           2 :                         if x.Meta.HasPointKeys {
     677           2 :                                 e.writeKey(x.Meta.SmallestPointKey)
     678           2 :                                 e.writeKey(x.Meta.LargestPointKey)
     679           2 :                         }
     680             :                         // Write range key bounds.
     681           2 :                         e.writeKey(x.Meta.SmallestRangeKey)
     682           2 :                         e.writeKey(x.Meta.LargestRangeKey)
     683             :                 }
     684           2 :                 e.writeUvarint(x.Meta.SmallestSeqNum)
     685           2 :                 e.writeUvarint(x.Meta.LargestSeqNum)
     686           2 :                 if customFields {
     687           2 :                         if x.Meta.CreationTime != 0 {
     688           2 :                                 e.writeUvarint(customTagCreationTime)
     689           2 :                                 var buf [binary.MaxVarintLen64]byte
     690           2 :                                 n := binary.PutUvarint(buf[:], uint64(x.Meta.CreationTime))
     691           2 :                                 e.writeBytes(buf[:n])
     692           2 :                         }
     693           2 :                         if x.Meta.MarkedForCompaction {
     694           1 :                                 e.writeUvarint(customTagNeedsCompaction)
     695           1 :                                 e.writeBytes([]byte{1})
     696           1 :                         }
     697           2 :                         if x.Meta.Virtual {
     698           2 :                                 e.writeUvarint(customTagVirtual)
     699           2 :                                 e.writeUvarint(uint64(x.Meta.FileBacking.DiskFileNum))
     700           2 :                         }
     701           2 :                         if x.Meta.SyntheticPrefix != nil {
     702           2 :                                 e.writeUvarint(customTagSyntheticPrefix)
     703           2 :                                 e.writeBytes(x.Meta.SyntheticPrefix)
     704           2 :                         }
     705           2 :                         if x.Meta.SyntheticSuffix != nil {
     706           2 :                                 e.writeUvarint(customTagSyntheticSuffix)
     707           2 :                                 e.writeBytes(x.Meta.SyntheticSuffix)
     708           2 :                         }
     709           2 :                         e.writeUvarint(customTagTerminate)
     710             :                 }
     711             :         }
     712           2 :         _, err := w.Write(e.Bytes())
     713           2 :         return err
     714             : }
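
// encodeDecodeRoundTrip is a hypothetical sketch (not used elsewhere in this
// package) of how Encode and Decode pair up: an edit serialized with Encode
// can be reconstructed by Decode from the same bytes. The field values are
// arbitrary, and "example-comparer" is a placeholder name.
func encodeDecodeRoundTrip() error {
        ve := VersionEdit{
                ComparerName:       "example-comparer",
                MinUnflushedLogNum: base.DiskFileNum(7),
                NextFileNum:        10,
                LastSeqNum:         42,
        }
        var buf bytes.Buffer
        if err := ve.Encode(&buf); err != nil {
                return err
        }
        var decoded VersionEdit
        if err := decoded.Decode(&buf); err != nil {
                return err
        }
        // decoded now carries the same comparer name, log number, next file
        // number, and last sequence number as ve.
        return nil
}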
     715             : 
     716             : // versionEditDecoder should be used to decode version edits.
     717             : type versionEditDecoder struct {
     718             :         byteReader
     719             : }
     720             : 
     721           2 : func (d versionEditDecoder) readBytes() ([]byte, error) {
     722           2 :         n, err := d.readUvarint()
     723           2 :         if err != nil {
     724           0 :                 return nil, err
     725           0 :         }
     726           2 :         s := make([]byte, n)
     727           2 :         _, err = io.ReadFull(d, s)
     728           2 :         if err != nil {
     729           0 :                 if err == io.ErrUnexpectedEOF {
     730           0 :                         return nil, errCorruptManifest
     731           0 :                 }
     732           0 :                 return nil, err
     733             :         }
     734           2 :         return s, nil
     735             : }
     736             : 
     737           2 : func (d versionEditDecoder) readLevel() (int, error) {
     738           2 :         u, err := d.readUvarint()
     739           2 :         if err != nil {
     740           0 :                 return 0, err
     741           0 :         }
     742           2 :         if u >= NumLevels {
     743           0 :                 return 0, errCorruptManifest
     744           0 :         }
     745           2 :         return int(u), nil
     746             : }
     747             : 
     748           2 : func (d versionEditDecoder) readFileNum() (base.FileNum, error) {
     749           2 :         u, err := d.readUvarint()
     750           2 :         if err != nil {
     751           0 :                 return 0, err
     752           0 :         }
     753           2 :         return base.FileNum(u), nil
     754             : }
     755             : 
     756           2 : func (d versionEditDecoder) readUvarint() (uint64, error) {
     757           2 :         u, err := binary.ReadUvarint(d)
     758           2 :         if err != nil {
     759           0 :                 if err == io.EOF {
     760           0 :                         return 0, errCorruptManifest
     761           0 :                 }
     762           0 :                 return 0, err
     763             :         }
     764           2 :         return u, nil
     765             : }
     766             : 
     767             : type versionEditEncoder struct {
     768             :         *bytes.Buffer
     769             : }
     770             : 
     771           2 : func (e versionEditEncoder) writeBytes(p []byte) {
     772           2 :         e.writeUvarint(uint64(len(p)))
     773           2 :         e.Write(p)
     774           2 : }
     775             : 
     776           2 : func (e versionEditEncoder) writeKey(k InternalKey) {
     777           2 :         e.writeUvarint(uint64(k.Size()))
     778           2 :         e.Write(k.UserKey)
     779           2 :         buf := k.EncodeTrailer()
     780           2 :         e.Write(buf[:])
     781           2 : }
     782             : 
     783           2 : func (e versionEditEncoder) writeString(s string) {
     784           2 :         e.writeUvarint(uint64(len(s)))
     785           2 :         e.WriteString(s)
     786           2 : }
     787             : 
     788           2 : func (e versionEditEncoder) writeUvarint(u uint64) {
     789           2 :         var buf [binary.MaxVarintLen64]byte
     790           2 :         n := binary.PutUvarint(buf[:], u)
     791           2 :         e.Write(buf[:n])
     792           2 : }
     793             : 
     794             : // BulkVersionEdit summarizes the files added and deleted from a set of version
     795             : // edits.
     796             : //
     797             : // INVARIANTS:
     798             : // No file can be added to a level more than once. This is true globally, and
     799             : // also true for all of the calls to Accumulate for a single bulk version edit.
     800             : //
     801             : // No file can be removed from a level more than once. This is true globally,
     802             : // and also true for all of the calls to Accumulate for a single bulk version
     803             : // edit.
     804             : //
     805             : // A file must not be added and removed from a given level in the same version
     806             : // edit.
     807             : //
     808             : // A file that is being removed from a level must have been added to that level
     809             : // before (in a prior version edit). Note that a given file can be deleted from
      810             : // a level and added to another level in a single version edit.
     811             : type BulkVersionEdit struct {
     812             :         Added   [NumLevels]map[base.FileNum]*FileMetadata
     813             :         Deleted [NumLevels]map[base.FileNum]*FileMetadata
     814             : 
     815             :         // AddedFileBacking is a map to support lookup so that we can populate the
     816             :         // FileBacking of virtual sstables during manifest replay.
     817             :         AddedFileBacking   map[base.DiskFileNum]*FileBacking
     818             :         RemovedFileBacking []base.DiskFileNum
     819             : 
     820             :         // AddedByFileNum maps file number to file metadata for all added files
     821             :         // from accumulated version edits. AddedByFileNum is only populated if set
     822             :         // to non-nil by a caller. It must be set to non-nil when replaying
     823             :         // version edits read from a MANIFEST (as opposed to VersionEdits
     824             :         // constructed in-memory).  While replaying a MANIFEST file,
     825             :         // VersionEdit.DeletedFiles map entries have nil values, because the
     826             :         // on-disk deletion record encodes only the file number. Accumulate
     827             :         // uses AddedByFileNum to correctly populate the BulkVersionEdit's Deleted
     828             :         // field with non-nil *FileMetadata.
     829             :         AddedByFileNum map[base.FileNum]*FileMetadata
     830             : 
     831             :         // MarkedForCompactionCountDiff holds the aggregated count of files
     832             :         // marked for compaction added or removed.
     833             :         MarkedForCompactionCountDiff int
     834             : }
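
As a rough orientation for the fields above (a sketch, not code from this package): when replaying a MANIFEST, AddedByFileNum must be initialized before accumulating the decoded edits, because the on-disk deletion records carry only file numbers. The decoding step is elided, and accumulateReplayedEdits is a hypothetical helper assumed to sit alongside this package's types.

        // accumulateReplayedEdits sketches the replay-side setup. The map must
        // be non-nil so Accumulate can resolve DeletedFiles entries whose
        // metadata is nil on disk.
        func accumulateReplayedEdits(edits []*VersionEdit) (*BulkVersionEdit, error) {
                bve := &BulkVersionEdit{
                        AddedByFileNum: make(map[base.FileNum]*FileMetadata),
                }
                for _, ve := range edits {
                        if err := bve.Accumulate(ve); err != nil {
                                return nil, err
                        }
                }
                return bve, nil
        }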
     835             : 
     836             : // Accumulate adds the file additions and deletions in the specified version
     837             : // edit to the bulk edit's internal state.
     838             : //
     839             : // INVARIANTS:
     840             : // If a file is added to a given level in a call to Accumulate and then removed
     841             : // from that level in a subsequent call, the file will not be present in the
     842             : // resulting BulkVersionEdit.Deleted for that level.
     843             : //
     844             : // After accumulation of version edits, the bulk version edit may have
     845             : // information about a file which has been deleted from a level, but it may
     846             : // not have information about the same file added to the same level. The add
     847             : // could've occurred as part of a previous bulk version edit. In this case,
     848             : // the deleted file must be present in BulkVersionEdit.Deleted, at the end
     849             : // of the accumulation, because we need to decrease the refcount of the
     850             : // deleted file in Apply.
     851           2 : func (b *BulkVersionEdit) Accumulate(ve *VersionEdit) error {
     852           2 :         for df, m := range ve.DeletedFiles {
     853           2 :                 dmap := b.Deleted[df.Level]
     854           2 :                 if dmap == nil {
     855           2 :                         dmap = make(map[base.FileNum]*FileMetadata)
     856           2 :                         b.Deleted[df.Level] = dmap
     857           2 :                 }
     858             : 
     859           2 :                 if m == nil {
     860           2 :                         // m is nil only when replaying a MANIFEST.
     861           2 :                         if b.AddedByFileNum == nil {
     862           0 :                                 return errors.Errorf("deleted file L%d.%s's metadata is absent and bve.AddedByFileNum is nil", df.Level, df.FileNum)
     863           0 :                         }
     864           2 :                         m = b.AddedByFileNum[df.FileNum]
     865           2 :                         if m == nil {
     866           1 :                                 return base.CorruptionErrorf("pebble: file deleted L%d.%s before it was inserted", df.Level, df.FileNum)
     867           1 :                         }
     868             :                 }
     869           2 :                 if m.MarkedForCompaction {
     870           1 :                         b.MarkedForCompactionCountDiff--
     871           1 :                 }
     872           2 :                 if _, ok := b.Added[df.Level][df.FileNum]; !ok {
     873           2 :                         dmap[df.FileNum] = m
     874           2 :                 } else {
     875           2 :                         // Present in b.Added for the same level.
     876           2 :                         delete(b.Added[df.Level], df.FileNum)
     877           2 :                 }
     878             :         }
     879             : 
     880             :         // Generate state for Added backing files. Note that these must be generated
     881             :         // before we loop through the NewFiles, because we need to populate the
     882             :         // FileBackings which might be used by the NewFiles loop.
     883           2 :         if b.AddedFileBacking == nil {
     884           2 :                 b.AddedFileBacking = make(map[base.DiskFileNum]*FileBacking)
     885           2 :         }
     886           2 :         for _, fb := range ve.CreatedBackingTables {
     887           2 :                 if _, ok := b.AddedFileBacking[fb.DiskFileNum]; ok {
     888           0 :                         // There is already a FileBacking associated with fb.DiskFileNum.
     889           0 :                         // This should never happen. There must always be only one FileBacking
     890           0 :                         // associated with a backing sstable.
     891           0 :                         panic(fmt.Sprintf("pebble: duplicate file backing %s", fb.DiskFileNum.String()))
     892             :                 }
     893           2 :                 b.AddedFileBacking[fb.DiskFileNum] = fb
     894             :         }
     895             : 
     896           2 :         for _, nf := range ve.NewFiles {
     897           2 :                 // A new file should not have been deleted in this or a preceding
     898           2 :                 // VersionEdit at the same level (though files can move across levels).
     899           2 :                 if dmap := b.Deleted[nf.Level]; dmap != nil {
     900           2 :                         if _, ok := dmap[nf.Meta.FileNum]; ok {
     901           0 :                                 return base.CorruptionErrorf("pebble: file deleted L%d.%s before it was inserted", nf.Level, nf.Meta.FileNum)
     902           0 :                         }
     903             :                 }
     904           2 :                 if nf.Meta.Virtual && nf.Meta.FileBacking == nil {
     905           2 :                         // FileBacking for a virtual sstable must only be nil if we're performing
     906           2 :                         // manifest replay.
     907           2 :                         nf.Meta.FileBacking = b.AddedFileBacking[nf.BackingFileNum]
     908           2 :                         if nf.Meta.FileBacking == nil {
     909           0 :                                 return errors.Errorf("FileBacking for virtual sstable must not be nil")
     910           0 :                         }
     911           2 :                 } else if nf.Meta.FileBacking == nil {
     912           0 :                         return errors.Errorf("Added file L%d.%s has no FileBacking", nf.Level, nf.Meta.FileNum)
     913           0 :                 }
     914             : 
     915           2 :                 if b.Added[nf.Level] == nil {
     916           2 :                         b.Added[nf.Level] = make(map[base.FileNum]*FileMetadata)
     917           2 :                 }
     918           2 :                 b.Added[nf.Level][nf.Meta.FileNum] = nf.Meta
     919           2 :                 if b.AddedByFileNum != nil {
     920           2 :                         b.AddedByFileNum[nf.Meta.FileNum] = nf.Meta
     921           2 :                 }
     922           2 :                 if nf.Meta.MarkedForCompaction {
     923           0 :                         b.MarkedForCompactionCountDiff++
     924           0 :                 }
     925             :         }
     926             : 
     927           2 :         for _, n := range ve.RemovedBackingTables {
     928           2 :                 if _, ok := b.AddedFileBacking[n]; ok {
     929           2 :                         delete(b.AddedFileBacking, n)
     930           2 :                 } else {
     931           2 :                         // Since a file can be removed from backing files in exactly one version
     932           2 :                         // edit, it is safe to just append without any de-duplication.
     933           2 :                         b.RemovedFileBacking = append(b.RemovedFileBacking, n)
     934           2 :                 }
     935             :         }
     936             : 
     937           2 :         return nil
     938             : }
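
To make the "move" case above concrete, the sketch below accumulates a single edit that deletes a table from L4 and re-adds it to L5. It assumes the DeletedFileEntry and NewFileEntry types declared earlier in this file (with the Level, FileNum, and Meta fields used above); the metadata is only minimally populated, which is enough for Accumulate since it merely checks that FileBacking is non-nil for non-virtual files.

        // exampleMoveAccumulation is illustrative only.
        func exampleMoveAccumulation() (*BulkVersionEdit, error) {
                m := &FileMetadata{
                        FileNum:     base.FileNum(7),
                        Size:        64 << 10,
                        FileBacking: &FileBacking{}, // zero-valued backing; Accumulate only checks non-nil
                }
                ve := &VersionEdit{
                        DeletedFiles: map[DeletedFileEntry]*FileMetadata{
                                {Level: 4, FileNum: m.FileNum}: m,
                        },
                        NewFiles: []NewFileEntry{{Level: 5, Meta: m}},
                }
                var bve BulkVersionEdit
                if err := bve.Accumulate(ve); err != nil {
                        return nil, err
                }
                // bve.Deleted[4][7] and bve.Added[5][7] now both reference m.
                return &bve, nil
        }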
     939             : 
     940             : // Apply applies the delta b to the current version to produce a new version.
     941             : // The new version is consistent with respect to the comparer.
     942             : //
     943             : // Apply updates the backing refcounts (Ref/Unref) as files are installed into
     944             : // the levels.
     945             : //
     946             : // curr may be nil, which is equivalent to a pointer to a zero version.
     947             : func (b *BulkVersionEdit) Apply(
     948             :         curr *Version, comparer *base.Comparer, flushSplitBytes int64, readCompactionRate int64,
     949           2 : ) (*Version, error) {
     950           2 :         v := &Version{
     951           2 :                 cmp: comparer,
     952           2 :         }
     953           2 : 
     954           2 :         // Adjust the count of files marked for compaction.
     955           2 :         if curr != nil {
     956           2 :                 v.Stats.MarkedForCompaction = curr.Stats.MarkedForCompaction
     957           2 :         }
     958           2 :         v.Stats.MarkedForCompaction += b.MarkedForCompactionCountDiff
     959           2 :         if v.Stats.MarkedForCompaction < 0 {
     960           0 :                 return nil, base.CorruptionErrorf("pebble: version marked for compaction count negative")
     961           0 :         }
     962             : 
     963           2 :         for level := range v.Levels {
     964           2 :                 if curr == nil || curr.Levels[level].tree.root == nil {
     965           2 :                         v.Levels[level] = makeLevelMetadata(comparer.Compare, level, nil /* files */)
     966           2 :                 } else {
     967           2 :                         v.Levels[level] = curr.Levels[level].clone()
     968           2 :                 }
     969           2 :                 if curr == nil || curr.RangeKeyLevels[level].tree.root == nil {
     970           2 :                         v.RangeKeyLevels[level] = makeLevelMetadata(comparer.Compare, level, nil /* files */)
     971           2 :                 } else {
     972           2 :                         v.RangeKeyLevels[level] = curr.RangeKeyLevels[level].clone()
     973           2 :                 }
     974             : 
     975           2 :                 if len(b.Added[level]) == 0 && len(b.Deleted[level]) == 0 {
     976           2 :                         // There are no edits on this level.
     977           2 :                         if level == 0 {
     978           2 :                                 // Initialize L0Sublevels.
     979           2 :                                 if curr == nil || curr.L0Sublevels == nil {
     980           2 :                                         if err := v.InitL0Sublevels(flushSplitBytes); err != nil {
     981           0 :                                                 return nil, errors.Wrap(err, "pebble: internal error")
     982           0 :                                         }
     983           2 :                                 } else {
     984           2 :                                         v.L0Sublevels = curr.L0Sublevels
     985           2 :                                         v.L0SublevelFiles = v.L0Sublevels.Levels
     986           2 :                                 }
     987             :                         }
     988           2 :                         continue
     989             :                 }
     990             : 
     991             :                 // Some edits on this level.
     992           2 :                 lm := &v.Levels[level]
     993           2 :                 lmRange := &v.RangeKeyLevels[level]
     994           2 : 
     995           2 :                 addedFilesMap := b.Added[level]
     996           2 :                 deletedFilesMap := b.Deleted[level]
     997           2 :                 if n := v.Levels[level].Len() + len(addedFilesMap); n == 0 {
     998           1 :                         return nil, base.CorruptionErrorf(
     999           1 :                                 "pebble: internal error: No current or added files but have deleted files: %d",
    1000           1 :                                 errors.Safe(len(deletedFilesMap)))
    1001           1 :                 }
    1002             : 
    1003             :                 // NB: addedFilesMap may be empty. If a file is present in addedFilesMap
    1004             :                 // for a level, it won't be present in deletedFilesMap for the same
    1005             :                 // level.
    1006             : 
    1007           2 :                 for _, f := range deletedFilesMap {
    1008           2 :                         if obsolete := v.Levels[level].remove(f); obsolete {
    1009           0 :                                 // Deleting a file from the B-Tree may decrement its
    1010           0 :                                 // reference count. However, because we cloned the
    1011           0 :                                 // previous level's B-Tree, this should never result in a
    1012           0 :                                 // file's reference count dropping to zero.
    1013           0 :                                 err := errors.Errorf("pebble: internal error: file L%d.%s obsolete during B-Tree removal", level, f.FileNum)
    1014           0 :                                 return nil, err
    1015           0 :                         }
    1016           2 :                         if f.HasRangeKeys {
    1017           2 :                                 if obsolete := v.RangeKeyLevels[level].remove(f); obsolete {
    1018           0 :                                         // Deleting a file from the B-Tree may decrement its
    1019           0 :                                         // reference count. However, because we cloned the
    1020           0 :                                         // previous level's B-Tree, this should never result in a
    1021           0 :                                         // file's reference count dropping to zero.
    1022           0 :                                         err := errors.Errorf("pebble: internal error: file L%d.%s obsolete during range-key B-Tree removal", level, f.FileNum)
    1023           0 :                                         return nil, err
    1024           0 :                                 }
    1025             :                         }
    1026             :                 }
    1027             : 
    1028           2 :                 addedFiles := make([]*FileMetadata, 0, len(addedFilesMap))
    1029           2 :                 for _, f := range addedFilesMap {
    1030           2 :                         addedFiles = append(addedFiles, f)
    1031           2 :                 }
    1032             :                 // Sort addedFiles by file number. This isn't necessary, but tests which
    1033             :                 // replay invalid manifests check the error output, and the error output
    1034             :                 // depends on the order in which files are added to the btree.
    1035           2 :                 slices.SortFunc(addedFiles, func(a, b *FileMetadata) int {
    1036           2 :                         return stdcmp.Compare(a.FileNum, b.FileNum)
    1037           2 :                 })
    1038             : 
    1039           2 :                 var sm, la *FileMetadata
    1040           2 :                 for _, f := range addedFiles {
    1041           2 :                         // NB: allowedSeeks is used for read-triggered compactions. It is set using
    1042           2 :                         // Options.Experimental.ReadCompactionRate, which defaults to 32KB.
    1043           2 :                         var allowedSeeks int64
    1044           2 :                         if readCompactionRate != 0 {
    1045           2 :                                 allowedSeeks = int64(f.Size) / readCompactionRate
    1046           2 :                         }
    1047           2 :                         if allowedSeeks < 100 {
    1048           2 :                                 allowedSeeks = 100
    1049           2 :                         }
    1050           2 :                         f.AllowedSeeks.Store(allowedSeeks)
    1051           2 :                         f.InitAllowedSeeks = allowedSeeks
    1052           2 : 
    1053           2 :                         err := lm.insert(f)
    1054           2 :                         if err != nil {
    1055           1 :                                 return nil, errors.Wrap(err, "pebble")
    1056           1 :                         }
    1057           2 :                         if f.HasRangeKeys {
    1058           2 :                                 err = lmRange.insert(f)
    1059           2 :                                 if err != nil {
    1060           0 :                                         return nil, errors.Wrap(err, "pebble")
    1061           0 :                                 }
    1062             :                         }
    1063             :                         // Track the files with the smallest and largest keys, so that we can
    1064             :                         // check consistency of the modified span.
    1065           2 :                         if sm == nil || base.InternalCompare(comparer.Compare, sm.Smallest, f.Smallest) > 0 {
    1066           2 :                                 sm = f
    1067           2 :                         }
    1068           2 :                         if la == nil || base.InternalCompare(comparer.Compare, la.Largest, f.Largest) < 0 {
    1069           2 :                                 la = f
    1070           2 :                         }
    1071             :                 }
    1072             : 
    1073           2 :                 if level == 0 {
    1074           2 :                         if curr != nil && curr.L0Sublevels != nil && len(deletedFilesMap) == 0 {
    1075           2 :                                 // Flushes and ingestions that do not delete any L0 files do not require
    1076           2 :                                 // a regeneration of L0Sublevels from scratch. We can instead generate
    1077           2 :                                 // it incrementally.
    1078           2 :                                 var err error
    1079           2 :                                 // AddL0Files requires addedFiles to be sorted in seqnum order.
    1080           2 :                                 SortBySeqNum(addedFiles)
    1081           2 :                                 v.L0Sublevels, err = curr.L0Sublevels.AddL0Files(addedFiles, flushSplitBytes, &v.Levels[0])
    1082           2 :                                 if errors.Is(err, errInvalidL0SublevelsOpt) {
    1083           2 :                                         err = v.InitL0Sublevels(flushSplitBytes)
    1084           2 :                                 } else if invariants.Enabled && err == nil {
    1085           2 :                                         copyOfSublevels, err := NewL0Sublevels(&v.Levels[0], comparer.Compare, comparer.FormatKey, flushSplitBytes)
    1086           2 :                                         if err != nil {
    1087           0 :                                                 panic(fmt.Sprintf("error when regenerating sublevels: %s", err))
    1088             :                                         }
    1089           2 :                                         s1 := describeSublevels(comparer.FormatKey, false /* verbose */, copyOfSublevels.Levels)
    1090           2 :                                         s2 := describeSublevels(comparer.FormatKey, false /* verbose */, v.L0Sublevels.Levels)
    1091           2 :                                         if s1 != s2 {
    1092           0 :                                                 // Add verbosity.
    1093           0 :                                                 s1 := describeSublevels(comparer.FormatKey, true /* verbose */, copyOfSublevels.Levels)
    1094           0 :                                                 s2 := describeSublevels(comparer.FormatKey, true /* verbose */, v.L0Sublevels.Levels)
    1095           0 :                                                 panic(fmt.Sprintf("incremental L0 sublevel generation produced different output than regeneration: %s != %s", s1, s2))
    1096             :                                         }
    1097             :                                 }
    1098           2 :                                 if err != nil {
    1099           0 :                                         return nil, errors.Wrap(err, "pebble: internal error")
    1100           0 :                                 }
    1101           2 :                                 v.L0SublevelFiles = v.L0Sublevels.Levels
    1102           2 :                         } else if err := v.InitL0Sublevels(flushSplitBytes); err != nil {
    1103           0 :                                 return nil, errors.Wrap(err, "pebble: internal error")
    1104           0 :                         }
    1105           2 :                         if err := CheckOrdering(comparer.Compare, comparer.FormatKey, Level(0), v.Levels[level].Iter()); err != nil {
    1106           0 :                                 return nil, errors.Wrap(err, "pebble: internal error")
    1107           0 :                         }
    1108           2 :                         continue
    1109             :                 }
    1110             : 
    1111             :                 // Check consistency of the level in the vicinity of our edits.
    1112           2 :                 if sm != nil && la != nil {
    1113           2 :                         overlap := overlaps(v.Levels[level].Iter(), comparer.Compare, sm.Smallest.UserKey,
    1114           2 :                                 la.Largest.UserKey, la.Largest.IsExclusiveSentinel())
    1115           2 :                         // overlap contains all of the added files. We want to ensure that
    1116           2 :                         // the added files are consistent with neighboring existing files
    1117           2 :                         // too, so reslice the overlap to pull in a neighbor on each side.
    1118           2 :                         check := overlap.Reslice(func(start, end *LevelIterator) {
    1119           2 :                                 if m := start.Prev(); m == nil {
    1120           2 :                                         start.Next()
    1121           2 :                                 }
    1122           2 :                                 if m := end.Next(); m == nil {
    1123           2 :                                         end.Prev()
    1124           2 :                                 }
    1125             :                         })
    1126           2 :                         if err := CheckOrdering(comparer.Compare, comparer.FormatKey, Level(level), check.Iter()); err != nil {
    1127           1 :                                 return nil, errors.Wrap(err, "pebble: internal error")
    1128           1 :                         }
    1129             :                 }
    1130             :         }
    1131           2 :         return v, nil
    1132             : }
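
A hypothetical continuation of the sketches above: once all edits have been accumulated, Apply installs them on top of the current version. The flush-split and read-compaction values below are illustrative placeholders chosen for the example, not defaults taken from this package.

        // installAccumulated is a sketch; curr may be nil, in which case Apply
        // starts from an empty version.
        func installAccumulated(
                curr *Version, comparer *base.Comparer, bve *BulkVersionEdit,
        ) (*Version, error) {
                return bve.Apply(curr, comparer, 64<<20 /* flushSplitBytes */, 16<<10 /* readCompactionRate */)
        }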

Generated by: LCOV version 1.14