LCOV - code coverage report
Current view: top level - pebble/internal/manifest - version_edit.go (source / functions) Hit Total Coverage
Test: 2024-03-06 08:16Z f3e3d4aa - tests only.lcov Lines: 578 761 76.0 %
Date: 2024-03-06 08:16:53 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2012 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package manifest
       6             : 
       7             : import (
       8             :         "bufio"
       9             :         "bytes"
      10             :         stdcmp "cmp"
      11             :         "encoding/binary"
      12             :         "fmt"
      13             :         "io"
      14             :         "slices"
      15             :         "time"
      16             : 
      17             :         "github.com/cockroachdb/errors"
      18             :         "github.com/cockroachdb/pebble/internal/base"
      19             :         "github.com/cockroachdb/pebble/internal/invariants"
      20             :         "github.com/cockroachdb/pebble/sstable"
      21             : )
      22             : 
// TODO(peter): describe the MANIFEST file format, independently of the C++
// project.

// errCorruptManifest is the sentinel corruption error produced when a
// MANIFEST cannot be decoded as a sequence of valid version edits.
var errCorruptManifest = base.CorruptionErrorf("pebble: corrupt manifest")
      27             : 
// byteReader combines io.ByteReader and io.Reader: the version-edit decoder
// needs single-byte reads for uvarint decoding (binary.ReadUvarint) as well
// as bulk reads for length-prefixed byte fields.
type byteReader interface {
	io.ByteReader
	io.Reader
}
      32             : 
      33             : // Tags for the versionEdit disk format.
      34             : // Tag 8 is no longer used.
      35             : const (
      36             :         // LevelDB tags.
      37             :         tagComparator     = 1
      38             :         tagLogNumber      = 2
      39             :         tagNextFileNumber = 3
      40             :         tagLastSequence   = 4
      41             :         tagCompactPointer = 5
      42             :         tagDeletedFile    = 6
      43             :         tagNewFile        = 7
      44             :         tagPrevLogNumber  = 9
      45             : 
      46             :         // RocksDB tags.
      47             :         tagNewFile2         = 100
      48             :         tagNewFile3         = 102
      49             :         tagNewFile4         = 103
      50             :         tagColumnFamily     = 200
      51             :         tagColumnFamilyAdd  = 201
      52             :         tagColumnFamilyDrop = 202
      53             :         tagMaxColumnFamily  = 203
      54             : 
      55             :         // Pebble tags.
      56             :         tagNewFile5            = 104 // Range keys.
      57             :         tagCreatedBackingTable = 105
      58             :         tagRemovedBackingTable = 106
      59             : 
      60             :         // The custom tags sub-format used by tagNewFile4 and above. All tags less
      61             :         // than customTagNonSafeIgnoreMask are safe to ignore and their format must be
      62             :         // a single bytes field.
      63             :         customTagTerminate         = 1
      64             :         customTagNeedsCompaction   = 2
      65             :         customTagCreationTime      = 6
      66             :         customTagPathID            = 65
      67             :         customTagNonSafeIgnoreMask = 1 << 6
      68             :         customTagVirtual           = 66
      69             :         customTagPrefixRewrite     = 67
      70             :         customTagSuffixRewrite     = 68
      71             : )
      72             : 
// DeletedFileEntry holds the state for a file deletion from a level. The file
// itself might still be referenced by another level.
//
// NB: field order matters — callers construct this with positional literals
// (e.g. DeletedFileEntry{level, fileNum} in Decode).
type DeletedFileEntry struct {
	// Level is the LSM level the file is removed from.
	Level int
	// FileNum identifies the file being removed from Level.
	FileNum base.FileNum
}
      79             : 
// NewFileEntry holds the state for a new file or one moved from a different
// level.
type NewFileEntry struct {
	// Level is the LSM level the file is added to.
	Level int
	// Meta is the metadata of the added file.
	Meta *FileMetadata
	// BackingFileNum is only set during manifest replay, and only for virtual
	// sstables.
	BackingFileNum base.DiskFileNum
}
      89             : 
// VersionEdit holds the state for an edit to a Version along with other
// on-disk state (log numbers, next file number, and the last sequence number).
type VersionEdit struct {
	// ComparerName is the value of Options.Comparer.Name. This is only set in
	// the first VersionEdit in a manifest (either when the DB is created, or
	// when a new manifest is created) and is used to verify that the comparer
	// specified at Open matches the comparer that was previously used.
	ComparerName string

	// MinUnflushedLogNum is the smallest WAL log file number corresponding to
	// mutations that have not been flushed to an sstable.
	//
	// This is an optional field, and 0 represents it is not set.
	MinUnflushedLogNum base.DiskFileNum

	// ObsoletePrevLogNum is a historic artifact from LevelDB that is not used by
	// Pebble, RocksDB, or even LevelDB. Its use in LevelDB was deprecated in
	// 6/2011. We keep it around purely for informational purposes when
	// displaying MANIFEST contents.
	ObsoletePrevLogNum uint64

	// The next file number. A single counter is used to assign file numbers
	// for the WAL, MANIFEST, sstable, and OPTIONS files.
	NextFileNum uint64

	// LastSeqNum is an upper bound on the sequence numbers that have been
	// assigned in flushed WALs. Unflushed WALs (that will be replayed during
	// recovery) may contain sequence numbers greater than this value.
	LastSeqNum uint64

	// A file num may be present in both deleted files and new files when it
	// is moved from a lower level to a higher level (when the compaction
	// found that there was no overlapping file at the higher level).
	DeletedFiles map[DeletedFileEntry]*FileMetadata
	NewFiles     []NewFileEntry
	// CreatedBackingTables can be used to preserve the FileBacking associated
	// with a physical sstable. This is useful when virtual sstables in the
	// latest version are reconstructed during manifest replay, and we also need
	// to reconstruct the FileBacking which is required by these virtual
	// sstables.
	//
	// INVARIANT: The FileBacking associated with a physical sstable must only
	// be added as a backing file in the same version edit where the physical
	// sstable is first virtualized. This means that the physical sstable must
	// be present in DeletedFiles and that there must be at least one virtual
	// sstable with the same FileBacking as the physical sstable in NewFiles. A
	// file must be present in CreatedBackingTables in exactly one version edit.
	// The physical sstable associated with the FileBacking must also not be
	// present in NewFiles.
	CreatedBackingTables []*FileBacking
	// RemovedBackingTables is used to remove the FileBacking associated with a
	// virtual sstable. Note that a backing sstable can be removed as soon as
	// there are no virtual sstables in the latest version which are using the
	// backing sstable, but the backing sstable doesn't necessarily have to be
	// removed atomically with the version edit which removes the last virtual
	// sstable associated with the backing sstable. The removal can happen in a
	// future version edit.
	//
	// INVARIANT: A file must only be added to RemovedBackingTables if it was
	// added to CreatedBackingTables in a prior version edit. The same version
	// edit also cannot have the same file present in both CreatedBackingTables
	// and RemovedBackingTables. A file must be present in RemovedBackingTables
	// in exactly one version edit.
	RemovedBackingTables []base.DiskFileNum
}
     155             : 
     156             : // Decode decodes an edit from the specified reader.
     157             : //
     158             : // Note that the Decode step will not set the FileBacking for virtual sstables
     159             : // and the responsibility is left to the caller. However, the Decode step will
     160             : // populate the NewFileEntry.BackingFileNum in VersionEdit.NewFiles.
     161           1 : func (v *VersionEdit) Decode(r io.Reader) error {
     162           1 :         br, ok := r.(byteReader)
     163           1 :         if !ok {
     164           1 :                 br = bufio.NewReader(r)
     165           1 :         }
     166           1 :         d := versionEditDecoder{br}
     167           1 :         for {
     168           1 :                 tag, err := binary.ReadUvarint(br)
     169           1 :                 if err == io.EOF {
     170           1 :                         break
     171             :                 }
     172           1 :                 if err != nil {
     173           0 :                         return err
     174           0 :                 }
     175           1 :                 switch tag {
     176           1 :                 case tagComparator:
     177           1 :                         s, err := d.readBytes()
     178           1 :                         if err != nil {
     179           0 :                                 return err
     180           0 :                         }
     181           1 :                         v.ComparerName = string(s)
     182             : 
     183           1 :                 case tagLogNumber:
     184           1 :                         n, err := d.readUvarint()
     185           1 :                         if err != nil {
     186           0 :                                 return err
     187           0 :                         }
     188           1 :                         v.MinUnflushedLogNum = base.DiskFileNum(n)
     189             : 
     190           1 :                 case tagNextFileNumber:
     191           1 :                         n, err := d.readUvarint()
     192           1 :                         if err != nil {
     193           0 :                                 return err
     194           0 :                         }
     195           1 :                         v.NextFileNum = n
     196             : 
     197           1 :                 case tagLastSequence:
     198           1 :                         n, err := d.readUvarint()
     199           1 :                         if err != nil {
     200           0 :                                 return err
     201           0 :                         }
     202           1 :                         v.LastSeqNum = n
     203             : 
     204           0 :                 case tagCompactPointer:
     205           0 :                         if _, err := d.readLevel(); err != nil {
     206           0 :                                 return err
     207           0 :                         }
     208           0 :                         if _, err := d.readBytes(); err != nil {
     209           0 :                                 return err
     210           0 :                         }
     211             :                         // NB: RocksDB does not use compaction pointers anymore.
     212             : 
     213           1 :                 case tagRemovedBackingTable:
     214           1 :                         n, err := d.readUvarint()
     215           1 :                         if err != nil {
     216           0 :                                 return err
     217           0 :                         }
     218           1 :                         v.RemovedBackingTables = append(
     219           1 :                                 v.RemovedBackingTables, base.DiskFileNum(n),
     220           1 :                         )
     221           1 :                 case tagCreatedBackingTable:
     222           1 :                         dfn, err := d.readUvarint()
     223           1 :                         if err != nil {
     224           0 :                                 return err
     225           0 :                         }
     226           1 :                         size, err := d.readUvarint()
     227           1 :                         if err != nil {
     228           0 :                                 return err
     229           0 :                         }
     230           1 :                         fileBacking := &FileBacking{
     231           1 :                                 DiskFileNum: base.DiskFileNum(dfn),
     232           1 :                                 Size:        size,
     233           1 :                         }
     234           1 :                         v.CreatedBackingTables = append(v.CreatedBackingTables, fileBacking)
     235           1 :                 case tagDeletedFile:
     236           1 :                         level, err := d.readLevel()
     237           1 :                         if err != nil {
     238           0 :                                 return err
     239           0 :                         }
     240           1 :                         fileNum, err := d.readFileNum()
     241           1 :                         if err != nil {
     242           0 :                                 return err
     243           0 :                         }
     244           1 :                         if v.DeletedFiles == nil {
     245           1 :                                 v.DeletedFiles = make(map[DeletedFileEntry]*FileMetadata)
     246           1 :                         }
     247           1 :                         v.DeletedFiles[DeletedFileEntry{level, fileNum}] = nil
     248             : 
     249           1 :                 case tagNewFile, tagNewFile2, tagNewFile3, tagNewFile4, tagNewFile5:
     250           1 :                         level, err := d.readLevel()
     251           1 :                         if err != nil {
     252           0 :                                 return err
     253           0 :                         }
     254           1 :                         fileNum, err := d.readFileNum()
     255           1 :                         if err != nil {
     256           0 :                                 return err
     257           0 :                         }
     258           1 :                         if tag == tagNewFile3 {
     259           0 :                                 // The pathID field appears unused in RocksDB.
     260           0 :                                 _ /* pathID */, err := d.readUvarint()
     261           0 :                                 if err != nil {
     262           0 :                                         return err
     263           0 :                                 }
     264             :                         }
     265           1 :                         size, err := d.readUvarint()
     266           1 :                         if err != nil {
     267           0 :                                 return err
     268           0 :                         }
     269             :                         // We read the smallest / largest key bounds differently depending on
     270             :                         // whether we have point, range or both types of keys present in the
     271             :                         // table.
     272           1 :                         var (
     273           1 :                                 smallestPointKey, largestPointKey []byte
     274           1 :                                 smallestRangeKey, largestRangeKey []byte
     275           1 :                                 parsedPointBounds                 bool
     276           1 :                                 boundsMarker                      byte
     277           1 :                         )
     278           1 :                         if tag != tagNewFile5 {
     279           1 :                                 // Range keys not present in the table. Parse the point key bounds.
     280           1 :                                 smallestPointKey, err = d.readBytes()
     281           1 :                                 if err != nil {
     282           0 :                                         return err
     283           0 :                                 }
     284           1 :                                 largestPointKey, err = d.readBytes()
     285           1 :                                 if err != nil {
     286           0 :                                         return err
     287           0 :                                 }
     288           1 :                         } else {
     289           1 :                                 // Range keys are present in the table. Determine whether we have point
     290           1 :                                 // keys to parse, in addition to the bounds.
     291           1 :                                 boundsMarker, err = d.ReadByte()
     292           1 :                                 if err != nil {
     293           0 :                                         return err
     294           0 :                                 }
     295             :                                 // Parse point key bounds, if present.
     296           1 :                                 if boundsMarker&maskContainsPointKeys > 0 {
     297           1 :                                         smallestPointKey, err = d.readBytes()
     298           1 :                                         if err != nil {
     299           0 :                                                 return err
     300           0 :                                         }
     301           1 :                                         largestPointKey, err = d.readBytes()
     302           1 :                                         if err != nil {
     303           0 :                                                 return err
     304           0 :                                         }
     305           1 :                                         parsedPointBounds = true
     306           1 :                                 } else {
     307           1 :                                         // The table does not have point keys.
     308           1 :                                         // Sanity check: the bounds must be range keys.
     309           1 :                                         if boundsMarker&maskSmallest != 0 || boundsMarker&maskLargest != 0 {
     310           0 :                                                 return base.CorruptionErrorf(
     311           0 :                                                         "new-file-4-range-keys: table without point keys has point key bounds: marker=%x",
     312           0 :                                                         boundsMarker,
     313           0 :                                                 )
     314           0 :                                         }
     315             :                                 }
     316             :                                 // Parse range key bounds.
     317           1 :                                 smallestRangeKey, err = d.readBytes()
     318           1 :                                 if err != nil {
     319           0 :                                         return err
     320           0 :                                 }
     321           1 :                                 largestRangeKey, err = d.readBytes()
     322           1 :                                 if err != nil {
     323           0 :                                         return err
     324           0 :                                 }
     325             :                         }
     326           1 :                         var smallestSeqNum uint64
     327           1 :                         var largestSeqNum uint64
     328           1 :                         if tag != tagNewFile {
     329           1 :                                 smallestSeqNum, err = d.readUvarint()
     330           1 :                                 if err != nil {
     331           0 :                                         return err
     332           0 :                                 }
     333           1 :                                 largestSeqNum, err = d.readUvarint()
     334           1 :                                 if err != nil {
     335           0 :                                         return err
     336           0 :                                 }
     337             :                         }
     338           1 :                         var markedForCompaction bool
     339           1 :                         var creationTime uint64
     340           1 :                         virtualState := struct {
     341           1 :                                 virtual        bool
     342           1 :                                 backingFileNum uint64
     343           1 :                         }{}
     344           1 :                         var virtualPrefix *sstable.PrefixReplacement
     345           1 :                         var syntheticSuffix []byte
     346           1 :                         if tag == tagNewFile4 || tag == tagNewFile5 {
     347           1 :                                 for {
     348           1 :                                         customTag, err := d.readUvarint()
     349           1 :                                         if err != nil {
     350           0 :                                                 return err
     351           0 :                                         }
     352           1 :                                         if customTag == customTagTerminate {
     353           1 :                                                 break
     354             :                                         }
     355           1 :                                         switch customTag {
     356           1 :                                         case customTagNeedsCompaction:
     357           1 :                                                 field, err := d.readBytes()
     358           1 :                                                 if err != nil {
     359           0 :                                                         return err
     360           0 :                                                 }
     361           1 :                                                 if len(field) != 1 {
     362           0 :                                                         return base.CorruptionErrorf("new-file4: need-compaction field wrong size")
     363           0 :                                                 }
     364           1 :                                                 markedForCompaction = (field[0] == 1)
     365             : 
     366           1 :                                         case customTagCreationTime:
     367           1 :                                                 field, err := d.readBytes()
     368           1 :                                                 if err != nil {
     369           0 :                                                         return err
     370           0 :                                                 }
     371           1 :                                                 var n int
     372           1 :                                                 creationTime, n = binary.Uvarint(field)
     373           1 :                                                 if n != len(field) {
     374           0 :                                                         return base.CorruptionErrorf("new-file4: invalid file creation time")
     375           0 :                                                 }
     376             : 
     377           0 :                                         case customTagPathID:
     378           0 :                                                 return base.CorruptionErrorf("new-file4: path-id field not supported")
     379             : 
     380           1 :                                         case customTagVirtual:
     381           1 :                                                 virtualState.virtual = true
     382           1 :                                                 if virtualState.backingFileNum, err = d.readUvarint(); err != nil {
     383           0 :                                                         return err
     384           0 :                                                 }
     385             : 
     386           0 :                                         case customTagPrefixRewrite:
     387           0 :                                                 content, err := d.readBytes()
     388           0 :                                                 if err != nil {
     389           0 :                                                         return err
     390           0 :                                                 }
     391           0 :                                                 synthetic, err := d.readBytes()
     392           0 :                                                 if err != nil {
     393           0 :                                                         return err
     394           0 :                                                 }
     395           0 :                                                 virtualPrefix = &sstable.PrefixReplacement{
     396           0 :                                                         ContentPrefix:   content,
     397           0 :                                                         SyntheticPrefix: synthetic,
     398           0 :                                                 }
     399             : 
     400           1 :                                         case customTagSuffixRewrite:
     401           1 :                                                 if syntheticSuffix, err = d.readBytes(); err != nil {
     402           0 :                                                         return err
     403           0 :                                                 }
     404             : 
     405           0 :                                         default:
     406           0 :                                                 if (customTag & customTagNonSafeIgnoreMask) != 0 {
     407           0 :                                                         return base.CorruptionErrorf("new-file4: custom field not supported: %d", customTag)
     408           0 :                                                 }
     409           0 :                                                 if _, err := d.readBytes(); err != nil {
     410           0 :                                                         return err
     411           0 :                                                 }
     412             :                                         }
     413             :                                 }
     414             :                         }
     415           1 :                         m := &FileMetadata{
     416           1 :                                 FileNum:             fileNum,
     417           1 :                                 Size:                size,
     418           1 :                                 CreationTime:        int64(creationTime),
     419           1 :                                 SmallestSeqNum:      smallestSeqNum,
     420           1 :                                 LargestSeqNum:       largestSeqNum,
     421           1 :                                 MarkedForCompaction: markedForCompaction,
     422           1 :                                 Virtual:             virtualState.virtual,
     423           1 :                                 PrefixReplacement:   virtualPrefix,
     424           1 :                                 SyntheticSuffix:     syntheticSuffix,
     425           1 :                         }
     426           1 :                         if tag != tagNewFile5 { // no range keys present
     427           1 :                                 m.SmallestPointKey = base.DecodeInternalKey(smallestPointKey)
     428           1 :                                 m.LargestPointKey = base.DecodeInternalKey(largestPointKey)
     429           1 :                                 m.HasPointKeys = true
     430           1 :                                 m.Smallest, m.Largest = m.SmallestPointKey, m.LargestPointKey
     431           1 :                                 m.boundTypeSmallest, m.boundTypeLargest = boundTypePointKey, boundTypePointKey
     432           1 :                         } else { // range keys present
     433           1 :                                 // Set point key bounds, if parsed.
     434           1 :                                 if parsedPointBounds {
     435           1 :                                         m.SmallestPointKey = base.DecodeInternalKey(smallestPointKey)
     436           1 :                                         m.LargestPointKey = base.DecodeInternalKey(largestPointKey)
     437           1 :                                         m.HasPointKeys = true
     438           1 :                                 }
     439             :                                 // Set range key bounds.
     440           1 :                                 m.SmallestRangeKey = base.DecodeInternalKey(smallestRangeKey)
     441           1 :                                 m.LargestRangeKey = base.DecodeInternalKey(largestRangeKey)
     442           1 :                                 m.HasRangeKeys = true
     443           1 :                                 // Set overall bounds (by default assume range keys).
     444           1 :                                 m.Smallest, m.Largest = m.SmallestRangeKey, m.LargestRangeKey
     445           1 :                                 m.boundTypeSmallest, m.boundTypeLargest = boundTypeRangeKey, boundTypeRangeKey
     446           1 :                                 if boundsMarker&maskSmallest == maskSmallest {
     447           1 :                                         m.Smallest = m.SmallestPointKey
     448           1 :                                         m.boundTypeSmallest = boundTypePointKey
     449           1 :                                 }
     450           1 :                                 if boundsMarker&maskLargest == maskLargest {
     451           1 :                                         m.Largest = m.LargestPointKey
     452           1 :                                         m.boundTypeLargest = boundTypePointKey
     453           1 :                                 }
     454             :                         }
     455           1 :                         m.boundsSet = true
     456           1 :                         if !virtualState.virtual {
     457           1 :                                 m.InitPhysicalBacking()
     458           1 :                         }
     459             : 
     460           1 :                         nfe := NewFileEntry{
     461           1 :                                 Level: level,
     462           1 :                                 Meta:  m,
     463           1 :                         }
     464           1 :                         if virtualState.virtual {
     465           1 :                                 nfe.BackingFileNum = base.DiskFileNum(virtualState.backingFileNum)
     466           1 :                         }
     467           1 :                         v.NewFiles = append(v.NewFiles, nfe)
     468             : 
     469           1 :                 case tagPrevLogNumber:
     470           1 :                         n, err := d.readUvarint()
     471           1 :                         if err != nil {
     472           0 :                                 return err
     473           0 :                         }
     474           1 :                         v.ObsoletePrevLogNum = n
     475             : 
     476           0 :                 case tagColumnFamily, tagColumnFamilyAdd, tagColumnFamilyDrop, tagMaxColumnFamily:
     477           0 :                         return base.CorruptionErrorf("column families are not supported")
     478             : 
     479           0 :                 default:
     480           0 :                         return errCorruptManifest
     481             :                 }
     482             :         }
     483           1 :         return nil
     484             : }
     485             : 
     486           1 : func (v *VersionEdit) string(verbose bool, fmtKey base.FormatKey) string {
     487           1 :         var buf bytes.Buffer
     488           1 :         if v.ComparerName != "" {
     489           1 :                 fmt.Fprintf(&buf, "  comparer:     %s", v.ComparerName)
     490           1 :         }
     491           1 :         if v.MinUnflushedLogNum != 0 {
     492           1 :                 fmt.Fprintf(&buf, "  log-num:       %d\n", v.MinUnflushedLogNum)
     493           1 :         }
     494           1 :         if v.ObsoletePrevLogNum != 0 {
     495           0 :                 fmt.Fprintf(&buf, "  prev-log-num:  %d\n", v.ObsoletePrevLogNum)
     496           0 :         }
     497           1 :         if v.NextFileNum != 0 {
     498           1 :                 fmt.Fprintf(&buf, "  next-file-num: %d\n", v.NextFileNum)
     499           1 :         }
     500           1 :         if v.LastSeqNum != 0 {
     501           1 :                 fmt.Fprintf(&buf, "  last-seq-num:  %d\n", v.LastSeqNum)
     502           1 :         }
     503           1 :         entries := make([]DeletedFileEntry, 0, len(v.DeletedFiles))
     504           1 :         for df := range v.DeletedFiles {
     505           1 :                 entries = append(entries, df)
     506           1 :         }
     507           1 :         slices.SortFunc(entries, func(a, b DeletedFileEntry) int {
     508           1 :                 if v := stdcmp.Compare(a.Level, b.Level); v != 0 {
     509           1 :                         return v
     510           1 :                 }
     511           0 :                 return stdcmp.Compare(a.FileNum, b.FileNum)
     512             :         })
     513           1 :         for _, df := range entries {
     514           1 :                 fmt.Fprintf(&buf, "  deleted:       L%d %s\n", df.Level, df.FileNum)
     515           1 :         }
     516           1 :         for _, nf := range v.NewFiles {
     517           1 :                 fmt.Fprintf(&buf, "  added:         L%d", nf.Level)
     518           1 :                 if verbose {
     519           1 :                         fmt.Fprintf(&buf, " %s", nf.Meta.DebugString(fmtKey, true /* verbose */))
     520           1 :                 } else {
     521           1 :                         fmt.Fprintf(&buf, " %s", nf.Meta.String())
     522           1 :                 }
     523           1 :                 if nf.Meta.CreationTime != 0 {
     524           1 :                         fmt.Fprintf(&buf, " (%s)",
     525           1 :                                 time.Unix(nf.Meta.CreationTime, 0).UTC().Format(time.RFC3339))
     526           1 :                 }
     527           1 :                 fmt.Fprintln(&buf)
     528             :         }
     529           1 :         return buf.String()
     530             : }
     531             : 
     532             : // DebugString is a more verbose version of String(). Use this in tests.
     533           1 : func (v *VersionEdit) DebugString(fmtKey base.FormatKey) string {
     534           1 :         return v.string(true /* verbose */, fmtKey)
     535           1 : }
     536             : 
     537             : // String implements fmt.Stringer for a VersionEdit.
     538           1 : func (v *VersionEdit) String() string {
     539           1 :         return v.string(false /* verbose */, base.DefaultFormatter)
     540           1 : }
     541             : 
// Encode encodes an edit to the specified writer.
//
// The edit is serialized as a sequence of (tag, payload) records. Zero-valued
// fields are omitted entirely; the decoder reconstructs the edit from
// whichever tags appear. The whole edit is staged in a scratch buffer and
// handed to w in a single Write call at the end.
func (v *VersionEdit) Encode(w io.Writer) error {
	e := versionEditEncoder{new(bytes.Buffer)}

	if v.ComparerName != "" {
		e.writeUvarint(tagComparator)
		e.writeString(v.ComparerName)
	}
	if v.MinUnflushedLogNum != 0 {
		e.writeUvarint(tagLogNumber)
		e.writeUvarint(uint64(v.MinUnflushedLogNum))
	}
	if v.ObsoletePrevLogNum != 0 {
		e.writeUvarint(tagPrevLogNumber)
		e.writeUvarint(v.ObsoletePrevLogNum)
	}
	if v.NextFileNum != 0 {
		e.writeUvarint(tagNextFileNumber)
		e.writeUvarint(uint64(v.NextFileNum))
	}
	for _, dfn := range v.RemovedBackingTables {
		e.writeUvarint(tagRemovedBackingTable)
		e.writeUvarint(uint64(dfn))
	}
	for _, fileBacking := range v.CreatedBackingTables {
		e.writeUvarint(tagCreatedBackingTable)
		e.writeUvarint(uint64(fileBacking.DiskFileNum))
		e.writeUvarint(fileBacking.Size)
	}
	// RocksDB requires LastSeqNum to be encoded for the first MANIFEST entry,
	// even though its value is zero. We detect this by encoding LastSeqNum when
	// ComparerName is set.
	if v.LastSeqNum != 0 || v.ComparerName != "" {
		e.writeUvarint(tagLastSequence)
		e.writeUvarint(v.LastSeqNum)
	}
	// Deletions carry only (level, file number). Iterating the map makes the
	// relative order of deletion records nondeterministic.
	for x := range v.DeletedFiles {
		e.writeUvarint(tagDeletedFile)
		e.writeUvarint(uint64(x.Level))
		e.writeUvarint(uint64(x.FileNum))
	}
	for _, x := range v.NewFiles {
		customFields := x.Meta.MarkedForCompaction || x.Meta.CreationTime != 0 || x.Meta.Virtual
		// Pick the newest record format actually required by this file's
		// features; files without range keys or custom fields keep the older,
		// smaller encodings.
		var tag uint64
		switch {
		case x.Meta.HasRangeKeys:
			tag = tagNewFile5
		case customFields:
			tag = tagNewFile4
		default:
			tag = tagNewFile2
		}
		e.writeUvarint(tag)
		e.writeUvarint(uint64(x.Level))
		e.writeUvarint(uint64(x.Meta.FileNum))
		e.writeUvarint(x.Meta.Size)
		if !x.Meta.HasRangeKeys {
			// If we have no range keys, preserve the original format and write the
			// smallest and largest point keys.
			e.writeKey(x.Meta.SmallestPointKey)
			e.writeKey(x.Meta.LargestPointKey)
		} else {
			// When range keys are present, we first write a marker byte that
			// indicates if the table also contains point keys, in addition to how the
			// overall bounds for the table should be reconstructed. This byte is
			// followed by the keys themselves.
			b, err := x.Meta.boundsMarker()
			if err != nil {
				return err
			}
			if err = e.WriteByte(b); err != nil {
				return err
			}
			// Write point key bounds (if present).
			if x.Meta.HasPointKeys {
				e.writeKey(x.Meta.SmallestPointKey)
				e.writeKey(x.Meta.LargestPointKey)
			}
			// Write range key bounds.
			e.writeKey(x.Meta.SmallestRangeKey)
			e.writeKey(x.Meta.LargestRangeKey)
		}
		e.writeUvarint(x.Meta.SmallestSeqNum)
		e.writeUvarint(x.Meta.LargestSeqNum)
		if customFields {
			// Custom fields are (tag, payload) pairs closed by
			// customTagTerminate.
			if x.Meta.CreationTime != 0 {
				e.writeUvarint(customTagCreationTime)
				var buf [binary.MaxVarintLen64]byte
				n := binary.PutUvarint(buf[:], uint64(x.Meta.CreationTime))
				e.writeBytes(buf[:n])
			}
			if x.Meta.MarkedForCompaction {
				e.writeUvarint(customTagNeedsCompaction)
				e.writeBytes([]byte{1})
			}
			if x.Meta.Virtual {
				e.writeUvarint(customTagVirtual)
				e.writeUvarint(uint64(x.Meta.FileBacking.DiskFileNum))
			}
			// NOTE(review): PrefixReplacement and SyntheticSuffix are emitted
			// only when customFields is true; presumably any file carrying them
			// is always virtual (which sets customFields) — confirm.
			if x.Meta.PrefixReplacement != nil {
				e.writeUvarint(customTagPrefixRewrite)
				e.writeBytes(x.Meta.PrefixReplacement.ContentPrefix)
				e.writeBytes(x.Meta.PrefixReplacement.SyntheticPrefix)
			}
			if x.Meta.SyntheticSuffix != nil {
				e.writeUvarint(customTagSuffixRewrite)
				e.writeBytes(x.Meta.SyntheticSuffix)
			}
			e.writeUvarint(customTagTerminate)
		}
	}
	_, err := w.Write(e.Bytes())
	return err
}
     656             : 
// versionEditDecoder should be used to decode version edits.
//
// It wraps a byteReader and provides helpers for the primitive encodings used
// by the manifest wire format: uvarints, length-prefixed byte strings, level
// indexes, and file numbers.
type versionEditDecoder struct {
	byteReader
}
     661             : 
     662           1 : func (d versionEditDecoder) readBytes() ([]byte, error) {
     663           1 :         n, err := d.readUvarint()
     664           1 :         if err != nil {
     665           0 :                 return nil, err
     666           0 :         }
     667           1 :         s := make([]byte, n)
     668           1 :         _, err = io.ReadFull(d, s)
     669           1 :         if err != nil {
     670           0 :                 if err == io.ErrUnexpectedEOF {
     671           0 :                         return nil, errCorruptManifest
     672           0 :                 }
     673           0 :                 return nil, err
     674             :         }
     675           1 :         return s, nil
     676             : }
     677             : 
     678           1 : func (d versionEditDecoder) readLevel() (int, error) {
     679           1 :         u, err := d.readUvarint()
     680           1 :         if err != nil {
     681           0 :                 return 0, err
     682           0 :         }
     683           1 :         if u >= NumLevels {
     684           0 :                 return 0, errCorruptManifest
     685           0 :         }
     686           1 :         return int(u), nil
     687             : }
     688             : 
     689           1 : func (d versionEditDecoder) readFileNum() (base.FileNum, error) {
     690           1 :         u, err := d.readUvarint()
     691           1 :         if err != nil {
     692           0 :                 return 0, err
     693           0 :         }
     694           1 :         return base.FileNum(u), nil
     695             : }
     696             : 
     697           1 : func (d versionEditDecoder) readUvarint() (uint64, error) {
     698           1 :         u, err := binary.ReadUvarint(d)
     699           1 :         if err != nil {
     700           0 :                 if err == io.EOF {
     701           0 :                         return 0, errCorruptManifest
     702           0 :                 }
     703           0 :                 return 0, err
     704             :         }
     705           1 :         return u, nil
     706             : }
     707             : 
// versionEditEncoder accumulates the encoded form of a version edit in an
// in-memory buffer. Its write helpers mirror versionEditDecoder's read
// helpers. Writes to the embedded bytes.Buffer cannot fail, so the helpers
// return no errors.
type versionEditEncoder struct {
	*bytes.Buffer
}
     711             : 
     712           1 : func (e versionEditEncoder) writeBytes(p []byte) {
     713           1 :         e.writeUvarint(uint64(len(p)))
     714           1 :         e.Write(p)
     715           1 : }
     716             : 
     717           1 : func (e versionEditEncoder) writeKey(k InternalKey) {
     718           1 :         e.writeUvarint(uint64(k.Size()))
     719           1 :         e.Write(k.UserKey)
     720           1 :         buf := k.EncodeTrailer()
     721           1 :         e.Write(buf[:])
     722           1 : }
     723             : 
     724           1 : func (e versionEditEncoder) writeString(s string) {
     725           1 :         e.writeUvarint(uint64(len(s)))
     726           1 :         e.WriteString(s)
     727           1 : }
     728             : 
     729           1 : func (e versionEditEncoder) writeUvarint(u uint64) {
     730           1 :         var buf [binary.MaxVarintLen64]byte
     731           1 :         n := binary.PutUvarint(buf[:], u)
     732           1 :         e.Write(buf[:n])
     733           1 : }
     734             : 
// BulkVersionEdit summarizes the files added and deleted from a set of version
// edits.
//
// INVARIANTS:
// No file can be added to a level more than once. This is true globally, and
// also true for all of the calls to Accumulate for a single bulk version edit.
//
// No file can be removed from a level more than once. This is true globally,
// and also true for all of the calls to Accumulate for a single bulk version
// edit.
//
// A file must not be added and removed from a given level in the same version
// edit.
//
// A file that is being removed from a level must have been added to that level
// before (in a prior version edit). Note that a given file can be deleted from
// a level and added to another level in a single version edit
type BulkVersionEdit struct {
	// Added and Deleted hold the net per-level set of added and deleted
	// files across all accumulated edits, keyed by file number. A delete
	// that follows an add within the same bulk edit cancels the add (see
	// Accumulate).
	Added   [NumLevels]map[base.FileNum]*FileMetadata
	Deleted [NumLevels]map[base.FileNum]*FileMetadata

	// AddedFileBacking is a map to support lookup so that we can populate the
	// FileBacking of virtual sstables during manifest replay.
	// RemovedFileBacking accumulates the backing tables removed across all
	// accumulated edits; entries are appended verbatim by Accumulate.
	AddedFileBacking   map[base.DiskFileNum]*FileBacking
	RemovedFileBacking []base.DiskFileNum

	// AddedByFileNum maps file number to file metadata for all added files
	// from accumulated version edits. AddedByFileNum is only populated if set
	// to non-nil by a caller. It must be set to non-nil when replaying
	// version edits read from a MANIFEST (as opposed to VersionEdits
	// constructed in-memory).  While replaying a MANIFEST file,
	// VersionEdit.DeletedFiles map entries have nil values, because the
	// on-disk deletion record encodes only the file number. Accumulate
	// uses AddedByFileNum to correctly populate the BulkVersionEdit's Deleted
	// field with non-nil *FileMetadata.
	AddedByFileNum map[base.FileNum]*FileMetadata

	// MarkedForCompactionCountDiff holds the aggregated count of files
	// marked for compaction added or removed.
	MarkedForCompactionCountDiff int
}
     776             : 
// Accumulate adds the file addition and deletions in the specified version
// edit to the bulk edit's internal state.
//
// INVARIANTS:
// If a file is added to a given level in a call to Accumulate and then removed
// from that level in a subsequent call, the file will not be present in the
// resulting BulkVersionEdit.Deleted for that level.
//
// After accumulation of version edits, the bulk version edit may have
// information about a file which has been deleted from a level, but it may
// not have information about the same file added to the same level. The add
// could've occurred as part of a previous bulk version edit. In this case,
// the deleted file must be present in BulkVersionEdit.Deleted, at the end
// of the accumulation, because we need to decrease the refcount of the
// deleted file in Apply.
func (b *BulkVersionEdit) Accumulate(ve *VersionEdit) error {
	for df, m := range ve.DeletedFiles {
		dmap := b.Deleted[df.Level]
		if dmap == nil {
			dmap = make(map[base.FileNum]*FileMetadata)
			b.Deleted[df.Level] = dmap
		}

		if m == nil {
			// m is nil only when replaying a MANIFEST, where the on-disk
			// deletion record carries just the file number.
			if b.AddedByFileNum == nil {
				return errors.Errorf("deleted file L%d.%s's metadata is absent and bve.AddedByFileNum is nil", df.Level, df.FileNum)
			}
			// Resolve the metadata from a previously accumulated addition.
			m = b.AddedByFileNum[df.FileNum]
			if m == nil {
				return base.CorruptionErrorf("pebble: file deleted L%d.%s before it was inserted", df.Level, df.FileNum)
			}
		}
		if m.MarkedForCompaction {
			b.MarkedForCompactionCountDiff--
		}
		// A delete that follows an add at the same level within this bulk
		// edit cancels the add instead of being recorded as a deletion.
		if _, ok := b.Added[df.Level][df.FileNum]; !ok {
			dmap[df.FileNum] = m
		} else {
			// Present in b.Added for the same level.
			delete(b.Added[df.Level], df.FileNum)
		}
	}

	// Generate state for Added backing files. Note that these must be generated
	// before we loop through the NewFiles, because we need to populate the
	// FileBackings which might be used by the NewFiles loop.
	if b.AddedFileBacking == nil {
		b.AddedFileBacking = make(map[base.DiskFileNum]*FileBacking)
	}
	for _, fb := range ve.CreatedBackingTables {
		if _, ok := b.AddedFileBacking[fb.DiskFileNum]; ok {
			// There is already a FileBacking associated with fb.DiskFileNum.
			// This should never happen. There must always be only one FileBacking
			// associated with a backing sstable.
			panic(fmt.Sprintf("pebble: duplicate file backing %s", fb.DiskFileNum.String()))
		}
		b.AddedFileBacking[fb.DiskFileNum] = fb
	}

	for _, nf := range ve.NewFiles {
		// A new file should not have been deleted in this or a preceding
		// VersionEdit at the same level (though files can move across levels).
		if dmap := b.Deleted[nf.Level]; dmap != nil {
			if _, ok := dmap[nf.Meta.FileNum]; ok {
				return base.CorruptionErrorf("pebble: file deleted L%d.%s before it was inserted", nf.Level, nf.Meta.FileNum)
			}
		}
		if nf.Meta.Virtual && nf.Meta.FileBacking == nil {
			// FileBacking for a virtual sstable must only be nil if we're performing
			// manifest replay.
			nf.Meta.FileBacking = b.AddedFileBacking[nf.BackingFileNum]
			if nf.Meta.FileBacking == nil {
				return errors.Errorf("FileBacking for virtual sstable must not be nil")
			}
		} else if nf.Meta.FileBacking == nil {
			return errors.Errorf("Added file L%d.%s's has no FileBacking", nf.Level, nf.Meta.FileNum)
		}

		if b.Added[nf.Level] == nil {
			b.Added[nf.Level] = make(map[base.FileNum]*FileMetadata)
		}
		b.Added[nf.Level][nf.Meta.FileNum] = nf.Meta
		if b.AddedByFileNum != nil {
			// Record the addition for later deletion-record resolution
			// during manifest replay.
			b.AddedByFileNum[nf.Meta.FileNum] = nf.Meta
		}
		if nf.Meta.MarkedForCompaction {
			b.MarkedForCompactionCountDiff++
		}
	}

	// Since a file can be removed from backing files in exactly one version
	// edit it is safe to just append without any de-duplication.
	b.RemovedFileBacking = append(b.RemovedFileBacking, ve.RemovedBackingTables...)

	return nil
}
     874             : 
// AccumulateIncompleteAndApplySingleVE should be called if a single version edit
// is to be applied to the provided curr Version and if the caller needs to
// update the versionSet.zombieTables map. This function exists separately from
// BulkVersionEdit.Apply because it is easier to reason about properties
// regarding BulkVersionedit.Accumulate/Apply and zombie table generation, if we
// know that exactly one version edit is being accumulated.
//
// Note that the version edit passed into this function may be incomplete
// because compactions don't have the ref counting information necessary to
// populate VersionEdit.RemovedBackingTables. This function will complete such a
// version edit by populating RemovedBackingTables.
//
// Invariant: Any file being deleted through ve must belong to the curr Version.
// We can't have a delete for some arbitrary file which does not exist in curr.
func AccumulateIncompleteAndApplySingleVE(
	ve *VersionEdit,
	curr *Version,
	comparer *base.Comparer,
	flushSplitBytes int64,
	readCompactionRate int64,
	backings *FileBackings,
) (_ *Version, zombies map[base.DiskFileNum]uint64, _ error) {
	// An "incomplete" edit must arrive with RemovedBackingTables empty; this
	// function is responsible for populating it below.
	if len(ve.RemovedBackingTables) != 0 {
		panic("pebble: invalid incomplete version edit")
	}
	var b BulkVersionEdit
	err := b.Accumulate(ve)
	if err != nil {
		return nil, nil, err
	}
	// Apply records deleted-but-still-referenced tables (and their sizes)
	// into zombies.
	zombies = make(map[base.DiskFileNum]uint64)
	v, err := b.Apply(curr, comparer, flushSplitBytes, readCompactionRate, zombies)
	if err != nil {
		return nil, nil, err
	}

	// Register any backing tables created by this edit.
	for _, s := range b.AddedFileBacking {
		backings.Add(s)
	}

	for fileNum := range zombies {
		if _, ok := backings.Get(fileNum); ok {
			// This table was backing some virtual sstable in the latest version,
			// but is now a zombie. We add RemovedBackingTables entries for
			// these, before the version edit is written to disk.
			ve.RemovedBackingTables = append(
				ve.RemovedBackingTables, fileNum,
			)
			backings.Remove(fileNum)
		}
	}
	return v, zombies, nil
}
     928             : 
     929             : // Apply applies the delta b to the current version to produce a new
     930             : // version. The new version is consistent with respect to the comparer cmp.
     931             : //
     932             : // curr may be nil, which is equivalent to a pointer to a zero version.
     933             : //
     934             : // On success, if a non-nil zombies map is provided to Apply, the map is updated
     935             : // with file numbers and files sizes of deleted files. These files are
     936             : // considered zombies because they are no longer referenced by the returned
     937             : // Version, but cannot be deleted from disk as they are still in use by the
     938             : // incoming Version.
     939             : func (b *BulkVersionEdit) Apply(
     940             :         curr *Version,
     941             :         comparer *base.Comparer,
     942             :         flushSplitBytes int64,
     943             :         readCompactionRate int64,
     944             :         zombies map[base.DiskFileNum]uint64,
     945           1 : ) (*Version, error) {
     946           1 :         addZombie := func(state *FileBacking) {
     947           1 :                 if zombies != nil {
     948           1 :                         zombies[state.DiskFileNum] = state.Size
     949           1 :                 }
     950             :         }
     951           1 :         removeZombie := func(state *FileBacking) {
     952           1 :                 if zombies != nil {
     953           1 :                         delete(zombies, state.DiskFileNum)
     954           1 :                 }
     955             :         }
     956             : 
     957           1 :         v := &Version{
     958           1 :                 cmp: comparer,
     959           1 :         }
     960           1 : 
     961           1 :         // Adjust the count of files marked for compaction.
     962           1 :         if curr != nil {
     963           1 :                 v.Stats.MarkedForCompaction = curr.Stats.MarkedForCompaction
     964           1 :         }
     965           1 :         v.Stats.MarkedForCompaction += b.MarkedForCompactionCountDiff
     966           1 :         if v.Stats.MarkedForCompaction < 0 {
     967           0 :                 return nil, base.CorruptionErrorf("pebble: version marked for compaction count negative")
     968           0 :         }
     969             : 
     970           1 :         for level := range v.Levels {
     971           1 :                 if curr == nil || curr.Levels[level].tree.root == nil {
     972           1 :                         v.Levels[level] = makeLevelMetadata(comparer.Compare, level, nil /* files */)
     973           1 :                 } else {
     974           1 :                         v.Levels[level] = curr.Levels[level].clone()
     975           1 :                 }
     976           1 :                 if curr == nil || curr.RangeKeyLevels[level].tree.root == nil {
     977           1 :                         v.RangeKeyLevels[level] = makeLevelMetadata(comparer.Compare, level, nil /* files */)
     978           1 :                 } else {
     979           1 :                         v.RangeKeyLevels[level] = curr.RangeKeyLevels[level].clone()
     980           1 :                 }
     981             : 
     982           1 :                 if len(b.Added[level]) == 0 && len(b.Deleted[level]) == 0 {
     983           1 :                         // There are no edits on this level.
     984           1 :                         if level == 0 {
     985           1 :                                 // Initialize L0Sublevels.
     986           1 :                                 if curr == nil || curr.L0Sublevels == nil {
     987           1 :                                         if err := v.InitL0Sublevels(flushSplitBytes); err != nil {
     988           0 :                                                 return nil, errors.Wrap(err, "pebble: internal error")
     989           0 :                                         }
     990           1 :                                 } else {
     991           1 :                                         v.L0Sublevels = curr.L0Sublevels
     992           1 :                                         v.L0SublevelFiles = v.L0Sublevels.Levels
     993           1 :                                 }
     994             :                         }
     995           1 :                         continue
     996             :                 }
     997             : 
     998             :                 // Some edits on this level.
     999           1 :                 lm := &v.Levels[level]
    1000           1 :                 lmRange := &v.RangeKeyLevels[level]
    1001           1 : 
    1002           1 :                 addedFilesMap := b.Added[level]
    1003           1 :                 deletedFilesMap := b.Deleted[level]
    1004           1 :                 if n := v.Levels[level].Len() + len(addedFilesMap); n == 0 {
    1005           1 :                         return nil, base.CorruptionErrorf(
    1006           1 :                                 "pebble: internal error: No current or added files but have deleted files: %d",
    1007           1 :                                 errors.Safe(len(deletedFilesMap)))
    1008           1 :                 }
    1009             : 
    1010             :                 // NB: addedFilesMap may be empty. If a file is present in addedFilesMap
    1011             :                 // for a level, it won't be present in deletedFilesMap for the same
    1012             :                 // level.
    1013             : 
    1014           1 :                 for _, f := range deletedFilesMap {
    1015           1 :                         if obsolete := v.Levels[level].remove(f); obsolete {
    1016           0 :                                 // Deleting a file from the B-Tree may decrement its
    1017           0 :                                 // reference count. However, because we cloned the
    1018           0 :                                 // previous level's B-Tree, this should never result in a
    1019           0 :                                 // file's reference count dropping to zero.
    1020           0 :                                 err := errors.Errorf("pebble: internal error: file L%d.%s obsolete during B-Tree removal", level, f.FileNum)
    1021           0 :                                 return nil, err
    1022           0 :                         }
    1023           1 :                         if f.HasRangeKeys {
    1024           1 :                                 if obsolete := v.RangeKeyLevels[level].remove(f); obsolete {
    1025           0 :                                         // Deleting a file from the B-Tree may decrement its
    1026           0 :                                         // reference count. However, because we cloned the
    1027           0 :                                         // previous level's B-Tree, this should never result in a
    1028           0 :                                         // file's reference count dropping to zero.
    1029           0 :                                         err := errors.Errorf("pebble: internal error: file L%d.%s obsolete during range-key B-Tree removal", level, f.FileNum)
    1030           0 :                                         return nil, err
    1031           0 :                                 }
    1032             :                         }
    1033             : 
    1034             :                         // Note that a backing sst will only become a zombie if the
    1035             :                         // references to it in the latest version is 0. We will remove the
    1036             :                         // backing sst from the zombie list in the next loop if one of the
    1037             :                         // addedFiles in any of the levels is referencing the backing sst.
    1038             :                         // This is possible if a physical sstable is virtualized, or if it
    1039             :                         // is moved.
    1040           1 :                         latestRefCount := f.LatestRefs()
    1041           1 :                         if latestRefCount <= 0 {
    1042           0 :                                 // If a file is present in deletedFilesMap for a level, then it
    1043           0 :                                 // must have already been added to the level previously, which
    1044           0 :                                 // means that its latest ref count cannot be 0.
    1045           0 :                                 err := errors.Errorf("pebble: internal error: incorrect latestRefs reference counting for file", f.FileNum)
    1046           0 :                                 return nil, err
    1047           1 :                         } else if f.LatestUnref() == 0 {
    1048           1 :                                 addZombie(f.FileBacking)
    1049           1 :                         }
    1050             :                 }
    1051             : 
    1052           1 :                 addedFiles := make([]*FileMetadata, 0, len(addedFilesMap))
    1053           1 :                 for _, f := range addedFilesMap {
    1054           1 :                         addedFiles = append(addedFiles, f)
    1055           1 :                 }
    1056             :                 // Sort addedFiles by file number. This isn't necessary, but tests which
    1057             :                 // replay invalid manifests check the error output, and the error output
    1058             :                 // depends on the order in which files are added to the btree.
    1059           1 :                 slices.SortFunc(addedFiles, func(a, b *FileMetadata) int {
    1060           1 :                         return stdcmp.Compare(a.FileNum, b.FileNum)
    1061           1 :                 })
    1062             : 
    1063           1 :                 var sm, la *FileMetadata
    1064           1 :                 for _, f := range addedFiles {
    1065           1 :                         // NB: allowedSeeks is used for read triggered compactions. It is set using
    1066           1 :                         // Options.Experimental.ReadCompactionRate which defaults to 32KB.
    1067           1 :                         var allowedSeeks int64
    1068           1 :                         if readCompactionRate != 0 {
    1069           1 :                                 allowedSeeks = int64(f.Size) / readCompactionRate
    1070           1 :                         }
    1071           1 :                         if allowedSeeks < 100 {
    1072           1 :                                 allowedSeeks = 100
    1073           1 :                         }
    1074           1 :                         f.AllowedSeeks.Store(allowedSeeks)
    1075           1 :                         f.InitAllowedSeeks = allowedSeeks
    1076           1 : 
    1077           1 :                         err := lm.insert(f)
    1078           1 :                         // We're adding this file to the new version, so increment the
    1079           1 :                         // latest refs count.
    1080           1 :                         f.LatestRef()
    1081           1 :                         if err != nil {
    1082           1 :                                 return nil, errors.Wrap(err, "pebble")
    1083           1 :                         }
    1084           1 :                         if f.HasRangeKeys {
    1085           1 :                                 err = lmRange.insert(f)
    1086           1 :                                 if err != nil {
    1087           0 :                                         return nil, errors.Wrap(err, "pebble")
    1088           0 :                                 }
    1089             :                         }
    1090           1 :                         removeZombie(f.FileBacking)
    1091           1 :                         // Track the keys with the smallest and largest keys, so that we can
    1092           1 :                         // check consistency of the modified span.
    1093           1 :                         if sm == nil || base.InternalCompare(comparer.Compare, sm.Smallest, f.Smallest) > 0 {
    1094           1 :                                 sm = f
    1095           1 :                         }
    1096           1 :                         if la == nil || base.InternalCompare(comparer.Compare, la.Largest, f.Largest) < 0 {
    1097           1 :                                 la = f
    1098           1 :                         }
    1099             :                 }
    1100             : 
    1101           1 :                 if level == 0 {
    1102           1 :                         if curr != nil && curr.L0Sublevels != nil && len(deletedFilesMap) == 0 {
    1103           1 :                                 // Flushes and ingestions that do not delete any L0 files do not require
    1104           1 :                                 // a regeneration of L0Sublevels from scratch. We can instead generate
    1105           1 :                                 // it incrementally.
    1106           1 :                                 var err error
    1107           1 :                                 // AddL0Files requires addedFiles to be sorted in seqnum order.
    1108           1 :                                 SortBySeqNum(addedFiles)
    1109           1 :                                 v.L0Sublevels, err = curr.L0Sublevels.AddL0Files(addedFiles, flushSplitBytes, &v.Levels[0])
    1110           1 :                                 if errors.Is(err, errInvalidL0SublevelsOpt) {
    1111           1 :                                         err = v.InitL0Sublevels(flushSplitBytes)
    1112           1 :                                 } else if invariants.Enabled && err == nil {
    1113           1 :                                         copyOfSublevels, err := NewL0Sublevels(&v.Levels[0], comparer.Compare, comparer.FormatKey, flushSplitBytes)
    1114           1 :                                         if err != nil {
    1115           0 :                                                 panic(fmt.Sprintf("error when regenerating sublevels: %s", err))
    1116             :                                         }
    1117           1 :                                         s1 := describeSublevels(comparer.FormatKey, false /* verbose */, copyOfSublevels.Levels)
    1118           1 :                                         s2 := describeSublevels(comparer.FormatKey, false /* verbose */, v.L0Sublevels.Levels)
    1119           1 :                                         if s1 != s2 {
    1120           0 :                                                 // Add verbosity.
    1121           0 :                                                 s1 := describeSublevels(comparer.FormatKey, true /* verbose */, copyOfSublevels.Levels)
    1122           0 :                                                 s2 := describeSublevels(comparer.FormatKey, true /* verbose */, v.L0Sublevels.Levels)
    1123           0 :                                                 panic(fmt.Sprintf("incremental L0 sublevel generation produced different output than regeneration: %s != %s", s1, s2))
    1124             :                                         }
    1125             :                                 }
    1126           1 :                                 if err != nil {
    1127           0 :                                         return nil, errors.Wrap(err, "pebble: internal error")
    1128           0 :                                 }
    1129           1 :                                 v.L0SublevelFiles = v.L0Sublevels.Levels
    1130           1 :                         } else if err := v.InitL0Sublevels(flushSplitBytes); err != nil {
    1131           0 :                                 return nil, errors.Wrap(err, "pebble: internal error")
    1132           0 :                         }
    1133           1 :                         if err := CheckOrdering(comparer.Compare, comparer.FormatKey, Level(0), v.Levels[level].Iter()); err != nil {
    1134           0 :                                 return nil, errors.Wrap(err, "pebble: internal error")
    1135           0 :                         }
    1136           1 :                         continue
    1137             :                 }
    1138             : 
    1139             :                 // Check consistency of the level in the vicinity of our edits.
    1140           1 :                 if sm != nil && la != nil {
    1141           1 :                         overlap := overlaps(v.Levels[level].Iter(), comparer.Compare, sm.Smallest.UserKey,
    1142           1 :                                 la.Largest.UserKey, la.Largest.IsExclusiveSentinel())
    1143           1 :                         // overlap contains all of the added files. We want to ensure that
    1144           1 :                         // the added files are consistent with neighboring existing files
    1145           1 :                         // too, so reslice the overlap to pull in a neighbor on each side.
    1146           1 :                         check := overlap.Reslice(func(start, end *LevelIterator) {
    1147           1 :                                 if m := start.Prev(); m == nil {
    1148           1 :                                         start.Next()
    1149           1 :                                 }
    1150           1 :                                 if m := end.Next(); m == nil {
    1151           1 :                                         end.Prev()
    1152           1 :                                 }
    1153             :                         })
    1154           1 :                         if err := CheckOrdering(comparer.Compare, comparer.FormatKey, Level(level), check.Iter()); err != nil {
    1155           1 :                                 return nil, errors.Wrap(err, "pebble: internal error")
    1156           1 :                         }
    1157             :                 }
    1158             :         }
    1159           1 :         return v, nil
    1160             : }

Generated by: LCOV version 1.14