LCOV - code coverage report
Current view: top level - pebble/internal/manifest - version_edit.go
Test:         2024-07-15 08:16Z 9a4ea4df - meta test only.lcov
Date:         2024-07-15 08:17:14
                          Hit    Total    Coverage
              Lines:      476    760      62.6 %
              Functions:  0      0        -

          Line data    Source code
       1             : // Copyright 2012 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package manifest
       6             : 
       7             : import (
       8             :         "bufio"
       9             :         "bytes"
      10             :         stdcmp "cmp"
      11             :         "encoding/binary"
      12             :         "fmt"
      13             :         "io"
      14             :         "slices"
      15             :         "strings"
      16             :         "time"
      17             : 
      18             :         "github.com/cockroachdb/errors"
      19             :         "github.com/cockroachdb/pebble/internal/base"
      20             :         "github.com/cockroachdb/pebble/internal/invariants"
      21             :         "github.com/cockroachdb/pebble/sstable"
      22             : )
      23             : 
      24             : // TODO(peter): describe the MANIFEST file format, independently of the C++
      25             : // project.
      26             : 
      27             : var errCorruptManifest = base.CorruptionErrorf("pebble: corrupt manifest")
      28             : 
      29             : type byteReader interface {
      30             :         io.ByteReader
      31             :         io.Reader
      32             : }
      33             : 
      34             : // Tags for the versionEdit disk format.
      35             : // Tag 8 is no longer used.
      36             : const (
      37             :         // LevelDB tags.
      38             :         tagComparator     = 1
      39             :         tagLogNumber      = 2
      40             :         tagNextFileNumber = 3
      41             :         tagLastSequence   = 4
      42             :         tagCompactPointer = 5
      43             :         tagDeletedFile    = 6
      44             :         tagNewFile        = 7
      45             :         tagPrevLogNumber  = 9
      46             : 
      47             :         // RocksDB tags.
      48             :         tagNewFile2         = 100
      49             :         tagNewFile3         = 102
      50             :         tagNewFile4         = 103
      51             :         tagColumnFamily     = 200
      52             :         tagColumnFamilyAdd  = 201
      53             :         tagColumnFamilyDrop = 202
      54             :         tagMaxColumnFamily  = 203
      55             : 
      56             :         // Pebble tags.
      57             :         tagNewFile5            = 104 // Range keys.
      58             :         tagCreatedBackingTable = 105
      59             :         tagRemovedBackingTable = 106
      60             : 
      61             :         // The custom tags sub-format used by tagNewFile4 and above. All tags less
      62             :         // than customTagNonSafeIgnoreMask are safe to ignore and their format must be
      63             :         // a single bytes field.
      64             :         customTagTerminate         = 1
      65             :         customTagNeedsCompaction   = 2
      66             :         customTagCreationTime      = 6
      67             :         customTagPathID            = 65
      68             :         customTagNonSafeIgnoreMask = 1 << 6
      69             :         customTagVirtual           = 66
      70             :         customTagSyntheticPrefix   = 67
      71             :         customTagSyntheticSuffix   = 68
      72             : )
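                      : 
                      : // To illustrate the custom-tag sub-format (a sketch based on the encode and
                      : // decode paths below, with hypothetical field values): a tagNewFile4 or
                      : // tagNewFile5 record is followed by a sequence of (customTag, value) pairs
                      : // terminated by customTagTerminate, e.g.
                      : //
                      : //      customTagCreationTime  <len> <uvarint creation time>   (bytes field)
                      : //      customTagVirtual       <uvarint backing file num>      (not a bytes field)
                      : //      customTagTerminate
                      : //
                      : // Safe-to-ignore tags (those below customTagNonSafeIgnoreMask) always carry a
                      : // single length-prefixed bytes value, which is what allows unknown ones to be
                      : // skipped during decoding.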
      73             : 
      74             : // DeletedFileEntry holds the state for a file deletion from a level. The file
      75             : // itself might still be referenced by another level.
      76             : type DeletedFileEntry struct {
      77             :         Level   int
      78             :         FileNum base.FileNum
      79             : }
      80             : 
      81             : // NewFileEntry holds the state for a new file or one moved from a different
      82             : // level.
      83             : type NewFileEntry struct {
      84             :         Level int
      85             :         Meta  *FileMetadata
      86             :         // BackingFileNum is only set during manifest replay, and only for virtual
      87             :         // sstables.
      88             :         BackingFileNum base.DiskFileNum
      89             : }
      90             : 
      91             : // VersionEdit holds the state for an edit to a Version along with other
      92             : // on-disk state (log numbers, next file number, and the last sequence number).
      93             : type VersionEdit struct {
      94             :         // ComparerName is the value of Options.Comparer.Name. This is only set in
      95             :         // the first VersionEdit in a manifest (either when the DB is created, or
      96             :         // when a new manifest is created) and is used to verify that the comparer
      97             :         // specified at Open matches the comparer that was previously used.
      98             :         ComparerName string
      99             : 
     100             :         // MinUnflushedLogNum is the smallest WAL log file number corresponding to
     101             :         // mutations that have not been flushed to an sstable.
     102             :         //
      103             :         // This is an optional field; a value of 0 means it is not set.
     104             :         MinUnflushedLogNum base.DiskFileNum
     105             : 
     106             :         // ObsoletePrevLogNum is a historic artifact from LevelDB that is not used by
     107             :         // Pebble, RocksDB, or even LevelDB. Its use in LevelDB was deprecated in
     108             :         // 6/2011. We keep it around purely for informational purposes when
     109             :         // displaying MANIFEST contents.
     110             :         ObsoletePrevLogNum uint64
     111             : 
     112             :         // The next file number. A single counter is used to assign file numbers
     113             :         // for the WAL, MANIFEST, sstable, and OPTIONS files.
     114             :         NextFileNum uint64
     115             : 
     116             :         // LastSeqNum is an upper bound on the sequence numbers that have been
     117             :         // assigned in flushed WALs. Unflushed WALs (that will be replayed during
     118             :         // recovery) may contain sequence numbers greater than this value.
     119             :         LastSeqNum base.SeqNum
     120             : 
     121             :         // A file num may be present in both deleted files and new files when it
     122             :         // is moved from a lower level to a higher level (when the compaction
     123             :         // found that there was no overlapping file at the higher level).
     124             :         DeletedFiles map[DeletedFileEntry]*FileMetadata
     125             :         NewFiles     []NewFileEntry
     126             :         // CreatedBackingTables can be used to preserve the FileBacking associated
     127             :         // with a physical sstable. This is useful when virtual sstables in the
     128             :         // latest version are reconstructed during manifest replay, and we also need
     129             :         // to reconstruct the FileBacking which is required by these virtual
     130             :         // sstables.
     131             :         //
     132             :         // INVARIANT: The FileBacking associated with a physical sstable must only
     133             :         // be added as a backing file in the same version edit where the physical
     134             :         // sstable is first virtualized. This means that the physical sstable must
     135             :         // be present in DeletedFiles and that there must be at least one virtual
     136             :         // sstable with the same FileBacking as the physical sstable in NewFiles. A
     137             :         // file must be present in CreatedBackingTables in exactly one version edit.
     138             :         // The physical sstable associated with the FileBacking must also not be
     139             :         // present in NewFiles.
     140             :         CreatedBackingTables []*FileBacking
     141             :         // RemovedBackingTables is used to remove the FileBacking associated with a
     142             :         // virtual sstable. Note that a backing sstable can be removed as soon as
     143             :         // there are no virtual sstables in the latest version which are using the
     144             :         // backing sstable, but the backing sstable doesn't necessarily have to be
     145             :         // removed atomically with the version edit which removes the last virtual
     146             :         // sstable associated with the backing sstable. The removal can happen in a
     147             :         // future version edit.
     148             :         //
     149             :         // INVARIANT: A file must only be added to RemovedBackingTables if it was
      150             :         // added to CreatedBackingTables in a prior version edit. The same version
      151             :         // edit also cannot have the same file present in both CreatedBackingTables
     152             :         // and RemovedBackingTables. A file must be present in RemovedBackingTables
     153             :         // in exactly one version edit.
     154             :         RemovedBackingTables []base.DiskFileNum
     155             : }
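                      : 
                      : // A minimal, illustrative sketch of constructing and serializing an edit
                      : // (the comparer name and numbers are hypothetical; error handling elided):
                      : //
                      : //      var buf bytes.Buffer
                      : //      ve := VersionEdit{
                      : //              ComparerName:       "my-comparer",
                      : //              MinUnflushedLogNum: 3,
                      : //              NextFileNum:        5,
                      : //              LastSeqNum:         100,
                      : //      }
                      : //      if err := ve.Encode(&buf); err != nil {
                      : //              // handle err
                      : //      }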
     156             : 
     157             : // Decode decodes an edit from the specified reader.
     158             : //
      159             : // Note that Decode does not set the FileBacking for virtual sstables; that
      160             : // responsibility is left to the caller. However, Decode does populate
      161             : // NewFileEntry.BackingFileNum in VersionEdit.NewFiles.
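                      : //
                      : // An illustrative sketch of that caller-side step (the backings map is a
                      : // hypothetical caller-maintained lookup table; error handling elided):
                      : //
                      : //      var ve VersionEdit
                      : //      if err := ve.Decode(r); err != nil {
                      : //              return err
                      : //      }
                      : //      for _, nfe := range ve.NewFiles {
                      : //              if nfe.Meta.Virtual {
                      : //                      nfe.Meta.FileBacking = backings[nfe.BackingFileNum]
                      : //              }
                      : //      }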
     162           1 : func (v *VersionEdit) Decode(r io.Reader) error {
     163           1 :         br, ok := r.(byteReader)
     164           1 :         if !ok {
     165           1 :                 br = bufio.NewReader(r)
     166           1 :         }
     167           1 :         d := versionEditDecoder{br}
     168           1 :         for {
     169           1 :                 tag, err := binary.ReadUvarint(br)
     170           1 :                 if err == io.EOF {
     171           1 :                         break
     172             :                 }
     173           1 :                 if err != nil {
     174           0 :                         return err
     175           0 :                 }
     176           1 :                 switch tag {
     177           1 :                 case tagComparator:
     178           1 :                         s, err := d.readBytes()
     179           1 :                         if err != nil {
     180           0 :                                 return err
     181           0 :                         }
     182           1 :                         v.ComparerName = string(s)
     183             : 
     184           1 :                 case tagLogNumber:
     185           1 :                         n, err := d.readUvarint()
     186           1 :                         if err != nil {
     187           0 :                                 return err
     188           0 :                         }
     189           1 :                         v.MinUnflushedLogNum = base.DiskFileNum(n)
     190             : 
     191           1 :                 case tagNextFileNumber:
     192           1 :                         n, err := d.readUvarint()
     193           1 :                         if err != nil {
     194           0 :                                 return err
     195           0 :                         }
     196           1 :                         v.NextFileNum = n
     197             : 
     198           1 :                 case tagLastSequence:
     199           1 :                         n, err := d.readUvarint()
     200           1 :                         if err != nil {
     201           0 :                                 return err
     202           0 :                         }
     203           1 :                         v.LastSeqNum = base.SeqNum(n)
     204             : 
     205           0 :                 case tagCompactPointer:
     206           0 :                         if _, err := d.readLevel(); err != nil {
     207           0 :                                 return err
     208           0 :                         }
     209           0 :                         if _, err := d.readBytes(); err != nil {
     210           0 :                                 return err
     211           0 :                         }
     212             :                         // NB: RocksDB does not use compaction pointers anymore.
     213             : 
     214           1 :                 case tagRemovedBackingTable:
     215           1 :                         n, err := d.readUvarint()
     216           1 :                         if err != nil {
     217           0 :                                 return err
     218           0 :                         }
     219           1 :                         v.RemovedBackingTables = append(
     220           1 :                                 v.RemovedBackingTables, base.DiskFileNum(n),
     221           1 :                         )
     222           1 :                 case tagCreatedBackingTable:
     223           1 :                         dfn, err := d.readUvarint()
     224           1 :                         if err != nil {
     225           0 :                                 return err
     226           0 :                         }
     227           1 :                         size, err := d.readUvarint()
     228           1 :                         if err != nil {
     229           0 :                                 return err
     230           0 :                         }
     231           1 :                         fileBacking := &FileBacking{
     232           1 :                                 DiskFileNum: base.DiskFileNum(dfn),
     233           1 :                                 Size:        size,
     234           1 :                         }
     235           1 :                         v.CreatedBackingTables = append(v.CreatedBackingTables, fileBacking)
     236           1 :                 case tagDeletedFile:
     237           1 :                         level, err := d.readLevel()
     238           1 :                         if err != nil {
     239           0 :                                 return err
     240           0 :                         }
     241           1 :                         fileNum, err := d.readFileNum()
     242           1 :                         if err != nil {
     243           0 :                                 return err
     244           0 :                         }
     245           1 :                         if v.DeletedFiles == nil {
     246           1 :                                 v.DeletedFiles = make(map[DeletedFileEntry]*FileMetadata)
     247           1 :                         }
     248           1 :                         v.DeletedFiles[DeletedFileEntry{level, fileNum}] = nil
     249             : 
     250           1 :                 case tagNewFile, tagNewFile2, tagNewFile3, tagNewFile4, tagNewFile5:
     251           1 :                         level, err := d.readLevel()
     252           1 :                         if err != nil {
     253           0 :                                 return err
     254           0 :                         }
     255           1 :                         fileNum, err := d.readFileNum()
     256           1 :                         if err != nil {
     257           0 :                                 return err
     258           0 :                         }
     259           1 :                         if tag == tagNewFile3 {
     260           0 :                                 // The pathID field appears unused in RocksDB.
     261           0 :                                 _ /* pathID */, err := d.readUvarint()
     262           0 :                                 if err != nil {
     263           0 :                                         return err
     264           0 :                                 }
     265             :                         }
     266           1 :                         size, err := d.readUvarint()
     267           1 :                         if err != nil {
     268           0 :                                 return err
     269           0 :                         }
     270             :                         // We read the smallest / largest key bounds differently depending on
     271             :                         // whether we have point, range or both types of keys present in the
     272             :                         // table.
     273           1 :                         var (
     274           1 :                                 smallestPointKey, largestPointKey []byte
     275           1 :                                 smallestRangeKey, largestRangeKey []byte
     276           1 :                                 parsedPointBounds                 bool
     277           1 :                                 boundsMarker                      byte
     278           1 :                         )
     279           1 :                         if tag != tagNewFile5 {
     280           1 :                                 // Range keys not present in the table. Parse the point key bounds.
     281           1 :                                 smallestPointKey, err = d.readBytes()
     282           1 :                                 if err != nil {
     283           0 :                                         return err
     284           0 :                                 }
     285           1 :                                 largestPointKey, err = d.readBytes()
     286           1 :                                 if err != nil {
     287           0 :                                         return err
     288           0 :                                 }
     289           1 :                         } else {
     290           1 :                                 // Range keys are present in the table. Determine whether we have point
     291           1 :                                 // keys to parse, in addition to the bounds.
     292           1 :                                 boundsMarker, err = d.ReadByte()
     293           1 :                                 if err != nil {
     294           0 :                                         return err
     295           0 :                                 }
     296             :                                 // Parse point key bounds, if present.
     297           1 :                                 if boundsMarker&maskContainsPointKeys > 0 {
     298           1 :                                         smallestPointKey, err = d.readBytes()
     299           1 :                                         if err != nil {
     300           0 :                                                 return err
     301           0 :                                         }
     302           1 :                                         largestPointKey, err = d.readBytes()
     303           1 :                                         if err != nil {
     304           0 :                                                 return err
     305           0 :                                         }
     306           1 :                                         parsedPointBounds = true
     307           1 :                                 } else {
     308           1 :                                         // The table does not have point keys.
     309           1 :                                         // Sanity check: the bounds must be range keys.
     310           1 :                                         if boundsMarker&maskSmallest != 0 || boundsMarker&maskLargest != 0 {
     311           0 :                                                 return base.CorruptionErrorf(
     312           0 :                                                         "new-file-4-range-keys: table without point keys has point key bounds: marker=%x",
     313           0 :                                                         boundsMarker,
     314           0 :                                                 )
     315           0 :                                         }
     316             :                                 }
     317             :                                 // Parse range key bounds.
     318           1 :                                 smallestRangeKey, err = d.readBytes()
     319           1 :                                 if err != nil {
     320           0 :                                         return err
     321           0 :                                 }
     322           1 :                                 largestRangeKey, err = d.readBytes()
     323           1 :                                 if err != nil {
     324           0 :                                         return err
     325           0 :                                 }
     326             :                         }
     327           1 :                         var smallestSeqNum base.SeqNum
     328           1 :                         var largestSeqNum base.SeqNum
     329           1 :                         if tag != tagNewFile {
     330           1 :                                 n, err := d.readUvarint()
     331           1 :                                 if err != nil {
     332           0 :                                         return err
     333           0 :                                 }
     334           1 :                                 smallestSeqNum = base.SeqNum(n)
     335           1 :                                 n, err = d.readUvarint()
     336           1 :                                 if err != nil {
     337           0 :                                         return err
     338           0 :                                 }
     339           1 :                                 largestSeqNum = base.SeqNum(n)
     340             :                         }
     341           1 :                         var markedForCompaction bool
     342           1 :                         var creationTime uint64
     343           1 :                         virtualState := struct {
     344           1 :                                 virtual        bool
     345           1 :                                 backingFileNum uint64
     346           1 :                         }{}
     347           1 :                         var syntheticPrefix sstable.SyntheticPrefix
     348           1 :                         var syntheticSuffix sstable.SyntheticSuffix
     349           1 :                         if tag == tagNewFile4 || tag == tagNewFile5 {
     350           1 :                                 for {
     351           1 :                                         customTag, err := d.readUvarint()
     352           1 :                                         if err != nil {
     353           0 :                                                 return err
     354           0 :                                         }
     355           1 :                                         if customTag == customTagTerminate {
     356           1 :                                                 break
     357             :                                         }
     358           1 :                                         switch customTag {
     359           0 :                                         case customTagNeedsCompaction:
     360           0 :                                                 field, err := d.readBytes()
     361           0 :                                                 if err != nil {
     362           0 :                                                         return err
     363           0 :                                                 }
     364           0 :                                                 if len(field) != 1 {
     365           0 :                                                         return base.CorruptionErrorf("new-file4: need-compaction field wrong size")
     366           0 :                                                 }
     367           0 :                                                 markedForCompaction = (field[0] == 1)
     368             : 
     369           1 :                                         case customTagCreationTime:
     370           1 :                                                 field, err := d.readBytes()
     371           1 :                                                 if err != nil {
     372           0 :                                                         return err
     373           0 :                                                 }
     374           1 :                                                 var n int
     375           1 :                                                 creationTime, n = binary.Uvarint(field)
     376           1 :                                                 if n != len(field) {
     377           0 :                                                         return base.CorruptionErrorf("new-file4: invalid file creation time")
     378           0 :                                                 }
     379             : 
     380           0 :                                         case customTagPathID:
     381           0 :                                                 return base.CorruptionErrorf("new-file4: path-id field not supported")
     382             : 
     383           1 :                                         case customTagVirtual:
     384           1 :                                                 virtualState.virtual = true
     385           1 :                                                 if virtualState.backingFileNum, err = d.readUvarint(); err != nil {
     386           0 :                                                         return err
     387           0 :                                                 }
     388             : 
     389           1 :                                         case customTagSyntheticPrefix:
     390           1 :                                                 synthetic, err := d.readBytes()
     391           1 :                                                 if err != nil {
     392           0 :                                                         return err
     393           0 :                                                 }
     394           1 :                                                 syntheticPrefix = synthetic
     395             : 
     396           1 :                                         case customTagSyntheticSuffix:
     397           1 :                                                 if syntheticSuffix, err = d.readBytes(); err != nil {
     398           0 :                                                         return err
     399           0 :                                                 }
     400             : 
     401           0 :                                         default:
     402           0 :                                                 if (customTag & customTagNonSafeIgnoreMask) != 0 {
     403           0 :                                                         return base.CorruptionErrorf("new-file4: custom field not supported: %d", customTag)
     404           0 :                                                 }
     405           0 :                                                 if _, err := d.readBytes(); err != nil {
     406           0 :                                                         return err
     407           0 :                                                 }
     408             :                                         }
     409             :                                 }
     410             :                         }
     411           1 :                         m := &FileMetadata{
     412           1 :                                 FileNum:               fileNum,
     413           1 :                                 Size:                  size,
     414           1 :                                 CreationTime:          int64(creationTime),
     415           1 :                                 SmallestSeqNum:        smallestSeqNum,
     416           1 :                                 LargestSeqNum:         largestSeqNum,
     417           1 :                                 LargestSeqNumAbsolute: largestSeqNum,
     418           1 :                                 MarkedForCompaction:   markedForCompaction,
     419           1 :                                 Virtual:               virtualState.virtual,
     420           1 :                                 SyntheticPrefix:       syntheticPrefix,
     421           1 :                                 SyntheticSuffix:       syntheticSuffix,
     422           1 :                         }
     423           1 :                         if tag != tagNewFile5 { // no range keys present
     424           1 :                                 m.SmallestPointKey = base.DecodeInternalKey(smallestPointKey)
     425           1 :                                 m.LargestPointKey = base.DecodeInternalKey(largestPointKey)
     426           1 :                                 m.HasPointKeys = true
     427           1 :                                 m.Smallest, m.Largest = m.SmallestPointKey, m.LargestPointKey
     428           1 :                                 m.boundTypeSmallest, m.boundTypeLargest = boundTypePointKey, boundTypePointKey
     429           1 :                         } else { // range keys present
     430           1 :                                 // Set point key bounds, if parsed.
     431           1 :                                 if parsedPointBounds {
     432           1 :                                         m.SmallestPointKey = base.DecodeInternalKey(smallestPointKey)
     433           1 :                                         m.LargestPointKey = base.DecodeInternalKey(largestPointKey)
     434           1 :                                         m.HasPointKeys = true
     435           1 :                                 }
     436             :                                 // Set range key bounds.
     437           1 :                                 m.SmallestRangeKey = base.DecodeInternalKey(smallestRangeKey)
     438           1 :                                 m.LargestRangeKey = base.DecodeInternalKey(largestRangeKey)
     439           1 :                                 m.HasRangeKeys = true
     440           1 :                                 // Set overall bounds (by default assume range keys).
     441           1 :                                 m.Smallest, m.Largest = m.SmallestRangeKey, m.LargestRangeKey
     442           1 :                                 m.boundTypeSmallest, m.boundTypeLargest = boundTypeRangeKey, boundTypeRangeKey
     443           1 :                                 if boundsMarker&maskSmallest == maskSmallest {
     444           1 :                                         m.Smallest = m.SmallestPointKey
     445           1 :                                         m.boundTypeSmallest = boundTypePointKey
     446           1 :                                 }
     447           1 :                                 if boundsMarker&maskLargest == maskLargest {
     448           1 :                                         m.Largest = m.LargestPointKey
     449           1 :                                         m.boundTypeLargest = boundTypePointKey
     450           1 :                                 }
     451             :                         }
     452           1 :                         m.boundsSet = true
     453           1 :                         if !virtualState.virtual {
     454           1 :                                 m.InitPhysicalBacking()
     455           1 :                         }
     456             : 
     457           1 :                         nfe := NewFileEntry{
     458           1 :                                 Level: level,
     459           1 :                                 Meta:  m,
     460           1 :                         }
     461           1 :                         if virtualState.virtual {
     462           1 :                                 nfe.BackingFileNum = base.DiskFileNum(virtualState.backingFileNum)
     463           1 :                         }
     464           1 :                         v.NewFiles = append(v.NewFiles, nfe)
     465             : 
     466           0 :                 case tagPrevLogNumber:
     467           0 :                         n, err := d.readUvarint()
     468           0 :                         if err != nil {
     469           0 :                                 return err
     470           0 :                         }
     471           0 :                         v.ObsoletePrevLogNum = n
     472             : 
     473           0 :                 case tagColumnFamily, tagColumnFamilyAdd, tagColumnFamilyDrop, tagMaxColumnFamily:
     474           0 :                         return base.CorruptionErrorf("column families are not supported")
     475             : 
     476           0 :                 default:
     477           0 :                         return errCorruptManifest
     478             :                 }
     479             :         }
     480           1 :         return nil
     481             : }
     482             : 
     483           0 : func (v *VersionEdit) string(verbose bool, fmtKey base.FormatKey) string {
     484           0 :         var buf bytes.Buffer
     485           0 :         if v.ComparerName != "" {
     486           0 :                 fmt.Fprintf(&buf, "  comparer:     %s", v.ComparerName)
     487           0 :         }
     488           0 :         if v.MinUnflushedLogNum != 0 {
     489           0 :                 fmt.Fprintf(&buf, "  log-num:       %d\n", v.MinUnflushedLogNum)
     490           0 :         }
     491           0 :         if v.ObsoletePrevLogNum != 0 {
     492           0 :                 fmt.Fprintf(&buf, "  prev-log-num:  %d\n", v.ObsoletePrevLogNum)
     493           0 :         }
     494           0 :         if v.NextFileNum != 0 {
     495           0 :                 fmt.Fprintf(&buf, "  next-file-num: %d\n", v.NextFileNum)
     496           0 :         }
     497           0 :         if v.LastSeqNum != 0 {
     498           0 :                 fmt.Fprintf(&buf, "  last-seq-num:  %d\n", v.LastSeqNum)
     499           0 :         }
     500           0 :         entries := make([]DeletedFileEntry, 0, len(v.DeletedFiles))
     501           0 :         for df := range v.DeletedFiles {
     502           0 :                 entries = append(entries, df)
     503           0 :         }
     504           0 :         slices.SortFunc(entries, func(a, b DeletedFileEntry) int {
     505           0 :                 if v := stdcmp.Compare(a.Level, b.Level); v != 0 {
     506           0 :                         return v
     507           0 :                 }
     508           0 :                 return stdcmp.Compare(a.FileNum, b.FileNum)
     509             :         })
     510           0 :         for _, df := range entries {
     511           0 :                 fmt.Fprintf(&buf, "  del-table:     L%d %s\n", df.Level, df.FileNum)
     512           0 :         }
     513           0 :         for _, nf := range v.NewFiles {
     514           0 :                 fmt.Fprintf(&buf, "  add-table:     L%d", nf.Level)
     515           0 :                 fmt.Fprintf(&buf, " %s", nf.Meta.DebugString(fmtKey, verbose))
     516           0 :                 if nf.Meta.CreationTime != 0 {
     517           0 :                         fmt.Fprintf(&buf, " (%s)",
     518           0 :                                 time.Unix(nf.Meta.CreationTime, 0).UTC().Format(time.RFC3339))
     519           0 :                 }
     520           0 :                 fmt.Fprintln(&buf)
     521             :         }
     522             : 
     523           0 :         for _, b := range v.CreatedBackingTables {
     524           0 :                 fmt.Fprintf(&buf, "  add-backing:   %s\n", b.DiskFileNum)
     525           0 :         }
     526           0 :         for _, n := range v.RemovedBackingTables {
     527           0 :                 fmt.Fprintf(&buf, "  del-backing:   %s\n", n)
     528           0 :         }
     529           0 :         return buf.String()
     530             : }
     531             : 
     532             : // DebugString is a more verbose version of String(). Use this in tests.
     533           0 : func (v *VersionEdit) DebugString(fmtKey base.FormatKey) string {
     534           0 :         return v.string(true /* verbose */, fmtKey)
     535           0 : }
     536             : 
     537             : // String implements fmt.Stringer for a VersionEdit.
     538           0 : func (v *VersionEdit) String() string {
     539           0 :         return v.string(false /* verbose */, base.DefaultFormatter)
     540           0 : }
     541             : 
      542             : // ParseVersionEditDebug parses a VersionEdit from the output of its
      543             : // DebugString implementation.
      544             : //
      545             : // It doesn't recognize all fields; support for additional fields can be added
      546             : // as needed.
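                      : //
                      : // The fields currently recognized are add-table, del-table, add-backing, and
                      : // del-backing, mirroring the lines emitted by DebugString. For a VersionEdit
                      : // containing only those fields, a round trip is expected to work, e.g. (an
                      : // illustrative sketch):
                      : //
                      : //      ve2, err := ParseVersionEditDebug(ve.DebugString(base.DefaultFormatter))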
     547           0 : func ParseVersionEditDebug(s string) (_ *VersionEdit, err error) {
     548           0 :         defer func() {
     549           0 :                 err = errors.CombineErrors(err, maybeRecover())
     550           0 :         }()
     551             : 
     552           0 :         var ve VersionEdit
     553           0 :         for _, l := range strings.Split(s, "\n") {
     554           0 :                 l = strings.TrimSpace(l)
     555           0 :                 if l == "" {
     556           0 :                         continue
     557             :                 }
     558           0 :                 field, value, ok := strings.Cut(l, ":")
     559           0 :                 if !ok {
     560           0 :                         return nil, errors.Errorf("malformed line %q", l)
     561           0 :                 }
     562           0 :                 field = strings.TrimSpace(field)
     563           0 :                 p := makeDebugParser(value)
     564           0 :                 switch field {
     565           0 :                 case "add-table":
     566           0 :                         level := p.Level()
     567           0 :                         meta, err := ParseFileMetadataDebug(p.Remaining())
     568           0 :                         if err != nil {
     569           0 :                                 return nil, err
     570           0 :                         }
     571           0 :                         ve.NewFiles = append(ve.NewFiles, NewFileEntry{
     572           0 :                                 Level: level,
     573           0 :                                 Meta:  meta,
     574           0 :                         })
     575             : 
     576           0 :                 case "del-table":
     577           0 :                         level := p.Level()
     578           0 :                         num := p.FileNum()
     579           0 :                         if ve.DeletedFiles == nil {
     580           0 :                                 ve.DeletedFiles = make(map[DeletedFileEntry]*FileMetadata)
     581           0 :                         }
     582           0 :                         ve.DeletedFiles[DeletedFileEntry{
     583           0 :                                 Level:   level,
     584           0 :                                 FileNum: num,
     585           0 :                         }] = nil
     586             : 
     587           0 :                 case "add-backing":
     588           0 :                         n := p.DiskFileNum()
     589           0 :                         ve.CreatedBackingTables = append(ve.CreatedBackingTables, &FileBacking{
     590           0 :                                 DiskFileNum: n,
     591           0 :                                 Size:        100,
     592           0 :                         })
     593             : 
     594           0 :                 case "del-backing":
     595           0 :                         n := p.DiskFileNum()
     596           0 :                         ve.RemovedBackingTables = append(ve.RemovedBackingTables, n)
     597             : 
     598           0 :                 default:
     599           0 :                         return nil, errors.Errorf("field %q not implemented", field)
     600             :                 }
     601             :         }
     602           0 :         return &ve, nil
     603             : }
     604             : 
     605             : // Encode encodes an edit to the specified writer.
     606           1 : func (v *VersionEdit) Encode(w io.Writer) error {
     607           1 :         e := versionEditEncoder{new(bytes.Buffer)}
     608           1 : 
     609           1 :         if v.ComparerName != "" {
     610           1 :                 e.writeUvarint(tagComparator)
     611           1 :                 e.writeString(v.ComparerName)
     612           1 :         }
     613           1 :         if v.MinUnflushedLogNum != 0 {
     614           1 :                 e.writeUvarint(tagLogNumber)
     615           1 :                 e.writeUvarint(uint64(v.MinUnflushedLogNum))
     616           1 :         }
     617           1 :         if v.ObsoletePrevLogNum != 0 {
     618           0 :                 e.writeUvarint(tagPrevLogNumber)
     619           0 :                 e.writeUvarint(v.ObsoletePrevLogNum)
     620           0 :         }
     621           1 :         if v.NextFileNum != 0 {
     622           1 :                 e.writeUvarint(tagNextFileNumber)
     623           1 :                 e.writeUvarint(uint64(v.NextFileNum))
     624           1 :         }
     625           1 :         for _, dfn := range v.RemovedBackingTables {
     626           1 :                 e.writeUvarint(tagRemovedBackingTable)
     627           1 :                 e.writeUvarint(uint64(dfn))
     628           1 :         }
     629           1 :         for _, fileBacking := range v.CreatedBackingTables {
     630           1 :                 e.writeUvarint(tagCreatedBackingTable)
     631           1 :                 e.writeUvarint(uint64(fileBacking.DiskFileNum))
     632           1 :                 e.writeUvarint(fileBacking.Size)
     633           1 :         }
      634             :         // RocksDB requires LastSeqNum to be encoded for the first MANIFEST entry,
      635             :         // even though its value is zero. We detect the first entry by ComparerName
      636             :         // being set, and encode LastSeqNum whenever it is.
     637           1 :         if v.LastSeqNum != 0 || v.ComparerName != "" {
     638           1 :                 e.writeUvarint(tagLastSequence)
     639           1 :                 e.writeUvarint(uint64(v.LastSeqNum))
     640           1 :         }
     641           1 :         for x := range v.DeletedFiles {
     642           1 :                 e.writeUvarint(tagDeletedFile)
     643           1 :                 e.writeUvarint(uint64(x.Level))
     644           1 :                 e.writeUvarint(uint64(x.FileNum))
     645           1 :         }
     646           1 :         for _, x := range v.NewFiles {
     647           1 :                 customFields := x.Meta.MarkedForCompaction || x.Meta.CreationTime != 0 || x.Meta.Virtual
     648           1 :                 var tag uint64
     649           1 :                 switch {
     650           1 :                 case x.Meta.HasRangeKeys:
     651           1 :                         tag = tagNewFile5
     652           1 :                 case customFields:
     653           1 :                         tag = tagNewFile4
     654           0 :                 default:
     655           0 :                         tag = tagNewFile2
     656             :                 }
     657           1 :                 e.writeUvarint(tag)
     658           1 :                 e.writeUvarint(uint64(x.Level))
     659           1 :                 e.writeUvarint(uint64(x.Meta.FileNum))
     660           1 :                 e.writeUvarint(x.Meta.Size)
     661           1 :                 if !x.Meta.HasRangeKeys {
     662           1 :                         // If we have no range keys, preserve the original format and write the
     663           1 :                         // smallest and largest point keys.
     664           1 :                         e.writeKey(x.Meta.SmallestPointKey)
     665           1 :                         e.writeKey(x.Meta.LargestPointKey)
     666           1 :                 } else {
     667           1 :                         // When range keys are present, we first write a marker byte that
     668           1 :                         // indicates if the table also contains point keys, in addition to how the
     669           1 :                         // overall bounds for the table should be reconstructed. This byte is
     670           1 :                         // followed by the keys themselves.
     671           1 :                         b, err := x.Meta.boundsMarker()
     672           1 :                         if err != nil {
     673           0 :                                 return err
     674           0 :                         }
     675           1 :                         if err = e.WriteByte(b); err != nil {
     676           0 :                                 return err
     677           0 :                         }
     678             :                         // Write point key bounds (if present).
     679           1 :                         if x.Meta.HasPointKeys {
     680           1 :                                 e.writeKey(x.Meta.SmallestPointKey)
     681           1 :                                 e.writeKey(x.Meta.LargestPointKey)
     682           1 :                         }
     683             :                         // Write range key bounds.
     684           1 :                         e.writeKey(x.Meta.SmallestRangeKey)
     685           1 :                         e.writeKey(x.Meta.LargestRangeKey)
     686             :                 }
     687           1 :                 e.writeUvarint(uint64(x.Meta.SmallestSeqNum))
     688           1 :                 e.writeUvarint(uint64(x.Meta.LargestSeqNum))
     689           1 :                 if customFields {
     690           1 :                         if x.Meta.CreationTime != 0 {
     691           1 :                                 e.writeUvarint(customTagCreationTime)
     692           1 :                                 var buf [binary.MaxVarintLen64]byte
     693           1 :                                 n := binary.PutUvarint(buf[:], uint64(x.Meta.CreationTime))
     694           1 :                                 e.writeBytes(buf[:n])
     695           1 :                         }
     696           1 :                         if x.Meta.MarkedForCompaction {
     697           0 :                                 e.writeUvarint(customTagNeedsCompaction)
     698           0 :                                 e.writeBytes([]byte{1})
     699           0 :                         }
     700           1 :                         if x.Meta.Virtual {
     701           1 :                                 e.writeUvarint(customTagVirtual)
     702           1 :                                 e.writeUvarint(uint64(x.Meta.FileBacking.DiskFileNum))
     703           1 :                         }
     704           1 :                         if x.Meta.SyntheticPrefix != nil {
     705           1 :                                 e.writeUvarint(customTagSyntheticPrefix)
     706           1 :                                 e.writeBytes(x.Meta.SyntheticPrefix)
     707           1 :                         }
     708           1 :                         if x.Meta.SyntheticSuffix != nil {
     709           1 :                                 e.writeUvarint(customTagSyntheticSuffix)
     710           1 :                                 e.writeBytes(x.Meta.SyntheticSuffix)
     711           1 :                         }
     712           1 :                         e.writeUvarint(customTagTerminate)
     713             :                 }
     714             :         }
     715           1 :         _, err := w.Write(e.Bytes())
     716           1 :         return err
     717             : }
     718             : 
     719             : // versionEditDecoder should be used to decode version edits.
     720             : type versionEditDecoder struct {
     721             :         byteReader
     722             : }
     723             : 
     724           1 : func (d versionEditDecoder) readBytes() ([]byte, error) {
     725           1 :         n, err := d.readUvarint()
     726           1 :         if err != nil {
     727           0 :                 return nil, err
     728           0 :         }
     729           1 :         s := make([]byte, n)
     730           1 :         _, err = io.ReadFull(d, s)
     731           1 :         if err != nil {
     732           0 :                 if err == io.ErrUnexpectedEOF {
     733           0 :                         return nil, errCorruptManifest
     734           0 :                 }
     735           0 :                 return nil, err
     736             :         }
     737           1 :         return s, nil
     738             : }
     739             : 
     740           1 : func (d versionEditDecoder) readLevel() (int, error) {
     741           1 :         u, err := d.readUvarint()
     742           1 :         if err != nil {
     743           0 :                 return 0, err
     744           0 :         }
     745           1 :         if u >= NumLevels {
     746           0 :                 return 0, errCorruptManifest
     747           0 :         }
     748           1 :         return int(u), nil
     749             : }
     750             : 
     751           1 : func (d versionEditDecoder) readFileNum() (base.FileNum, error) {
     752           1 :         u, err := d.readUvarint()
     753           1 :         if err != nil {
     754           0 :                 return 0, err
     755           0 :         }
     756           1 :         return base.FileNum(u), nil
     757             : }
     758             : 
     759           1 : func (d versionEditDecoder) readUvarint() (uint64, error) {
     760           1 :         u, err := binary.ReadUvarint(d)
     761           1 :         if err != nil {
     762           0 :                 if err == io.EOF {
     763           0 :                         return 0, errCorruptManifest
     764           0 :                 }
     765           0 :                 return 0, err
     766             :         }
     767           1 :         return u, nil
     768             : }
     769             : 
     770             : type versionEditEncoder struct {
     771             :         *bytes.Buffer
     772             : }
     773             : 
     774           1 : func (e versionEditEncoder) writeBytes(p []byte) {
     775           1 :         e.writeUvarint(uint64(len(p)))
     776           1 :         e.Write(p)
     777           1 : }
     778             : 
     779           1 : func (e versionEditEncoder) writeKey(k InternalKey) {
     780           1 :         e.writeUvarint(uint64(k.Size()))
     781           1 :         e.Write(k.UserKey)
     782           1 :         buf := k.EncodeTrailer()
     783           1 :         e.Write(buf[:])
     784           1 : }
     785             : 
     786           1 : func (e versionEditEncoder) writeString(s string) {
     787           1 :         e.writeUvarint(uint64(len(s)))
     788           1 :         e.WriteString(s)
     789           1 : }
     790             : 
     791           1 : func (e versionEditEncoder) writeUvarint(u uint64) {
     792           1 :         var buf [binary.MaxVarintLen64]byte
     793           1 :         n := binary.PutUvarint(buf[:], u)
     794           1 :         e.Write(buf[:n])
     795           1 : }
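                      : 
                      : // For example, e.writeUvarint(300) appends the two bytes 0xAC 0x02, the
                      : // standard base-128 varint encoding that binary.ReadUvarint consumes on the
                      : // decode side.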
     796             : 
     797             : // BulkVersionEdit summarizes the files added and deleted from a set of version
     798             : // edits.
     799             : //
     800             : // INVARIANTS:
     801             : // No file can be added to a level more than once. This is true globally, and
     802             : // also true for all of the calls to Accumulate for a single bulk version edit.
     803             : //
     804             : // No file can be removed from a level more than once. This is true globally,
     805             : // and also true for all of the calls to Accumulate for a single bulk version
     806             : // edit.
     807             : //
     808             : // A file must not be added and removed from a given level in the same version
     809             : // edit.
     810             : //
     811             : // A file that is being removed from a level must have been added to that level
     812             : // before (in a prior version edit). Note that a given file can be deleted from
      813             : // a level and added to another level in a single version edit.
     814             : type BulkVersionEdit struct {
     815             :         Added   [NumLevels]map[base.FileNum]*FileMetadata
     816             :         Deleted [NumLevels]map[base.FileNum]*FileMetadata
     817             : 
      818             :         // AddedFileBacking maps DiskFileNum to FileBacking for added backings so we
      819             :         // can populate the FileBacking of virtual sstables during manifest replay.
     820             :         AddedFileBacking   map[base.DiskFileNum]*FileBacking
     821             :         RemovedFileBacking []base.DiskFileNum
     822             : 
     823             :         // AddedByFileNum maps file number to file metadata for all added files
     824             :         // from accumulated version edits. AddedByFileNum is only populated if set
     825             :         // to non-nil by a caller. It must be set to non-nil when replaying
     826             :         // version edits read from a MANIFEST (as opposed to VersionEdits
     827             :         // constructed in-memory).  While replaying a MANIFEST file,
     828             :         // VersionEdit.DeletedFiles map entries have nil values, because the
     829             :         // on-disk deletion record encodes only the file number. Accumulate
     830             :         // uses AddedByFileNum to correctly populate the BulkVersionEdit's Deleted
     831             :         // field with non-nil *FileMetadata.
     832             :         AddedByFileNum map[base.FileNum]*FileMetadata
     833             : 
     834             :         // MarkedForCompactionCountDiff holds the aggregated count of files
     835             :         // marked for compaction added or removed.
     836             :         MarkedForCompactionCountDiff int
     837             : }
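
// Illustrative sketch (not from this source file): per the field comment
// above, AddedByFileNum must be non-nil before the first Accumulate call when
// replaying a MANIFEST, so that deletion records (which carry only a file
// number) can be resolved to their metadata:
//
//      var bve BulkVersionEdit
//      bve.AddedByFileNum = make(map[base.FileNum]*FileMetadata)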
     838             : 
     839             : // Accumulate adds the file addition and deletions in the specified version
     840             : // edit to the bulk edit's internal state.
     841             : //
     842             : // INVARIANTS:
     843             : // If a file is added to a given level in a call to Accumulate and then removed
     844             : // from that level in a subsequent call, the file will not be present in the
     845             : // resulting BulkVersionEdit.Deleted for that level.
     846             : //
      847             : // After accumulation of version edits, the bulk version edit may contain
      848             : // information about a file deleted from a level without containing the
      849             : // corresponding addition of that file to the same level; the addition may
      850             : // have occurred as part of a previous bulk version edit. In this case, the
      851             : // deleted file must still be present in BulkVersionEdit.Deleted at the end
      852             : // of the accumulation, because we need to decrease the refcount of the
      853             : // deleted file in Apply.
     854           1 : func (b *BulkVersionEdit) Accumulate(ve *VersionEdit) error {
     855           1 :         for df, m := range ve.DeletedFiles {
     856           1 :                 dmap := b.Deleted[df.Level]
     857           1 :                 if dmap == nil {
     858           1 :                         dmap = make(map[base.FileNum]*FileMetadata)
     859           1 :                         b.Deleted[df.Level] = dmap
     860           1 :                 }
     861             : 
     862           1 :                 if m == nil {
     863           1 :                         // m is nil only when replaying a MANIFEST.
     864           1 :                         if b.AddedByFileNum == nil {
     865           0 :                                 return errors.Errorf("deleted file L%d.%s's metadata is absent and bve.AddedByFileNum is nil", df.Level, df.FileNum)
     866           0 :                         }
     867           1 :                         m = b.AddedByFileNum[df.FileNum]
     868           1 :                         if m == nil {
     869           0 :                                 return base.CorruptionErrorf("pebble: file deleted L%d.%s before it was inserted", df.Level, df.FileNum)
     870           0 :                         }
     871             :                 }
     872           1 :                 if m.MarkedForCompaction {
     873           0 :                         b.MarkedForCompactionCountDiff--
     874           0 :                 }
     875           1 :                 if _, ok := b.Added[df.Level][df.FileNum]; !ok {
     876           1 :                         dmap[df.FileNum] = m
     877           1 :                 } else {
     878           1 :                         // Present in b.Added for the same level.
     879           1 :                         delete(b.Added[df.Level], df.FileNum)
     880           1 :                 }
     881             :         }
     882             : 
     883             :         // Generate state for Added backing files. Note that these must be generated
     884             :         // before we loop through the NewFiles, because we need to populate the
     885             :         // FileBackings which might be used by the NewFiles loop.
     886           1 :         if b.AddedFileBacking == nil {
     887           1 :                 b.AddedFileBacking = make(map[base.DiskFileNum]*FileBacking)
     888           1 :         }
     889           1 :         for _, fb := range ve.CreatedBackingTables {
     890           1 :                 if _, ok := b.AddedFileBacking[fb.DiskFileNum]; ok {
     891           0 :                         // There is already a FileBacking associated with fb.DiskFileNum.
     892           0 :                         // This should never happen. There must always be only one FileBacking
     893           0 :                         // associated with a backing sstable.
     894           0 :                         panic(fmt.Sprintf("pebble: duplicate file backing %s", fb.DiskFileNum.String()))
     895             :                 }
     896           1 :                 b.AddedFileBacking[fb.DiskFileNum] = fb
     897             :         }
     898             : 
     899           1 :         for _, nf := range ve.NewFiles {
     900           1 :                 // A new file should not have been deleted in this or a preceding
     901           1 :                 // VersionEdit at the same level (though files can move across levels).
     902           1 :                 if dmap := b.Deleted[nf.Level]; dmap != nil {
     903           1 :                         if _, ok := dmap[nf.Meta.FileNum]; ok {
     904           0 :                                 return base.CorruptionErrorf("pebble: file deleted L%d.%s before it was inserted", nf.Level, nf.Meta.FileNum)
     905           0 :                         }
     906             :                 }
     907           1 :                 if nf.Meta.Virtual && nf.Meta.FileBacking == nil {
     908           1 :                         // FileBacking for a virtual sstable must only be nil if we're performing
     909           1 :                         // manifest replay.
     910           1 :                         nf.Meta.FileBacking = b.AddedFileBacking[nf.BackingFileNum]
     911           1 :                         if nf.Meta.FileBacking == nil {
     912           0 :                                 return errors.Errorf("FileBacking for virtual sstable must not be nil")
     913           0 :                         }
     914           1 :                 } else if nf.Meta.FileBacking == nil {
      915           0 :                         return errors.Errorf("Added file L%d.%s has no FileBacking", nf.Level, nf.Meta.FileNum)
     916           0 :                 }
     917             : 
     918           1 :                 if b.Added[nf.Level] == nil {
     919           1 :                         b.Added[nf.Level] = make(map[base.FileNum]*FileMetadata)
     920           1 :                 }
     921           1 :                 b.Added[nf.Level][nf.Meta.FileNum] = nf.Meta
     922           1 :                 if b.AddedByFileNum != nil {
     923           1 :                         b.AddedByFileNum[nf.Meta.FileNum] = nf.Meta
     924           1 :                 }
     925           1 :                 if nf.Meta.MarkedForCompaction {
     926           0 :                         b.MarkedForCompactionCountDiff++
     927           0 :                 }
     928             :         }
     929             : 
     930           1 :         for _, n := range ve.RemovedBackingTables {
     931           1 :                 if _, ok := b.AddedFileBacking[n]; ok {
     932           1 :                         delete(b.AddedFileBacking, n)
     933           1 :                 } else {
     934           1 :                         // Since a file can be removed from backing files in exactly one version
     935           1 :                         // edit it is safe to just append without any de-duplication.
      936           1 :                         // edit, it is safe to just append without any de-duplication.
     937           1 :                 }
     938             :         }
     939             : 
     940           1 :         return nil
     941             : }
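
// Illustrative sketch (not from this source file): the add-then-delete
// coalescing described in the Accumulate contract above. The entry type names
// (NewFileEntry, DeletedFileEntry) and meta (a fully populated *FileMetadata
// with a non-nil FileBacking) are assumed for illustration:
//
//      var bve BulkVersionEdit
//      _ = bve.Accumulate(&VersionEdit{
//              NewFiles: []NewFileEntry{{Level: 3, Meta: meta}},
//      })
//      _ = bve.Accumulate(&VersionEdit{
//              DeletedFiles: map[DeletedFileEntry]*FileMetadata{
//                      {Level: 3, FileNum: meta.FileNum}: meta,
//              },
//      })
//      // meta.FileNum now appears in neither bve.Added[3] nor bve.Deleted[3].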
     942             : 
     943             : // Apply applies the delta b to the current version to produce a new version.
     944             : // The new version is consistent with respect to the comparer.
     945             : //
     946             : // Apply updates the backing refcounts (Ref/Unref) as files are installed into
     947             : // the levels.
     948             : //
     949             : // curr may be nil, which is equivalent to a pointer to a zero version.
     950             : func (b *BulkVersionEdit) Apply(
     951             :         curr *Version, comparer *base.Comparer, flushSplitBytes int64, readCompactionRate int64,
     952           1 : ) (*Version, error) {
     953           1 :         v := &Version{
     954           1 :                 cmp: comparer,
     955           1 :         }
     956           1 : 
     957           1 :         // Adjust the count of files marked for compaction.
     958           1 :         if curr != nil {
     959           1 :                 v.Stats.MarkedForCompaction = curr.Stats.MarkedForCompaction
     960           1 :         }
     961           1 :         v.Stats.MarkedForCompaction += b.MarkedForCompactionCountDiff
     962           1 :         if v.Stats.MarkedForCompaction < 0 {
     963           0 :                 return nil, base.CorruptionErrorf("pebble: version marked for compaction count negative")
     964           0 :         }
     965             : 
     966           1 :         for level := range v.Levels {
     967           1 :                 if curr == nil || curr.Levels[level].tree.root == nil {
     968           1 :                         v.Levels[level] = MakeLevelMetadata(comparer.Compare, level, nil /* files */)
     969           1 :                 } else {
     970           1 :                         v.Levels[level] = curr.Levels[level].clone()
     971           1 :                 }
     972           1 :                 if curr == nil || curr.RangeKeyLevels[level].tree.root == nil {
     973           1 :                         v.RangeKeyLevels[level] = MakeLevelMetadata(comparer.Compare, level, nil /* files */)
     974           1 :                 } else {
     975           1 :                         v.RangeKeyLevels[level] = curr.RangeKeyLevels[level].clone()
     976           1 :                 }
     977             : 
     978           1 :                 if len(b.Added[level]) == 0 && len(b.Deleted[level]) == 0 {
     979           1 :                         // There are no edits on this level.
     980           1 :                         if level == 0 {
     981           1 :                                 // Initialize L0Sublevels.
     982           1 :                                 if curr == nil || curr.L0Sublevels == nil {
     983           1 :                                         if err := v.InitL0Sublevels(flushSplitBytes); err != nil {
     984           0 :                                                 return nil, errors.Wrap(err, "pebble: internal error")
     985           0 :                                         }
     986           1 :                                 } else {
     987           1 :                                         v.L0Sublevels = curr.L0Sublevels
     988           1 :                                         v.L0SublevelFiles = v.L0Sublevels.Levels
     989           1 :                                 }
     990             :                         }
     991           1 :                         continue
     992             :                 }
     993             : 
     994             :                 // Some edits on this level.
     995           1 :                 lm := &v.Levels[level]
     996           1 :                 lmRange := &v.RangeKeyLevels[level]
     997           1 : 
     998           1 :                 addedFilesMap := b.Added[level]
     999           1 :                 deletedFilesMap := b.Deleted[level]
    1000           1 :                 if n := v.Levels[level].Len() + len(addedFilesMap); n == 0 {
    1001           0 :                         return nil, base.CorruptionErrorf(
    1002           0 :                                 "pebble: internal error: No current or added files but have deleted files: %d",
    1003           0 :                                 errors.Safe(len(deletedFilesMap)))
    1004           0 :                 }
    1005             : 
    1006             :                 // NB: addedFilesMap may be empty. If a file is present in addedFilesMap
    1007             :                 // for a level, it won't be present in deletedFilesMap for the same
    1008             :                 // level.
    1009             : 
    1010           1 :                 for _, f := range deletedFilesMap {
    1011           1 :                         if obsolete := v.Levels[level].remove(f); obsolete {
    1012           0 :                                 // Deleting a file from the B-Tree may decrement its
    1013           0 :                                 // reference count. However, because we cloned the
    1014           0 :                                 // previous level's B-Tree, this should never result in a
    1015           0 :                                 // file's reference count dropping to zero.
    1016           0 :                                 err := errors.Errorf("pebble: internal error: file L%d.%s obsolete during B-Tree removal", level, f.FileNum)
    1017           0 :                                 return nil, err
    1018           0 :                         }
    1019           1 :                         if f.HasRangeKeys {
    1020           1 :                                 if obsolete := v.RangeKeyLevels[level].remove(f); obsolete {
    1021           0 :                                         // Deleting a file from the B-Tree may decrement its
    1022           0 :                                         // reference count. However, because we cloned the
    1023           0 :                                         // previous level's B-Tree, this should never result in a
    1024           0 :                                         // file's reference count dropping to zero.
    1025           0 :                                         err := errors.Errorf("pebble: internal error: file L%d.%s obsolete during range-key B-Tree removal", level, f.FileNum)
    1026           0 :                                         return nil, err
    1027           0 :                                 }
    1028             :                         }
    1029             :                 }
    1030             : 
    1031           1 :                 addedFiles := make([]*FileMetadata, 0, len(addedFilesMap))
    1032           1 :                 for _, f := range addedFilesMap {
    1033           1 :                         addedFiles = append(addedFiles, f)
    1034           1 :                 }
     1035             :                 // Sort addedFiles by file number. This isn't necessary for correctness,
     1036             :                 // but tests which replay invalid manifests check the error output, and
     1037             :                 // the error output depends on the order in which files are added to the btree.
    1038           1 :                 slices.SortFunc(addedFiles, func(a, b *FileMetadata) int {
    1039           1 :                         return stdcmp.Compare(a.FileNum, b.FileNum)
    1040           1 :                 })
    1041             : 
    1042           1 :                 var sm, la *FileMetadata
    1043           1 :                 for _, f := range addedFiles {
    1044           1 :                         // NB: allowedSeeks is used for read triggered compactions. It is set using
    1045           1 :                         // Options.Experimental.ReadCompactionRate which defaults to 32KB.
    1046           1 :                         var allowedSeeks int64
    1047           1 :                         if readCompactionRate != 0 {
    1048           1 :                                 allowedSeeks = int64(f.Size) / readCompactionRate
    1049           1 :                         }
    1050           1 :                         if allowedSeeks < 100 {
    1051           1 :                                 allowedSeeks = 100
    1052           1 :                         }
    1053           1 :                         f.AllowedSeeks.Store(allowedSeeks)
    1054           1 :                         f.InitAllowedSeeks = allowedSeeks
    1055           1 : 
    1056           1 :                         err := lm.insert(f)
    1057           1 :                         if err != nil {
    1058           0 :                                 return nil, errors.Wrap(err, "pebble")
    1059           0 :                         }
    1060           1 :                         if f.HasRangeKeys {
    1061           1 :                                 err = lmRange.insert(f)
    1062           1 :                                 if err != nil {
    1063           0 :                                         return nil, errors.Wrap(err, "pebble")
    1064           0 :                                 }
    1065             :                         }
    1066             :                         // Track the keys with the smallest and largest keys, so that we can
    1067             :                         // check consistency of the modified span.
    1068           1 :                         if sm == nil || base.InternalCompare(comparer.Compare, sm.Smallest, f.Smallest) > 0 {
    1069           1 :                                 sm = f
    1070           1 :                         }
    1071           1 :                         if la == nil || base.InternalCompare(comparer.Compare, la.Largest, f.Largest) < 0 {
    1072           1 :                                 la = f
    1073           1 :                         }
    1074             :                 }
    1075             : 
    1076           1 :                 if level == 0 {
    1077           1 :                         if curr != nil && curr.L0Sublevels != nil && len(deletedFilesMap) == 0 {
    1078           1 :                                 // Flushes and ingestions that do not delete any L0 files do not require
    1079           1 :                                 // a regeneration of L0Sublevels from scratch. We can instead generate
    1080           1 :                                 // it incrementally.
    1081           1 :                                 var err error
    1082           1 :                                 // AddL0Files requires addedFiles to be sorted in seqnum order.
    1083           1 :                                 SortBySeqNum(addedFiles)
    1084           1 :                                 v.L0Sublevels, err = curr.L0Sublevels.AddL0Files(addedFiles, flushSplitBytes, &v.Levels[0])
    1085           1 :                                 if errors.Is(err, errInvalidL0SublevelsOpt) {
    1086           1 :                                         err = v.InitL0Sublevels(flushSplitBytes)
    1087           1 :                                 } else if invariants.Enabled && err == nil {
    1088           1 :                                         copyOfSublevels, err := NewL0Sublevels(&v.Levels[0], comparer.Compare, comparer.FormatKey, flushSplitBytes)
    1089           1 :                                         if err != nil {
    1090           0 :                                                 panic(fmt.Sprintf("error when regenerating sublevels: %s", err))
    1091             :                                         }
    1092           1 :                                         s1 := describeSublevels(comparer.FormatKey, false /* verbose */, copyOfSublevels.Levels)
    1093           1 :                                         s2 := describeSublevels(comparer.FormatKey, false /* verbose */, v.L0Sublevels.Levels)
    1094           1 :                                         if s1 != s2 {
    1095           0 :                                                 // Add verbosity.
    1096           0 :                                                 s1 := describeSublevels(comparer.FormatKey, true /* verbose */, copyOfSublevels.Levels)
    1097           0 :                                                 s2 := describeSublevels(comparer.FormatKey, true /* verbose */, v.L0Sublevels.Levels)
    1098           0 :                                                 panic(fmt.Sprintf("incremental L0 sublevel generation produced different output than regeneration: %s != %s", s1, s2))
    1099             :                                         }
    1100             :                                 }
    1101           1 :                                 if err != nil {
    1102           0 :                                         return nil, errors.Wrap(err, "pebble: internal error")
    1103           0 :                                 }
    1104           1 :                                 v.L0SublevelFiles = v.L0Sublevels.Levels
    1105           1 :                         } else if err := v.InitL0Sublevels(flushSplitBytes); err != nil {
    1106           0 :                                 return nil, errors.Wrap(err, "pebble: internal error")
    1107           0 :                         }
    1108           1 :                         if err := CheckOrdering(comparer.Compare, comparer.FormatKey, Level(0), v.Levels[level].Iter()); err != nil {
    1109           0 :                                 return nil, errors.Wrap(err, "pebble: internal error")
    1110           0 :                         }
    1111           1 :                         continue
    1112             :                 }
    1113             : 
    1114             :                 // Check consistency of the level in the vicinity of our edits.
    1115           1 :                 if sm != nil && la != nil {
    1116           1 :                         overlap := v.Levels[level].Slice().Overlaps(comparer.Compare, sm.UserKeyBounds())
    1117           1 :                         // overlap contains all of the added files. We want to ensure that
    1118           1 :                         // the added files are consistent with neighboring existing files
    1119           1 :                         // too, so reslice the overlap to pull in a neighbor on each side.
    1120           1 :                         check := overlap.Reslice(func(start, end *LevelIterator) {
    1121           1 :                                 if m := start.Prev(); m == nil {
    1122           1 :                                         start.Next()
    1123           1 :                                 }
    1124           1 :                                 if m := end.Next(); m == nil {
    1125           1 :                                         end.Prev()
    1126           1 :                                 }
    1127             :                         })
    1128           1 :                         if err := CheckOrdering(comparer.Compare, comparer.FormatKey, Level(level), check.Iter()); err != nil {
    1129           0 :                                 return nil, errors.Wrap(err, "pebble: internal error")
    1130           0 :                         }
    1131             :                 }
    1132             :         }
    1133           1 :         return v, nil
    1134             : }
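
// Illustrative sketch (not from this source file): the typical
// accumulate-then-apply flow for a batch of in-memory version edits; edits,
// currentVersion, comparer, flushSplitBytes, and readCompactionRate are
// placeholders:
//
//      var bve BulkVersionEdit
//      for _, ve := range edits {
//              if err := bve.Accumulate(ve); err != nil {
//                      return err
//              }
//      }
//      newVersion, err := bve.Apply(currentVersion, comparer, flushSplitBytes, readCompactionRate)
//      if err != nil {
//              return err
//      }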

Generated by: LCOV version 1.14