LCOV - code coverage report
Current view: top level - pebble/sstable - colblk_writer.go (source / functions)
Test: 2024-09-21 08:16Z aba5868b - tests only.lcov
Date: 2024-09-21 08:16:32
Coverage: Lines: 404 / 587 (68.8 %) - Functions: 0 / 0 (-)

          Line data    Source code
       1             : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package sstable
       6             : 
       7             : import (
       8             :         "bytes"
       9             :         "encoding/binary"
      10             :         "fmt"
      11             :         "math"
      12             :         "sync"
      13             : 
      14             :         "github.com/cockroachdb/errors"
      15             :         "github.com/cockroachdb/pebble/internal/base"
      16             :         "github.com/cockroachdb/pebble/internal/bytealloc"
      17             :         "github.com/cockroachdb/pebble/internal/cache"
      18             :         "github.com/cockroachdb/pebble/internal/keyspan"
      19             :         "github.com/cockroachdb/pebble/objstorage"
      20             :         "github.com/cockroachdb/pebble/sstable/block"
      21             :         "github.com/cockroachdb/pebble/sstable/colblk"
      22             :         "github.com/cockroachdb/pebble/sstable/rowblk"
      23             : )
      24             : 
       25             : // RawColumnWriter is an sstable RawWriter that writes sstables with
      26             : // column-oriented blocks. All table formats TableFormatPebblev5 and later write
      27             : // column-oriented blocks and use RawColumnWriter.
      28             : type RawColumnWriter struct {
      29             :         comparer *base.Comparer
      30             :         meta     WriterMetadata
      31             :         opts     WriterOptions
      32             :         err      error
      33             :         // dataBlockOptions and indexBlockOptions are used to configure the sstable
      34             :         // block flush heuristics.
      35             :         dataBlockOptions     flushDecisionOptions
      36             :         indexBlockOptions    flushDecisionOptions
      37             :         allocatorSizeClasses []int
      38             :         blockPropCollectors  []BlockPropertyCollector
      39             :         blockPropsEncoder    blockPropertiesEncoder
      40             :         obsoleteCollector    obsoleteKeyBlockPropertyCollector
      41             :         props                Properties
      42             :         // block writers buffering unflushed data.
      43             :         dataBlock struct {
      44             :                 colblk.DataBlockWriter
      45             :                 // numDeletions stores the count of point tombstones in this data block.
      46             :                 // It's used to determine if this data block is considered
      47             :                 // tombstone-dense for the purposes of compaction.
      48             :                 numDeletions int
      49             :                 // deletionSize stores the raw size of point tombstones in this data
      50             :                 // block. It's used to determine if this data block is considered
      51             :                 // tombstone-dense for the purposes of compaction.
      52             :                 deletionSize int
      53             :         }
      54             :         indexBlock         colblk.IndexBlockWriter
      55             :         topLevelIndexBlock colblk.IndexBlockWriter
      56             :         rangeDelBlock      colblk.KeyspanBlockWriter
      57             :         rangeKeyBlock      colblk.KeyspanBlockWriter
      58             :         valueBlock         *valueBlockWriter // nil iff WriterOptions.DisableValueBlocks=true
      59             :         // filter accumulates the filter block. If populated, the filter ingests
      60             :         // either the output of w.split (i.e. a prefix extractor) if w.split is not
      61             :         // nil, or the full keys otherwise.
      62             :         filterBlock  filterWriter
      63             :         prevPointKey struct {
      64             :                 trailer    base.InternalKeyTrailer
      65             :                 isObsolete bool
      66             :         }
      67             :         pendingDataBlockSize int
      68             :         indexBlockSize       int
      69             :         queuedDataSize       uint64
      70             : 
      71             :         // indexBuffering holds finished index blocks as they're completed while
      72             :         // building the sstable. If an index block grows sufficiently large
      73             :         // (IndexBlockSize) while an sstable is still being constructed, the sstable
      74             :         // writer will create a two-level index structure. As index blocks are
      75             :         // completed, they're finished and buffered in-memory until the table is
      76             :         // finished. When the table is finished, the buffered index blocks are
      77             :         // flushed in order after all the data blocks, and the top-level index block
      78             :         // is constructed to point to all the individual index blocks.
      79             :         indexBuffering struct {
      80             :                 // partitions holds all the completed index blocks.
      81             :                 partitions []bufferedIndexBlock
      82             :                 // blockAlloc is used to bulk-allocate byte slices used to store index
      83             :                 // blocks in partitions. These live until the sstable is finished.
      84             :                 blockAlloc []byte
      85             :                 // sepAlloc is used to bulk-allocate index block separator slices stored
      86             :                 // in partitions. These live until the sstable is finished.
      87             :                 sepAlloc bytealloc.A
      88             :         }
      89             : 
      90             :         writeQueue struct {
      91             :                 wg  sync.WaitGroup
      92             :                 ch  chan *compressedBlock
      93             :                 err error
      94             :         }
      95             :         layout layoutWriter
      96             : 
      97             :         separatorBuf          []byte
      98             :         tmp                   []byte
      99             :         disableKeyOrderChecks bool
     100             : }
     101             : 
     102             : // Assert that *RawColumnWriter implements RawWriter.
     103             : var _ RawWriter = (*RawColumnWriter)(nil)
     104             : 
     105           1 : func NewColumnarWriter(writable objstorage.Writable, o WriterOptions) *RawColumnWriter {
     106           1 :         if writable == nil {
     107           0 :                 panic("pebble: nil writable")
     108             :         }
     109           1 :         o = o.ensureDefaults()
     110           1 :         w := &RawColumnWriter{
     111           1 :                 comparer: o.Comparer,
     112           1 :                 meta: WriterMetadata{
     113           1 :                         SmallestSeqNum: math.MaxUint64,
     114           1 :                 },
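                     :                 // Each threshold below is a percentage of the corresponding
                     :                 // block size; adding 99 before dividing by 100 makes the
                     :                 // integer division round up.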
     115           1 :                 dataBlockOptions: flushDecisionOptions{
     116           1 :                         blockSize:               o.BlockSize,
     117           1 :                         blockSizeThreshold:      (o.BlockSize*o.BlockSizeThreshold + 99) / 100,
     118           1 :                         sizeClassAwareThreshold: (o.BlockSize*o.SizeClassAwareThreshold + 99) / 100,
     119           1 :                 },
     120           1 :                 indexBlockOptions: flushDecisionOptions{
     121           1 :                         blockSize:               o.IndexBlockSize,
     122           1 :                         blockSizeThreshold:      (o.IndexBlockSize*o.BlockSizeThreshold + 99) / 100,
     123           1 :                         sizeClassAwareThreshold: (o.IndexBlockSize*o.SizeClassAwareThreshold + 99) / 100,
     124           1 :                 },
     125           1 :                 allocatorSizeClasses: o.AllocatorSizeClasses,
     126           1 :                 opts:                 o,
     127           1 :                 layout:               makeLayoutWriter(writable, o),
     128           1 :         }
     129           1 :         w.dataBlock.Init(o.KeySchema)
     130           1 :         w.indexBlock.Init()
     131           1 :         w.topLevelIndexBlock.Init()
     132           1 :         w.rangeDelBlock.Init(w.comparer.Equal)
     133           1 :         w.rangeKeyBlock.Init(w.comparer.Equal)
     134           1 :         if !o.DisableValueBlocks {
     135           1 :                 w.valueBlock = newValueBlockWriter(
     136           1 :                         w.dataBlockOptions.blockSize, w.dataBlockOptions.blockSizeThreshold,
     137           1 :                         w.opts.Compression, w.opts.Checksum, func(compressedSize int) {})
     138             :         }
     139           1 :         if o.FilterPolicy != nil {
     140           0 :                 switch o.FilterType {
     141           0 :                 case TableFilter:
     142           0 :                         w.filterBlock = newTableFilterWriter(o.FilterPolicy)
     143           0 :                 default:
     144           0 :                         panic(fmt.Sprintf("unknown filter type: %v", o.FilterType))
     145             :                 }
     146             :         }
     147             : 
     148           1 :         numBlockPropertyCollectors := len(o.BlockPropertyCollectors) + 1 // +1 for the obsolete collector
     149           1 :         if numBlockPropertyCollectors > maxPropertyCollectors {
     150           0 :                 panic(errors.New("pebble: too many block property collectors"))
     151             :         }
     152           1 :         w.blockPropCollectors = make([]BlockPropertyCollector, 0, numBlockPropertyCollectors)
     153           1 :         for _, constructFn := range o.BlockPropertyCollectors {
     154           0 :                 w.blockPropCollectors = append(w.blockPropCollectors, constructFn())
     155           0 :         }
     156           1 :         w.blockPropCollectors = append(w.blockPropCollectors, &w.obsoleteCollector)
     157           1 :         var buf bytes.Buffer
     158           1 :         buf.WriteString("[")
     159           1 :         for i := range w.blockPropCollectors {
     160           1 :                 if i > 0 {
     161           0 :                         buf.WriteString(",")
     162           0 :                 }
     163           1 :                 buf.WriteString(w.blockPropCollectors[i].Name())
     164             :         }
     165           1 :         buf.WriteString("]")
     166           1 :         w.props.PropertyCollectorNames = buf.String()
     167           1 : 
     168           1 :         w.props.ComparerName = o.Comparer.Name
     169           1 :         w.props.CompressionName = o.Compression.String()
     170           1 :         w.props.MergerName = o.MergerName
     171           1 : 
     172           1 :         w.writeQueue.ch = make(chan *compressedBlock)
     173           1 :         w.writeQueue.wg.Add(1)
     174           1 :         go w.drainWriteQueue()
     175           1 :         return w
     176             : }
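                     : 
                     : // A minimal usage sketch (illustrative only; error handling is elided, and
                     : // the option values, key, and value shown are placeholders, not requirements):
                     : //
                     : //      w := NewColumnarWriter(writable, WriterOptions{
                     : //              Comparer:    base.DefaultComparer,
                     : //              TableFormat: TableFormatPebblev5,
                     : //      })
                     : //      _ = w.AddWithForceObsolete(key, value, false /* forceObsolete */)
                     : //      err := w.Close()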
     177             : 
     178             : // Error returns the current accumulated error if any.
     179           0 : func (w *RawColumnWriter) Error() error {
     180           0 :         return w.err
     181           0 : }
     182             : 
      183             : // EstimatedSize returns the estimated size of the sstable that would result
      184             : // if Close were called now, without adding additional keys.
     185           0 : func (w *RawColumnWriter) EstimatedSize() uint64 {
     186           0 :         sz := rocksDBFooterLen + w.queuedDataSize
     187           0 :         // TODO(jackson): Avoid iterating over partitions by incrementally
     188           0 :         // maintaining the size contribution of all buffered partitions.
     189           0 :         for _, bib := range w.indexBuffering.partitions {
     190           0 :                 // We include the separator user key to account for its bytes in the
     191           0 :                 // top-level index block.
     192           0 :                 //
     193           0 :                 // TODO(jackson): We could incrementally build the top-level index block
     194           0 :                 // and produce an exact calculation of the current top-level index
     195           0 :                 // block's size.
     196           0 :                 sz += uint64(len(bib.block) + block.TrailerLen + len(bib.sep.UserKey))
     197           0 :         }
     198           0 :         if w.rangeDelBlock.KeyCount() > 0 {
     199           0 :                 sz += uint64(w.rangeDelBlock.Size())
     200           0 :         }
     201           0 :         if w.rangeKeyBlock.KeyCount() > 0 {
     202           0 :                 sz += uint64(w.rangeKeyBlock.Size())
     203           0 :         }
                     :         // w.valueBlock is nil when value blocks are disabled; guard before
                     :         // dereferencing it.
                     :         if w.valueBlock != nil {
      204           0 :                 for _, blk := range w.valueBlock.blocks {
      205           0 :                         sz += uint64(blk.block.LengthWithTrailer())
      206           0 :                 }
      207           0 :                 if w.valueBlock.buf != nil {
      208           0 :                         sz += uint64(len(w.valueBlock.buf.b))
      209           0 :                 }
                     :         }
     210             :         // TODO(jackson): Include an estimate of the properties, filter and meta
     211             :         // index blocks sizes.
     212           0 :         return sz
     213             : }
     214             : 
     215             : // Metadata returns the metadata for the finished sstable. Only valid to call
     216             : // after the sstable has been finished.
     217           1 : func (w *RawColumnWriter) Metadata() (*WriterMetadata, error) {
     218           1 :         if !w.layout.IsFinished() {
     219           0 :                 return nil, errors.New("pebble: writer is not closed")
     220           0 :         }
     221           1 :         return &w.meta, nil
     222             : }
     223             : 
     224             : // EncodeSpan encodes the keys in the given span. The span can contain either
     225             : // only RANGEDEL keys or only range keys.
     226           1 : func (w *RawColumnWriter) EncodeSpan(span keyspan.Span) error {
     227           1 :         if span.Empty() {
     228           0 :                 return nil
     229           0 :         }
     230           1 :         for _, k := range span.Keys {
     231           1 :                 w.meta.updateSeqNum(k.SeqNum())
     232           1 :         }
     233           1 :         if span.Keys[0].Kind() == base.InternalKeyKindRangeDelete {
     234           1 :                 w.rangeDelBlock.AddSpan(span)
     235           1 :                 return nil
     236           1 :         }
     237           1 :         w.rangeKeyBlock.AddSpan(span)
     238           1 :         return nil
     239             : }
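                     : 
                     : // For example (hypothetical keys and seqnum), a RANGEDEL covering [a, d)
                     : // could be encoded as:
                     : //
                     : //      _ = w.EncodeSpan(keyspan.Span{
                     : //              Start: []byte("a"),
                     : //              End:   []byte("d"),
                     : //              Keys: []keyspan.Key{{
                     : //                      Trailer: base.MakeTrailer(5, base.InternalKeyKindRangeDelete),
                     : //              }},
                     : //      })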
     240             : 
     241             : // AddWithForceObsolete adds a point key/value pair when writing a
     242             : // strict-obsolete sstable. For a given Writer, the keys passed to Add must be
     243             : // in increasing order. Span keys (range deletions, range keys) must be added
     244             : // through EncodeSpan.
     245             : //
     246             : // forceObsolete indicates whether the caller has determined that this key is
     247             : // obsolete even though it may be the latest point key for this userkey. This
     248             : // should be set to true for keys obsoleted by RANGEDELs, and is required for
     249             : // strict-obsolete sstables.
     250             : //
     251             : // Note that there are two properties, S1 and S2 (see comment in format.go)
     252             : // that strict-obsolete ssts must satisfy. S2, due to RANGEDELs, is solely the
     253             : // responsibility of the caller. S1 is solely the responsibility of the
     254             : // callee.
     255             : func (w *RawColumnWriter) AddWithForceObsolete(
     256             :         key InternalKey, value []byte, forceObsolete bool,
     257           1 : ) error {
     258           1 :         switch key.Kind() {
     259             :         case base.InternalKeyKindRangeDelete, base.InternalKeyKindRangeKeySet,
     260           0 :                 base.InternalKeyKindRangeKeyUnset, base.InternalKeyKindRangeKeyDelete:
     261           0 :                 return errors.Newf("%s must be added through EncodeSpan", key.Kind())
     262           0 :         case base.InternalKeyKindMerge:
     263           0 :                 if w.opts.IsStrictObsolete {
     264           0 :                         return errors.Errorf("MERGE not supported in a strict-obsolete sstable")
     265           0 :                 }
     266             :         }
     267             : 
     268           1 :         eval, err := w.evaluatePoint(key, len(value))
     269           1 :         if err != nil {
     270           0 :                 return err
     271           0 :         }
     272           1 :         eval.isObsolete = eval.isObsolete || forceObsolete
     273           1 :         w.prevPointKey.trailer = key.Trailer
     274           1 :         w.prevPointKey.isObsolete = eval.isObsolete
     275           1 : 
     276           1 :         var valuePrefix block.ValuePrefix
     277           1 :         var valueStoredWithKey []byte
     278           1 :         if eval.writeToValueBlock {
     279           0 :                 vh, err := w.valueBlock.addValue(value)
     280           0 :                 if err != nil {
     281           0 :                         return err
     282           0 :                 }
     283           0 :                 n := encodeValueHandle(w.tmp[:], vh)
     284           0 :                 valueStoredWithKey = w.tmp[:n]
     285           0 :                 var attribute base.ShortAttribute
     286           0 :                 if w.opts.ShortAttributeExtractor != nil {
     287           0 :                         // TODO(sumeer): for compactions, it is possible that the input sstable
     288           0 :                         // already has this value in the value section and so we have already
     289           0 :                         // extracted the ShortAttribute. Avoid extracting it again. This will
     290           0 :                         // require changing the RawWriter.Add interface.
     291           0 :                         if attribute, err = w.opts.ShortAttributeExtractor(
     292           0 :                                 key.UserKey, int(eval.kcmp.PrefixLen), value); err != nil {
     293           0 :                                 return err
     294           0 :                         }
     295             :                 }
     296           0 :                 valuePrefix = block.ValueHandlePrefix(eval.kcmp.PrefixEqual(), attribute)
     297           1 :         } else {
     298           1 :                 valueStoredWithKey = value
     299           1 :                 if len(value) > 0 {
     300           1 :                         valuePrefix = block.InPlaceValuePrefix(eval.kcmp.PrefixEqual())
     301           1 :                 }
     302             :         }
     303             : 
     304             :         // Append the key to the data block. We have NOT yet committed to
     305             :         // including the key in the block. The data block writer permits us to
     306             :         // finish the block excluding the last-appended KV.
     307           1 :         entriesWithoutKV := w.dataBlock.Rows()
     308           1 :         w.dataBlock.Add(key, valueStoredWithKey, valuePrefix, eval.kcmp, eval.isObsolete)
     309           1 : 
     310           1 :         // Now that we've appended the KV pair, we can compute the exact size of the
     311           1 :         // block with this key-value pair included. Check to see if we should flush
     312           1 :         // the current block, either with or without the added key-value pair.
     313           1 :         size := w.dataBlock.Size()
     314           1 :         if shouldFlushWithoutLatestKV(size, w.pendingDataBlockSize,
     315           1 :                 entriesWithoutKV, w.dataBlockOptions, w.allocatorSizeClasses) {
     316           1 :                 // Flush the data block excluding the key we just added.
     317           1 :                 w.flushDataBlockWithoutNextKey(key.UserKey)
     318           1 :                 // flushDataBlockWithoutNextKey reset the data block builder, and we can
     319           1 :                 // add the key to this next block now.
     320           1 :                 w.dataBlock.Add(key, valueStoredWithKey, valuePrefix, eval.kcmp, eval.isObsolete)
     321           1 :                 w.pendingDataBlockSize = w.dataBlock.Size()
     322           1 :         } else {
     323           1 :                 // We're not flushing the data block, and we're committing to including
     324           1 :                 // the current KV in the block. Remember the new size of the data block
     325           1 :                 // with the current KV.
     326           1 :                 w.pendingDataBlockSize = size
     327           1 :         }
     328             : 
     329           1 :         for i := range w.blockPropCollectors {
     330           1 :                 v := value
     331           1 :                 if key.Kind() == base.InternalKeyKindSet {
     332           1 :                         // Values for SET are not required to be in-place, and in the future
     333           1 :                         // may not even be read by the compaction, so pass nil values. Block
     334           1 :                         // property collectors in such Pebble DB's must not look at the
      335           1 :                         // property collectors in such Pebble DBs must not look at the
     336           1 :                         v = nil
     337           1 :                 }
     338           1 :                 if err := w.blockPropCollectors[i].AddPointKey(key, v); err != nil {
     339           0 :                         w.err = err
     340           0 :                         return err
     341           0 :                 }
     342             :         }
     343           1 :         w.obsoleteCollector.AddPoint(eval.isObsolete)
     344           1 :         if w.filterBlock != nil {
     345           0 :                 w.filterBlock.addKey(key.UserKey[:eval.kcmp.PrefixLen])
     346           0 :         }
     347           1 :         w.meta.updateSeqNum(key.SeqNum())
     348           1 :         if !w.meta.HasPointKeys {
     349           1 :                 w.meta.SetSmallestPointKey(key.Clone())
     350           1 :         }
     351             : 
     352           1 :         w.props.NumEntries++
     353           1 :         switch key.Kind() {
     354           1 :         case InternalKeyKindDelete, InternalKeyKindSingleDelete:
     355           1 :                 w.props.NumDeletions++
     356           1 :                 w.props.RawPointTombstoneKeySize += uint64(len(key.UserKey))
     357           1 :                 w.dataBlock.numDeletions++
     358           1 :                 w.dataBlock.deletionSize += len(key.UserKey)
     359           0 :         case InternalKeyKindDeleteSized:
     360           0 :                 var size uint64
     361           0 :                 if len(value) > 0 {
     362           0 :                         var n int
     363           0 :                         size, n = binary.Uvarint(value)
     364           0 :                         if n <= 0 {
     365           0 :                                 return errors.Newf("%s key's value (%x) does not parse as uvarint",
     366           0 :                                         errors.Safe(key.Kind().String()), value)
     367           0 :                         }
     368             :                 }
     369           0 :                 w.props.NumDeletions++
     370           0 :                 w.props.NumSizedDeletions++
     371           0 :                 w.props.RawPointTombstoneKeySize += uint64(len(key.UserKey))
     372           0 :                 w.props.RawPointTombstoneValueSize += size
     373           0 :                 w.dataBlock.numDeletions++
     374           0 :                 w.dataBlock.deletionSize += len(key.UserKey)
     375           0 :         case InternalKeyKindMerge:
     376           0 :                 w.props.NumMergeOperands++
     377             :         }
     378           1 :         w.props.RawKeySize += uint64(key.Size())
     379           1 :         w.props.RawValueSize += uint64(len(value))
     380           1 :         return nil
     381             : }
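                     : 
                     : // A hedged example of the ordering contract (hypothetical keys and seqnums;
                     : // v1 and v2 are placeholder values): within a single user key, entries must
                     : // be added newest-first, i.e. with strictly decreasing trailers:
                     : //
                     : //      _ = w.AddWithForceObsolete(
                     : //              base.MakeInternalKey([]byte("a"), 10, base.InternalKeyKindSet), v1, false)
                     : //      _ = w.AddWithForceObsolete(
                     : //              base.MakeInternalKey([]byte("a"), 5, base.InternalKeyKindSet), v2, false)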
     382             : 
     383             : type pointKeyEvaluation struct {
     384             :         kcmp              colblk.KeyComparison
     385             :         isObsolete        bool
     386             :         writeToValueBlock bool
     387             : }
     388             : 
     389             : // evaluatePoint takes information about a point key being written to the
     390             : // sstable and decides how the point should be represented, where its value
     391             : // should be stored, etc.
     392             : func (w *RawColumnWriter) evaluatePoint(
     393             :         key base.InternalKey, valueLen int,
     394           1 : ) (eval pointKeyEvaluation, err error) {
     395           1 :         eval.kcmp = w.dataBlock.KeyWriter.ComparePrev(key.UserKey)
     396           1 :         if !w.meta.HasPointKeys {
     397           1 :                 return eval, nil
     398           1 :         }
     399           1 :         keyKind := key.Kind()
     400           1 :         // Ensure that no one adds a point key kind without considering the obsolete
     401           1 :         // handling for that kind.
     402           1 :         switch keyKind {
     403             :         case InternalKeyKindSet, InternalKeyKindSetWithDelete, InternalKeyKindMerge,
     404           1 :                 InternalKeyKindDelete, InternalKeyKindSingleDelete, InternalKeyKindDeleteSized:
     405           0 :         default:
     406           0 :                 panic(errors.AssertionFailedf("unexpected key kind %s", keyKind.String()))
     407             :         }
     408           1 :         prevKeyKind := w.prevPointKey.trailer.Kind()
     409           1 :         // If same user key, then the current key is obsolete if any of the
     410           1 :         // following is true:
     411           1 :         // C1 The prev key was obsolete.
     412           1 :         // C2 The prev key was not a MERGE. When the previous key is a MERGE we must
     413           1 :         //    preserve SET* and MERGE since their values will be merged into the
     414           1 :         //    previous key. We also must preserve DEL* since there may be an older
     415           1 :         //    SET*/MERGE in a lower level that must not be merged with the MERGE --
     416           1 :         //    if we omit the DEL* that lower SET*/MERGE will become visible.
     417           1 :         //
     418           1 :         // Regardless of whether it is the same user key or not
     419           1 :         // C3 The current key is some kind of point delete, and we are writing to
     420           1 :         //    the lowest level, then it is also obsolete. The correctness of this
     421           1 :         //    relies on the same user key not spanning multiple sstables in a level.
     422           1 :         //
     423           1 :         // C1 ensures that for a user key there is at most one transition from
     424           1 :         // !obsolete to obsolete. Consider a user key k, for which the first n keys
     425           1 :         // are not obsolete. We consider the various value of n:
      426           1 :         // are not obsolete. We consider the various values of n:
     427           1 :         // n = 0: This happens due to forceObsolete being set by the caller, or due
     428           1 :         // to C3. forceObsolete must only be set due a RANGEDEL, and that RANGEDEL
      429           1 :         // to C3. forceObsolete must only be set due to a RANGEDEL, and that RANGEDEL
     430           1 :         // due to a point delete and that deletes all the lower seqnums for the same
     431           1 :         // user key.
     432           1 :         //
     433           1 :         // n = 1: This is the common case. It happens when the first key is not a
     434           1 :         // MERGE, or the current key is some kind of point delete.
     435           1 :         //
     436           1 :         // n > 1: This is due to a sequence of MERGE keys, potentially followed by a
     437           1 :         // single non-MERGE key.
     438           1 :         isObsoleteC1AndC2 := eval.kcmp.UserKeyComparison == 0 &&
     439           1 :                 (w.prevPointKey.isObsolete || prevKeyKind != InternalKeyKindMerge)
     440           1 :         isObsoleteC3 := w.opts.WritingToLowestLevel &&
     441           1 :                 (keyKind == InternalKeyKindDelete || keyKind == InternalKeyKindSingleDelete ||
     442           1 :                         keyKind == InternalKeyKindDeleteSized)
     443           1 :         eval.isObsolete = isObsoleteC1AndC2 || isObsoleteC3
     444           1 :         // TODO(sumeer): storing isObsolete SET and SETWITHDEL in value blocks is
     445           1 :         // possible, but requires some care in documenting and checking invariants.
      446           1 :         // There is code that assumes nothing is in value blocks because of a
      447           1 :         // single MVCC version (those cases should be ok). We have to ensure
      448           1 :         // setHasSamePrefix is correctly initialized here, etc.
     449           1 : 
     450           1 :         if !w.disableKeyOrderChecks && (eval.kcmp.UserKeyComparison < 0 ||
     451           1 :                 (eval.kcmp.UserKeyComparison == 0 && w.prevPointKey.trailer <= key.Trailer)) {
     452           0 :                 return eval, errors.Errorf(
     453           0 :                         "pebble: keys must be added in strictly increasing order: %s",
     454           0 :                         key.Pretty(w.comparer.FormatKey))
     455           0 :         }
     456             : 
     457             :         // We might want to write this key's value to a value block if it has the
     458             :         // same prefix.
     459             :         //
     460             :         // We require:
     461             :         //  . Value blocks to be enabled.
     462             :         //  . The current key to have the same prefix as the previous key.
     463             :         //  . The previous key to be a SET.
     464             :         //  . The current key to be a SET.
     465             :         //  . If there are bounds requiring some keys' values to be in-place, the
     466             :         //    key must not fall within those bounds.
     467             :         //  . The value to be sufficiently large. (Currently we simply require a
     468             :         //    non-zero length, so all non-empty values are eligible for storage
     469             :         //    out-of-band in a value block.)
     470           1 :         if w.opts.DisableValueBlocks || !eval.kcmp.PrefixEqual() ||
      471           1 :                 prevKeyKind != InternalKeyKindSet || keyKind != InternalKeyKindSet {
     472           1 :                 return eval, nil
     473           1 :         }
     474             :         // NB: it is possible that eval.kcmp.UserKeyComparison == 0, i.e., these two
     475             :         // SETs have identical user keys (because of an open snapshot). This should
     476             :         // be the rare case.
     477             : 
     478             :         // Use of 0 here is somewhat arbitrary. Given the minimum 3 byte encoding of
     479             :         // valueHandle, this should be > 3. But tiny values are common in test and
     480             :         // unlikely in production, so we use 0 here for better test coverage.
     481           0 :         const tinyValueThreshold = 0
     482           0 :         if valueLen <= tinyValueThreshold {
     483           0 :                 return eval, nil
     484           0 :         }
     485             : 
     486             :         // If there are bounds requiring some keys' values to be in-place, compare
     487             :         // the prefix against the bounds.
     488           0 :         if !w.opts.RequiredInPlaceValueBound.IsEmpty() {
     489           0 :                 if w.comparer.Compare(w.opts.RequiredInPlaceValueBound.Upper, key.UserKey[:eval.kcmp.PrefixLen]) <= 0 {
     490           0 :                         // Common case for CockroachDB. Make it empty since all future keys
     491           0 :                         // in this sstable will also have cmpUpper <= 0.
     492           0 :                         w.opts.RequiredInPlaceValueBound = UserKeyPrefixBound{}
     493           0 :                 } else if w.comparer.Compare(key.UserKey[:eval.kcmp.PrefixLen], w.opts.RequiredInPlaceValueBound.Lower) >= 0 {
     494           0 :                         // Don't write to value block if the key is within the bounds.
     495           0 :                         return eval, nil
     496           0 :                 }
     497             :         }
     498           0 :         eval.writeToValueBlock = w.valueBlock != nil
     499           0 :         return eval, nil
     500             : }
     501             : 
     502             : var compressedBlockPool = sync.Pool{
     503           1 :         New: func() interface{} {
     504           1 :                 return new(compressedBlock)
     505           1 :         },
     506             : }
     507             : 
     508             : type compressedBlock struct {
     509             :         physical block.PhysicalBlock
     510             :         blockBuf blockBuf
     511             : }
     512             : 
     513           1 : func (w *RawColumnWriter) flushDataBlockWithoutNextKey(nextKey []byte) {
     514           1 :         serializedBlock, lastKey := w.dataBlock.Finish(w.dataBlock.Rows()-1, w.pendingDataBlockSize)
     515           1 :         w.maybeIncrementTombstoneDenseBlocks(len(serializedBlock))
     516           1 :         // Compute the separator that will be written to the index block alongside
     517           1 :         // this data block's end offset. It is the separator between the last key in
     518           1 :         // the finished block and the [nextKey] that was excluded from the block.
     519           1 :         w.separatorBuf = w.comparer.Separator(w.separatorBuf[:0], lastKey.UserKey, nextKey)
     520           1 :         w.enqueueDataBlock(serializedBlock, lastKey, w.separatorBuf)
     521           1 :         w.dataBlock.Reset()
     522           1 :         w.pendingDataBlockSize = 0
     523           1 : }
     524             : 
     525             : // maybeIncrementTombstoneDenseBlocks increments the number of tombstone dense
     526             : // blocks if the number of deletions in the data block exceeds a threshold or
     527             : // the deletion size exceeds a threshold. It should be called after the
     528             : // data block has been finished.
      529             : // Invariant: uncompressedLen must be the size of the just-finished data block.
     530           1 : func (w *RawColumnWriter) maybeIncrementTombstoneDenseBlocks(uncompressedLen int) {
     531           1 :         minSize := w.opts.DeletionSizeRatioThreshold * float32(uncompressedLen)
     532           1 :         if w.dataBlock.numDeletions > w.opts.NumDeletionsThreshold || float32(w.dataBlock.deletionSize) > minSize {
     533           0 :                 w.props.NumTombstoneDenseBlocks++
     534           0 :         }
     535           1 :         w.dataBlock.numDeletions = 0
     536           1 :         w.dataBlock.deletionSize = 0
     537             : }
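                     : 
                     : // For example (illustrative thresholds): with NumDeletionsThreshold = 100
                     : // and DeletionSizeRatioThreshold = 0.5, a finished 4096-byte block counts as
                     : // tombstone-dense if it holds more than 100 point tombstones or if their
                     : // user keys total more than 2048 bytes.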
     538             : 
     539             : // enqueueDataBlock compresses and checksums the provided data block and sends
     540             : // it to the write queue to be asynchronously written to the underlying storage.
     541             : // It also adds the block's index block separator to the pending index block,
     542             : // possibly triggering the index block to be finished and buffered.
     543             : func (w *RawColumnWriter) enqueueDataBlock(
     544             :         serializedBlock []byte, lastKey base.InternalKey, separator []byte,
     545           1 : ) error {
     546           1 :         // TODO(jackson): Avoid allocating the largest point user key every time we
     547           1 :         // set the largest point key. This is what the rowblk writer does too, but
     548           1 :         // it's unnecessary.
     549           1 :         w.meta.SetLargestPointKey(lastKey.Clone())
     550           1 : 
     551           1 :         // Serialize the data block, compress it and send it to the write queue.
     552           1 :         cb := compressedBlockPool.Get().(*compressedBlock)
     553           1 :         cb.blockBuf.checksummer.Type = w.opts.Checksum
     554           1 :         cb.physical = block.CompressAndChecksum(
     555           1 :                 &cb.blockBuf.compressedBuf,
     556           1 :                 serializedBlock,
     557           1 :                 w.opts.Compression,
     558           1 :                 &cb.blockBuf.checksummer,
     559           1 :         )
     560           1 :         if !cb.physical.IsCompressed() {
     561           1 :                 // If the block isn't compressed, cb.physical's underlying data points
     562           1 :                 // directly into a buffer owned by w.dataBlock. Clone it before passing
     563           1 :                 // it to the write queue to be asynchronously written to disk.
     564           1 :                 // TODO(jackson): Should we try to avoid this clone by tracking the
     565           1 :                 // lifetime of the DataBlockWriters?
     566           1 :                 cb.physical = cb.physical.Clone()
     567           1 :         }
     568           1 :         dataBlockHandle := block.Handle{
     569           1 :                 Offset: w.queuedDataSize,
     570           1 :                 Length: uint64(cb.physical.LengthWithoutTrailer()),
     571           1 :         }
     572           1 :         w.queuedDataSize += dataBlockHandle.Length + block.TrailerLen
     573           1 :         w.writeQueue.ch <- cb
     574           1 : 
     575           1 :         var err error
     576           1 :         w.blockPropsEncoder.resetProps()
     577           1 :         for i := range w.blockPropCollectors {
     578           1 :                 scratch := w.blockPropsEncoder.getScratchForProp()
     579           1 :                 if scratch, err = w.blockPropCollectors[i].FinishDataBlock(scratch); err != nil {
     580           0 :                         return err
     581           0 :                 }
     582           1 :                 w.blockPropsEncoder.addProp(shortID(i), scratch)
     583             :         }
     584           1 :         dataBlockProps := w.blockPropsEncoder.unsafeProps()
     585           1 : 
     586           1 :         // Add the separator to the index block. This might trigger a flush of the
     587           1 :         // index block too.
     588           1 :         i := w.indexBlock.AddBlockHandle(separator, dataBlockHandle, dataBlockProps)
     589           1 :         sizeWithEntry := w.indexBlock.Size()
     590           1 :         if shouldFlushWithoutLatestKV(sizeWithEntry, w.indexBlockSize, i, w.indexBlockOptions, w.allocatorSizeClasses) {
     591           0 :                 if err = w.finishIndexBlock(w.indexBlock.Rows() - 1); err != nil {
     592           0 :                         return err
     593           0 :                 }
     594             :                 // finishIndexBlock reset the index block builder, and we can
     595             :                 // add the block handle to this new index block.
     596           0 :                 _ = w.indexBlock.AddBlockHandle(separator, dataBlockHandle, dataBlockProps)
     597             :         }
     598             :         // Incorporate the finished data block's property into the index block, now
     599             :         // that we've flushed the index block without the new separator if
     600             :         // necessary.
     601           1 :         for i := range w.blockPropCollectors {
     602           1 :                 w.blockPropCollectors[i].AddPrevDataBlockToIndexBlock()
     603           1 :         }
     604           1 :         return nil
     605             : }
     606             : 
     607             : // finishIndexBlock finishes the currently pending index block with the first
     608             : // [rows] rows. In practice, [rows] is always w.indexBlock.Rows() or
     609             : // w.indexBlock.Rows()-1.
     610             : //
     611             : // The finished index block is buffered until the writer is closed.
     612           1 : func (w *RawColumnWriter) finishIndexBlock(rows int) error {
     613           1 :         defer w.indexBlock.Reset()
     614           1 :         w.blockPropsEncoder.resetProps()
     615           1 :         for i := range w.blockPropCollectors {
     616           1 :                 scratch := w.blockPropsEncoder.getScratchForProp()
     617           1 :                 var err error
     618           1 :                 if scratch, err = w.blockPropCollectors[i].FinishIndexBlock(scratch); err != nil {
     619           0 :                         return err
     620           0 :                 }
     621           1 :                 w.blockPropsEncoder.addProp(shortID(i), scratch)
     622             :         }
     623           1 :         indexProps := w.blockPropsEncoder.props()
     624           1 :         bib := bufferedIndexBlock{nEntries: rows, properties: indexProps}
     625           1 : 
     626           1 :         // Copy the last (greatest) separator key in the index block into bib.sep.
     627           1 :         // It'll be the separator on the entry in the top-level index block.
     628           1 :         //
     629           1 :         // TODO(jackson): bib.sep.Trailer is unused within the columnar-block
     630           1 :         // sstable writer. Its existence is a code artifact of reuse of the
     631           1 :         // bufferedIndexBlock type between colblk and rowblk writers. This can be
     632           1 :         // cleaned up.
     633           1 :         bib.sep.Trailer = base.MakeTrailer(base.SeqNumMax, base.InternalKeyKindSeparator)
     634           1 :         w.indexBuffering.sepAlloc, bib.sep.UserKey = w.indexBuffering.sepAlloc.Copy(
     635           1 :                 w.indexBlock.UnsafeSeparator(rows - 1))
     636           1 : 
     637           1 :         // Finish the index block and copy it so that w.indexBlock may be reused.
     638           1 :         block := w.indexBlock.Finish(rows)
     639           1 :         if len(w.indexBuffering.blockAlloc) < len(block) {
     640           1 :                 // Allocate enough bytes for approximately 16 index blocks.
     641           1 :                 w.indexBuffering.blockAlloc = make([]byte, len(block)*16)
     642           1 :         }
     643           1 :         n := copy(w.indexBuffering.blockAlloc, block)
     644           1 :         bib.block = w.indexBuffering.blockAlloc[:n:n]
     645           1 :         w.indexBuffering.blockAlloc = w.indexBuffering.blockAlloc[n:]
     646           1 : 
     647           1 :         w.indexBuffering.partitions = append(w.indexBuffering.partitions, bib)
     648           1 :         return nil
     649             : }
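                     : 
                     : // The copy above amortizes allocations with a simple bump allocator. A
                     : // standalone sketch of the same pattern (names are illustrative):
                     : //
                     : //      func bumpCopy(alloc, b []byte) (rem, copied []byte) {
                     : //              if len(alloc) < len(b) {
                     : //                      alloc = make([]byte, len(b)*16) // room for ~16 copies
                     : //              }
                     : //              n := copy(alloc, b)
                     : //              return alloc[n:], alloc[:n:n]
                     : //      }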
     650             : 
     651             : // flushBufferedIndexBlocks writes all index blocks, including the top-level
     652             : // index block if necessary, to the underlying writable. It returns the block
     653             : // handle of the top index (either the only index block or the top-level index
     654             : // if two-level).
     655           1 : func (w *RawColumnWriter) flushBufferedIndexBlocks() (rootIndex block.Handle, err error) {
     656           1 :         // If there's a currently-pending index block, finish it.
     657           1 :         if w.indexBlock.Rows() > 0 || len(w.indexBuffering.partitions) == 0 {
      658           1 :                 if err := w.finishIndexBlock(w.indexBlock.Rows()); err != nil {
                     :                         return block.Handle{}, err
                     :                 }
     659           1 :         }
     660             :         // We've buffered all the index blocks. Typically there's just one index
     661             :         // block, in which case we're writing a "single-level" index. If we're
     662             :         // writing a large file or the index separators happen to be excessively
     663             :         // long, we may have several index blocks and need to construct a
     664             :         // "two-level" index structure.
     665           1 :         switch len(w.indexBuffering.partitions) {
     666           0 :         case 0:
     667           0 :                 // This is impossible because we'll flush the index block immediately
     668           0 :                 // above this switch statement if there are no buffered partitions
     669           0 :                 // (regardless of whether there are data block handles in the index
     670           0 :                 // block).
     671           0 :                 panic("unreachable")
     672           1 :         case 1:
     673           1 :                 // Single-level index.
     674           1 :                 rootIndex, err = w.layout.WriteIndexBlock(w.indexBuffering.partitions[0].block)
     675           1 :                 if err != nil {
     676           0 :                         return rootIndex, err
     677           0 :                 }
     678           1 :                 w.props.IndexSize = rootIndex.Length + block.TrailerLen
     679           1 :                 w.props.NumDataBlocks = uint64(w.indexBuffering.partitions[0].nEntries)
     680           1 :                 w.props.IndexType = binarySearchIndex
     681           0 :         default:
     682           0 :                 // Two-level index.
     683           0 :                 for _, part := range w.indexBuffering.partitions {
     684           0 :                         bh, err := w.layout.WriteIndexBlock(part.block)
     685           0 :                         if err != nil {
     686           0 :                                 return block.Handle{}, err
     687           0 :                         }
     688           0 :                         w.props.IndexSize += bh.Length + block.TrailerLen
      689           0 :                         w.props.NumDataBlocks += uint64(part.nEntries)
     690           0 :                         w.topLevelIndexBlock.AddBlockHandle(part.sep.UserKey, bh, part.properties)
     691             :                 }
     692           0 :                 rootIndex, err = w.layout.WriteIndexBlock(w.topLevelIndexBlock.Finish(w.topLevelIndexBlock.Rows()))
     693           0 :                 if err != nil {
     694           0 :                         return block.Handle{}, err
     695           0 :                 }
     696           0 :                 w.props.IndexSize += rootIndex.Length + block.TrailerLen
     697           0 :                 w.props.IndexType = twoLevelIndex
     698             :         }
     699           1 :         return rootIndex, nil
     700             : }
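                     : 
                     : // Concretely (illustrative): a table whose separators all fit in one index
                     : // block gets a single-level index, and rootIndex addresses that block
                     : // directly. A table that buffered multiple partitions gets each partition
                     : // written out, followed by a top-level block holding one separator entry
                     : // per partition; rootIndex then addresses the top-level block.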
     701             : 
     702             : // drainWriteQueue runs in its own goroutine and is responsible for writing
     703             : // finished, compressed data blocks to the writable. It reads from w.writeQueue
     704             : // until the channel is closed. All data blocks are written by this goroutine.
     705             : // Other blocks are written directly by the client goroutine. See Close.
     706           1 : func (w *RawColumnWriter) drainWriteQueue() {
     707           1 :         defer w.writeQueue.wg.Done()
     708           1 :         for cb := range w.writeQueue.ch {
     709           1 :                 if _, err := w.layout.WritePrecompressedDataBlock(cb.physical); err != nil {
     710           0 :                         w.writeQueue.err = err
     711           0 :                 }
     712           1 :                 cb.blockBuf.clear()
     713           1 :                 cb.physical = block.PhysicalBlock{}
     714           1 :                 compressedBlockPool.Put(cb)
     715             :         }
     716             : }
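                     : 
                     : // The queue above is a standard single-consumer pattern. A minimal sketch,
                     : // independent of Pebble's types (item and handle are placeholders):
                     : //
                     : //      ch := make(chan *item)
                     : //      var wg sync.WaitGroup
                     : //      wg.Add(1)
                     : //      go func() {
                     : //              defer wg.Done()
                     : //              for it := range ch {
                     : //                      handle(it) // write to storage; record any error
                     : //              }
                     : //      }()
                     : //      // producers send work with: ch <- it
                     : //      close(ch) // lets the goroutine exit; wg.Wait() ensures all writes landed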
     717             : 
     718           1 : func (w *RawColumnWriter) Close() (err error) {
     719           1 :         defer func() {
     720           1 :                 if w.valueBlock != nil {
     721           0 :                         releaseValueBlockWriter(w.valueBlock)
     722           0 :                         // Defensive code in case Close gets called again. We don't want to put
     723           0 :                         // the same object to a sync.Pool.
      724           0 :                 // the same object into a sync.Pool twice.
     725           0 :                 }
     726           1 :                 w.layout.Abort()
     727           1 :                 // Record any error in the writer (so we can exit early if Close is called
     728           1 :                 // again).
     729           1 :                 if err != nil {
     730           0 :                         w.err = err
     731           0 :                 }
     732             :         }()
     733             : 
     734             :         // Finish the last data block and send it to the write queue if it contains
     735             :         // any pending KVs.
     736           1 :         if rows := w.dataBlock.Rows(); rows > 0 {
     737           1 :                 serializedBlock, lastKey := w.dataBlock.Finish(rows, w.pendingDataBlockSize)
     738           1 :                 w.separatorBuf = w.comparer.Successor(w.separatorBuf[:0], lastKey.UserKey)
     739           1 :                 w.err = errors.CombineErrors(w.err, w.enqueueDataBlock(serializedBlock, lastKey, w.separatorBuf))
     740           1 :                 w.maybeIncrementTombstoneDenseBlocks(len(serializedBlock))
     741           1 :         }
     742             :         // Close the write queue channel so that the goroutine responsible for
     743             :         // writing data blocks to disk knows to exit. Any subsequent blocks (eg,
     744             :         // index, metadata, range key, etc) will be written by the goroutine that
     745             :         // called Close.
     746           1 :         close(w.writeQueue.ch)
     747           1 :         w.writeQueue.wg.Wait()
      748           1 :         // If the write queue encountered an error while writing out data blocks,
     749           1 :         // it's stored in w.writeQueue.err.
     750           1 :         w.err = firstError(w.err, w.writeQueue.err)
     751           1 :         if w.err != nil {
     752           0 :                 return w.err
     753           0 :         }
     754             : 
     755             :         // INVARIANT: w.queuedDataSize == w.layout.offset.
     756             :         // All data blocks have been written to disk. The queuedDataSize is the
     757             :         // cumulative size of all the data blocks we've sent to the write queue. Now
     758             :         // that they've all been flushed, queuedDataSize should match w.layout's
     759             :         // offset.
     760           1 :         if w.queuedDataSize != w.layout.offset {
      761           0 :                 panic(errors.AssertionFailedf("pebble: %d bytes of queued data blocks but layout offset is %d",
     762           0 :                         w.queuedDataSize, w.layout.offset))
     763             :         }
     764           1 :         if _, err = w.flushBufferedIndexBlocks(); err != nil {
     765           0 :                 return err
     766           0 :         }
     767             : 
     768             :         // Write the filter block.
     769           1 :         if w.filterBlock != nil {
     770           0 :                 bh, err := w.layout.WriteFilterBlock(w.filterBlock)
     771           0 :                 if err != nil {
     772           0 :                         return err
     773           0 :                 }
     774           0 :                 w.props.FilterPolicyName = w.filterBlock.policyName()
     775           0 :                 w.props.FilterSize = bh.Length
     776             :         }
     777             : 
     778             :         // Write the range deletion block if non-empty.
     779           1 :         if w.rangeDelBlock.KeyCount() > 0 {
     780           1 :                 w.props.NumRangeDeletions = uint64(w.rangeDelBlock.KeyCount())
     781           1 :                 sm, la := w.rangeDelBlock.UnsafeBoundaryKeys()
     782           1 :                 w.meta.SetSmallestRangeDelKey(sm)
     783           1 :                 w.meta.SetLargestRangeDelKey(la)
     784           1 :                 if _, err := w.layout.WriteRangeDeletionBlock(w.rangeDelBlock.Finish()); err != nil {
     785           0 :                         return err
     786           0 :                 }
     787             :         }
     788             : 
     789             :         // Write the range key block if non-empty.
     790           1 :         if w.rangeKeyBlock.KeyCount() > 0 {
     791           1 :                 sm, la := w.rangeKeyBlock.UnsafeBoundaryKeys()
     792           1 :                 w.meta.SetSmallestRangeKey(sm)
     793           1 :                 w.meta.SetLargestRangeKey(la)
     794           1 :                 if _, err := w.layout.WriteRangeKeyBlock(w.rangeKeyBlock.Finish()); err != nil {
     795           0 :                         return err
     796           0 :                 }
     797             :         }
     798             : 
     799             :         // Write out the value block.
     800           1 :         if w.valueBlock != nil {
     801           1 :                 _, vbStats, err := w.valueBlock.finish(&w.layout, w.layout.offset)
     802           1 :                 if err != nil {
     803           0 :                         return err
     804           0 :                 }
     805           1 :                 w.props.NumValueBlocks = vbStats.numValueBlocks
     806           1 :                 w.props.NumValuesInValueBlocks = vbStats.numValuesInValueBlocks
     807           1 :                 w.props.ValueBlocksSize = vbStats.valueBlocksAndIndexSize
     808             :         }
     809             : 
     810             :         // Write the properties block.
     811           1 :         {
     812           1 :         // Finish and record the block property collectors if user properties
     813           1 :         // are not yet recorded. Pre-computed properties might have been copied
     814           1 :         // by specialized sstable creators such as the suffix replacer.
     815           1 :                 if len(w.props.UserProperties) == 0 {
     816           1 :                         userProps := make(map[string]string)
     817           1 :                         for i := range w.blockPropCollectors {
     818           1 :                                 scratch := w.blockPropsEncoder.getScratchForProp()
     819           1 :                 // Place the shortID in the first byte; a decoding sketch follows this function.
     820           1 :                                 scratch = append(scratch, byte(i))
     821           1 :                                 buf, err := w.blockPropCollectors[i].FinishTable(scratch)
     822           1 :                                 if err != nil {
     823           0 :                                         return err
     824           0 :                                 }
     825           1 :                                 var prop string
     826           1 :                                 if len(buf) > 0 {
     827           1 :                                         prop = string(buf)
     828           1 :                                 }
     829             :                                 // NB: The property is populated in the map even if it is the
     830             :                                 // empty string, since the presence in the map is what indicates
     831             :                                 // that the block property collector was used when writing.
     832           1 :                                 userProps[w.blockPropCollectors[i].Name()] = prop
     833             :                         }
     834           1 :                         if len(userProps) > 0 {
     835           1 :                                 w.props.UserProperties = userProps
     836           1 :                         }
     837             :                 }
     838             : 
     839           1 :                 var raw rowblk.Writer
     840           1 :         // The restart interval is set to infinity because the properties block
     841           1 :         // is always read sequentially and cached in a heap-allocated object.
     842           1 :         // This reduces table size without a significant impact on performance.
     843           1 :                 raw.RestartInterval = propertiesBlockRestartInterval
     844           1 :                 w.props.CompressionOptions = rocksDBCompressionOptions
     845           1 :                 w.props.save(w.opts.TableFormat, &raw)
     846           1 :                 if _, err := w.layout.WritePropertiesBlock(raw.Finish()); err != nil {
     847           0 :                         return err
     848           0 :                 }
     849             :         }
     850             : 
     851             :         // Write the table footer.
     852           1 :         w.meta.Size, err = w.layout.Finish()
     853           1 :         if err != nil {
     854           0 :                 return err
     855           0 :         }
     856           1 :         w.meta.Properties = w.props
     857           1 :         // Release any held memory and cause any future calls to error.
     858           1 :         // TODO(jackson): Ensure other calls error appropriately if the writer is
     859           1 :         // cleared.
     860           1 :         *w = RawColumnWriter{meta: w.meta}
     861           1 :         return nil
     862             : }
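
The loop above stores each table-level property under its collector's name, with
the collector's shortID encoded as the first byte of the value. A minimal sketch
of the inverse operation, assuming only that encoding (decodeUserProp is a
hypothetical helper for illustration, not part of pebble's API):

        package sketch

        // decodeUserProp splits a user property encoded as above: the first
        // byte is the collector's shortID and the remainder is the collector's
        // serialized table-level property. An empty property still marks the
        // collector as present but carries no payload to decode.
        func decodeUserProp(prop string) (shortID byte, payload string, ok bool) {
                if len(prop) == 0 {
                        return 0, "", false
                }
                return prop[0], prop[1:], true
        }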
     863             : 
     864             : func shouldFlushWithoutLatestKV(
     865             :         sizeWithKV int,
     866             :         sizeWithoutKV int,
     867             :         entryCountWithoutKV int,
     868             :         flushOptions flushDecisionOptions,
     869             :         sizeClassHints []int,
     870           1 : ) bool {
     871           1 :         if entryCountWithoutKV == 0 {
     872           1 :                 return false
     873           1 :         }
     874             :         // For size-class aware flushing we need to account for the metadata that is
     875             :         // allocated when this block is loaded into the block cache. For instance, if
     876             :         // a block has size 1020B it may fit within a 1024B class. However, when
     877             :         // loaded into the block cache we also allocate space for the cache entry
     878             :         // metadata. The new allocation of size ~1052B may now only fit within a
     879             :         // 2048B class, which increases internal fragmentation.
     880           1 :         sizeWithKV += cache.ValueMetadataSize
     881           1 :         sizeWithoutKV += cache.ValueMetadataSize
     882           1 :         if sizeWithKV < flushOptions.blockSize {
     883           1 :                 // Even with the new KV we still haven't exceeded the target block
     884           1 :                 // size. There's no harm in committing to a flush that includes the
     885           1 :                 // new KV (and possibly additional future KVs).
     886           1 :                 return false
     887           1 :         }
     888             : 
     889           1 :         sizeClassWithKV, withOk := blockSizeClass(sizeWithKV, sizeClassHints)
     890           1 :         sizeClassWithoutKV, withoutOk := blockSizeClass(sizeWithoutKV, sizeClassHints)
     891           1 :         if !withOk || !withoutOk {
     892           1 :                 // If the block size could not be mapped to a size class, we fall back
     893           1 :                 // to flushing without the KV since we already know sizeWithKV >=
     894           1 :                 // blockSize.
     895           1 :                 return true
     896           1 :         }
     897             :         // Even though sizeWithKV >= blockSize, we may still defer flushing if
     898             :         // including the KV results in less internal fragmentation within its
     899             :         // size class than flushing the block without it would.
     900           0 :         if sizeClassWithKV-sizeWithKV < sizeClassWithoutKV-sizeWithoutKV {
     901           0 :                 return false
     902           0 :         }
     903           0 :         return true
     904             : }
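
To make the size-class comparison concrete, here is a small, self-contained
sketch of the decision above. The sizeClass helper mirrors how blockSizeClass is
used here, and valueMetadataSize is an illustrative stand-in for
cache.ValueMetadataSize; both are assumptions for illustration, not pebble's
actual implementation:

        package main

        import "fmt"

        // sizeClass returns the smallest hinted class that fits size, and
        // false if size exceeds every class (a sketch of blockSizeClass).
        func sizeClass(size int, hints []int) (int, bool) {
                for _, c := range hints {
                        if size <= c {
                                return c, true
                        }
                }
                return 0, false
        }

        func main() {
                const valueMetadataSize = 32 // illustrative stand-in for cache.ValueMetadataSize
                hints := []int{1024, 2048}
                blockSize := 1024

                sizeWithKV := 1100 + valueMetadataSize    // 1132B -> 2048B class, 916B wasted
                sizeWithoutKV := 1000 + valueMetadataSize // 1032B -> 2048B class, 1016B wasted

                if sizeWithKV < blockSize {
                        fmt.Println("keep filling: target block size not reached")
                        return
                }
                withClass, okWith := sizeClass(sizeWithKV, hints)
                withoutClass, okWithout := sizeClass(sizeWithoutKV, hints)
                if !okWith || !okWithout {
                        fmt.Println("flush without the KV")
                        return
                }
                if withClass-sizeWithKV < withoutClass-sizeWithoutKV {
                        fmt.Println("defer flush: include the KV")
                } else {
                        fmt.Println("flush without the KV")
                }
        }

With these numbers, including the KV wastes 916B of its 2048B class versus
1016B without it, so the heuristic defers the flush and the KV joins the block.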

Generated by: LCOV version 1.14