LCOV - code coverage report
Current view: top level - pebble/sstable - colblk_writer.go (source / functions) Hit Total Coverage
Test: 2025-01-09 08:16Z 97ad2bcc - tests only.lcov Lines: 699 808 86.5 %
Date: 2025-01-09 08:17:03 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package sstable
       6             : 
       7             : import (
       8             :         "bytes"
       9             :         "context"
      10             :         "encoding/binary"
      11             :         "fmt"
      12             :         "math"
      13             :         "slices"
      14             :         "sync"
      15             : 
      16             :         "github.com/cockroachdb/errors"
      17             :         "github.com/cockroachdb/pebble/internal/base"
      18             :         "github.com/cockroachdb/pebble/internal/bytealloc"
      19             :         "github.com/cockroachdb/pebble/internal/invariants"
      20             :         "github.com/cockroachdb/pebble/internal/keyspan"
      21             :         "github.com/cockroachdb/pebble/objstorage"
      22             :         "github.com/cockroachdb/pebble/sstable/block"
      23             :         "github.com/cockroachdb/pebble/sstable/colblk"
      24             :         "github.com/cockroachdb/pebble/sstable/rowblk"
      25             :         "github.com/cockroachdb/pebble/sstable/valblk"
      26             : )
      27             : 
      28             : // RawColumnWriter is a sstable RawWriter that writes sstables with
      29             : // column-oriented blocks. All table formats TableFormatPebblev5 and later write
      30             : // column-oriented blocks and use RawColumnWriter.
      31             : type RawColumnWriter struct {
      32             :         comparer *base.Comparer
      33             :         meta     WriterMetadata
      34             :         opts     WriterOptions
      35             :         err      error
      36             : 
      37             :         dataFlush           block.FlushGovernor
      38             :         indexFlush          block.FlushGovernor
      39             :         blockPropCollectors []BlockPropertyCollector
      40             :         blockPropsEncoder   blockPropertiesEncoder
      41             :         obsoleteCollector   obsoleteKeyBlockPropertyCollector
      42             :         props               Properties
      43             :         // block writers buffering unflushed data.
      44             :         dataBlock struct {
      45             :                 colblk.DataBlockEncoder
      46             :                 // numDeletions stores the count of point tombstones in this data block.
      47             :                 // It's used to determine if this data block is considered
      48             :                 // tombstone-dense for the purposes of compaction.
      49             :                 numDeletions int
      50             :                 // deletionSize stores the raw size of point tombstones in this data
      51             :                 // block. It's used to determine if this data block is considered
      52             :                 // tombstone-dense for the purposes of compaction.
      53             :                 deletionSize int
      54             :         }
      55             :         indexBlock         colblk.IndexBlockWriter
      56             :         topLevelIndexBlock colblk.IndexBlockWriter
      57             :         rangeDelBlock      colblk.KeyspanBlockWriter
      58             :         rangeKeyBlock      colblk.KeyspanBlockWriter
      59             :         valueBlock         *valblk.Writer // nil iff WriterOptions.DisableValueBlocks=true
      60             :         // filter accumulates the filter block. If populated, the filter ingests
      61             :         // either the output of w.split (i.e. a prefix extractor) if w.split is not
      62             :         // nil, or the full keys otherwise.
      63             :         filterBlock  filterWriter
      64             :         prevPointKey struct {
      65             :                 trailer    base.InternalKeyTrailer
      66             :                 isObsolete bool
      67             :         }
      68             :         pendingDataBlockSize int
      69             :         indexBlockSize       int
      70             :         queuedDataSize       uint64
      71             : 
      72             :         // indexBuffering holds finished index blocks as they're completed while
      73             :         // building the sstable. If an index block grows sufficiently large
      74             :         // (IndexBlockSize) while an sstable is still being constructed, the sstable
      75             :         // writer will create a two-level index structure. As index blocks are
      76             :         // completed, they're finished and buffered in-memory until the table is
      77             :         // finished. When the table is finished, the buffered index blocks are
      78             :         // flushed in order after all the data blocks, and the top-level index block
      79             :         // is constructed to point to all the individual index blocks.
      80             :         indexBuffering struct {
      81             :                 // partitions holds all the completed index blocks.
      82             :                 partitions []bufferedIndexBlock
      83             :                 // blockAlloc is used to bulk-allocate byte slices used to store index
      84             :                 // blocks in partitions. These live until the sstable is finished.
      85             :                 blockAlloc []byte
      86             :                 // sepAlloc is used to bulk-allocate index block separator slices stored
      87             :                 // in partitions. These live until the sstable is finished.
      88             :                 sepAlloc bytealloc.A
      89             :         }
      90             : 
      91             :         writeQueue struct {
      92             :                 wg  sync.WaitGroup
      93             :                 ch  chan *compressedBlock
      94             :                 err error
      95             :         }
      96             :         layout layoutWriter
      97             : 
      98             :         lastKeyBuf            []byte
      99             :         separatorBuf          []byte
     100             :         tmp                   [blockHandleLikelyMaxLen]byte
     101             :         previousUserKey       invariants.Value[[]byte]
     102             :         disableKeyOrderChecks bool
     103             : }
     104             : 
     105             : // Assert that *RawColumnWriter implements RawWriter.
     106             : var _ RawWriter = (*RawColumnWriter)(nil)
     107             : 
     108           1 : func newColumnarWriter(writable objstorage.Writable, o WriterOptions) *RawColumnWriter {
     109           1 :         if writable == nil {
     110           0 :                 panic("pebble: nil writable")
     111             :         }
     112           1 :         if !o.TableFormat.BlockColumnar() {
     113           0 :                 panic(errors.AssertionFailedf("newColumnarWriter cannot create sstables with %s format", o.TableFormat))
     114             :         }
     115           1 :         o = o.ensureDefaults()
     116           1 :         w := &RawColumnWriter{
     117           1 :                 comparer: o.Comparer,
     118           1 :                 meta: WriterMetadata{
     119           1 :                         SmallestSeqNum: math.MaxUint64,
     120           1 :                 },
     121           1 :                 opts:                  o,
     122           1 :                 layout:                makeLayoutWriter(writable, o),
     123           1 :                 disableKeyOrderChecks: o.internal.DisableKeyOrderChecks,
     124           1 :         }
     125           1 :         w.dataFlush = block.MakeFlushGovernor(o.BlockSize, o.BlockSizeThreshold, o.SizeClassAwareThreshold, o.AllocatorSizeClasses)
     126           1 :         w.indexFlush = block.MakeFlushGovernor(o.IndexBlockSize, o.BlockSizeThreshold, o.SizeClassAwareThreshold, o.AllocatorSizeClasses)
     127           1 :         w.dataBlock.Init(o.KeySchema)
     128           1 :         w.indexBlock.Init()
     129           1 :         w.topLevelIndexBlock.Init()
     130           1 :         w.rangeDelBlock.Init(w.comparer.Equal)
     131           1 :         w.rangeKeyBlock.Init(w.comparer.Equal)
     132           1 :         if !o.DisableValueBlocks {
     133           1 :                 w.valueBlock = valblk.NewWriter(
     134           1 :                         block.MakeFlushGovernor(o.BlockSize, o.BlockSizeThreshold, o.SizeClassAwareThreshold, o.AllocatorSizeClasses),
     135           1 :                         w.opts.Compression, w.opts.Checksum, func(compressedSize int) {})
     136             :         }
     137           1 :         if o.FilterPolicy != nil {
     138           1 :                 switch o.FilterType {
     139           1 :                 case TableFilter:
     140           1 :                         w.filterBlock = newTableFilterWriter(o.FilterPolicy)
     141           0 :                 default:
     142           0 :                         panic(fmt.Sprintf("unknown filter type: %v", o.FilterType))
     143             :                 }
     144             :         }
     145             : 
     146           1 :         numBlockPropertyCollectors := len(o.BlockPropertyCollectors)
     147           1 :         if !o.disableObsoleteCollector {
     148           1 :                 numBlockPropertyCollectors++
     149           1 :         }
     150           1 :         if numBlockPropertyCollectors > maxPropertyCollectors {
     151           0 :                 panic(errors.New("pebble: too many block property collectors"))
     152             :         }
     153           1 :         w.blockPropCollectors = make([]BlockPropertyCollector, 0, numBlockPropertyCollectors)
     154           1 :         for _, constructFn := range o.BlockPropertyCollectors {
     155           1 :                 w.blockPropCollectors = append(w.blockPropCollectors, constructFn())
     156           1 :         }
     157           1 :         if !o.disableObsoleteCollector {
     158           1 :                 w.blockPropCollectors = append(w.blockPropCollectors, &w.obsoleteCollector)
     159           1 :         }
     160           1 :         var buf bytes.Buffer
     161           1 :         buf.WriteString("[")
     162           1 :         for i := range w.blockPropCollectors {
     163           1 :                 if i > 0 {
     164           1 :                         buf.WriteString(",")
     165           1 :                 }
     166           1 :                 buf.WriteString(w.blockPropCollectors[i].Name())
     167             :         }
     168           1 :         buf.WriteString("]")
     169           1 :         w.props.PropertyCollectorNames = buf.String()
     170           1 : 
     171           1 :         w.props.ComparerName = o.Comparer.Name
     172           1 :         w.props.CompressionName = o.Compression.String()
     173           1 :         w.props.KeySchemaName = o.KeySchema.Name
     174           1 :         w.props.MergerName = o.MergerName
     175           1 : 
     176           1 :         w.writeQueue.ch = make(chan *compressedBlock)
     177           1 :         w.writeQueue.wg.Add(1)
     178           1 :         go w.drainWriteQueue()
     179           1 :         return w
     180             : }
     181             : 
     182             : // Error returns the current accumulated error if any.
     183           1 : func (w *RawColumnWriter) Error() error {
     184           1 :         return w.err
     185           1 : }
     186             : 
     187             : // EstimatedSize returns the estimated size of the sstable being written if
     188             : // a call to Close() was made without adding additional keys.
     189           1 : func (w *RawColumnWriter) EstimatedSize() uint64 {
     190           1 :         sz := rocksDBFooterLen + w.queuedDataSize
     191           1 :         // TODO(jackson): Avoid iterating over partitions by incrementally
     192           1 :         // maintaining the size contribution of all buffered partitions.
     193           1 :         for _, bib := range w.indexBuffering.partitions {
     194           0 :                 // We include the separator user key to account for its bytes in the
     195           0 :                 // top-level index block.
     196           0 :                 //
     197           0 :                 // TODO(jackson): We could incrementally build the top-level index block
     198           0 :                 // and produce an exact calculation of the current top-level index
     199           0 :                 // block's size.
     200           0 :                 sz += uint64(len(bib.block) + block.TrailerLen + len(bib.sep.UserKey))
     201           0 :         }
     202           1 :         if w.rangeDelBlock.KeyCount() > 0 {
     203           1 :                 sz += uint64(w.rangeDelBlock.Size())
     204           1 :         }
     205           1 :         if w.rangeKeyBlock.KeyCount() > 0 {
     206           1 :                 sz += uint64(w.rangeKeyBlock.Size())
     207           1 :         }
     208           1 :         if w.valueBlock != nil {
     209           1 :                 sz += w.valueBlock.Size()
     210           1 :         }
     211             :         // TODO(jackson): Include an estimate of the properties, filter and meta
     212             :         // index blocks sizes.
     213           1 :         return sz
     214             : }
     215             : 
     216             : // ComparePrev compares the provided user to the last point key written to the
     217             : // writer. The returned value is equivalent to Compare(key, prevKey) where
     218             : // prevKey is the last point key written to the writer.
     219             : //
     220             : // If no key has been written yet, ComparePrev returns +1.
     221             : //
     222             : // Must not be called after Writer is closed.
     223           1 : func (w *RawColumnWriter) ComparePrev(k []byte) int {
     224           1 :         if w == nil || w.dataBlock.Rows() == 0 {
     225           1 :                 return +1
     226           1 :         }
     227           1 :         return int(w.dataBlock.KeyWriter.ComparePrev(k).UserKeyComparison)
     228             : }
     229             : 
     230             : // SetSnapshotPinnedProperties sets the properties for pinned keys. Should only
     231             : // be used internally by Pebble.
     232             : func (w *RawColumnWriter) SetSnapshotPinnedProperties(
     233             :         pinnedKeyCount, pinnedKeySize, pinnedValueSize uint64,
     234           1 : ) {
     235           1 :         w.props.SnapshotPinnedKeys = pinnedKeyCount
     236           1 :         w.props.SnapshotPinnedKeySize = pinnedKeySize
     237           1 :         w.props.SnapshotPinnedValueSize = pinnedValueSize
     238           1 : }
     239             : 
     240             : // Metadata returns the metadata for the finished sstable. Only valid to call
     241             : // after the sstable has been finished.
     242           1 : func (w *RawColumnWriter) Metadata() (*WriterMetadata, error) {
     243           1 :         if !w.layout.IsFinished() {
     244           0 :                 return nil, errors.New("pebble: writer is not closed")
     245           0 :         }
     246           1 :         return &w.meta, nil
     247             : }
     248             : 
     249             : // EncodeSpan encodes the keys in the given span. The span can contain either
     250             : // only RANGEDEL keys or only range keys.
     251           1 : func (w *RawColumnWriter) EncodeSpan(span keyspan.Span) error {
     252           1 :         if span.Empty() {
     253           1 :                 return nil
     254           1 :         }
     255           1 :         for _, k := range span.Keys {
     256           1 :                 w.meta.updateSeqNum(k.SeqNum())
     257           1 :         }
     258             : 
     259           1 :         blockWriter := &w.rangeKeyBlock
     260           1 :         if span.Keys[0].Kind() == base.InternalKeyKindRangeDelete {
     261           1 :                 blockWriter = &w.rangeDelBlock
     262           1 :                 // Update range delete properties.
     263           1 :                 // NB: These properties are computed differently than the rowblk sstable
     264           1 :                 // writer because this writer does not flatten them into row key-value
     265           1 :                 // pairs.
     266           1 :                 w.props.RawKeySize += uint64(len(span.Start) + len(span.End))
     267           1 :                 count := uint64(len(span.Keys))
     268           1 :                 w.props.NumEntries += count
     269           1 :                 w.props.NumDeletions += count
     270           1 :                 w.props.NumRangeDeletions += count
     271           1 :         } else {
     272           1 :                 // Update range key properties.
     273           1 :                 // NB: These properties are computed differently than the rowblk sstable
     274           1 :                 // writer because this writer does not flatten them into row key-value
     275           1 :                 // pairs.
     276           1 :                 w.props.RawRangeKeyKeySize += uint64(len(span.Start) + len(span.End))
     277           1 :                 for _, k := range span.Keys {
     278           1 :                         w.props.RawRangeKeyValueSize += uint64(len(k.Value))
     279           1 :                         switch k.Kind() {
     280           1 :                         case base.InternalKeyKindRangeKeyDelete:
     281           1 :                                 w.props.NumRangeKeyDels++
     282           1 :                         case base.InternalKeyKindRangeKeySet:
     283           1 :                                 w.props.NumRangeKeySets++
     284           1 :                         case base.InternalKeyKindRangeKeyUnset:
     285           1 :                                 w.props.NumRangeKeyUnsets++
     286           0 :                         default:
     287           0 :                                 panic(errors.Errorf("pebble: invalid range key type: %s", k.Kind()))
     288             :                         }
     289             :                 }
     290           1 :                 for i := range w.blockPropCollectors {
     291           1 :                         if err := w.blockPropCollectors[i].AddRangeKeys(span); err != nil {
     292           0 :                                 return err
     293           0 :                         }
     294             :                 }
     295             :         }
     296           1 :         if !w.disableKeyOrderChecks && blockWriter.KeyCount() > 0 {
     297           1 :                 // Check that spans are being added in fragmented order. If the two
     298           1 :                 // tombstones overlap, their start and end keys must be identical.
     299           1 :                 prevStart, prevEnd, prevTrailer := blockWriter.UnsafeLastSpan()
     300           1 :                 if w.opts.Comparer.Equal(prevStart, span.Start) && w.opts.Comparer.Equal(prevEnd, span.End) {
     301           1 :                         if prevTrailer < span.Keys[0].Trailer {
     302           1 :                                 w.err = errors.Errorf("pebble: keys must be added in order: %s-%s:{(#%s)}, %s",
     303           1 :                                         w.opts.Comparer.FormatKey(prevStart),
     304           1 :                                         w.opts.Comparer.FormatKey(prevEnd),
     305           1 :                                         prevTrailer, span.Pretty(w.opts.Comparer.FormatKey))
     306           1 :                         }
     307           1 :                 } else if c := w.opts.Comparer.Compare(prevEnd, span.Start); c > 0 {
     308           1 :                         w.err = errors.Errorf("pebble: keys must be added in order: %s-%s:{(#%s)}, %s",
     309           1 :                                 w.opts.Comparer.FormatKey(prevStart),
     310           1 :                                 w.opts.Comparer.FormatKey(prevEnd),
     311           1 :                                 prevTrailer, span.Pretty(w.opts.Comparer.FormatKey))
     312           1 :                         return w.err
     313           1 :                 }
     314             :         }
     315           1 :         blockWriter.AddSpan(span)
     316           1 :         return nil
     317             : }
     318             : 
     319             : // AddWithForceObsolete adds a point key/value pair when writing a
     320             : // strict-obsolete sstable. For a given Writer, the keys passed to Add must be
     321             : // in increasing order. Span keys (range deletions, range keys) must be added
     322             : // through EncodeSpan.
     323             : //
     324             : // forceObsolete indicates whether the caller has determined that this key is
     325             : // obsolete even though it may be the latest point key for this userkey. This
     326             : // should be set to true for keys obsoleted by RANGEDELs, and is required for
     327             : // strict-obsolete sstables.
     328             : //
     329             : // Note that there are two properties, S1 and S2 (see comment in format.go)
     330             : // that strict-obsolete ssts must satisfy. S2, due to RANGEDELs, is solely the
     331             : // responsibility of the caller. S1 is solely the responsibility of the
     332             : // callee.
     333             : func (w *RawColumnWriter) AddWithForceObsolete(
     334             :         key InternalKey, value []byte, forceObsolete bool,
     335           1 : ) error {
     336           1 :         switch key.Kind() {
     337             :         case base.InternalKeyKindRangeDelete, base.InternalKeyKindRangeKeySet,
     338           1 :                 base.InternalKeyKindRangeKeyUnset, base.InternalKeyKindRangeKeyDelete:
     339           1 :                 return errors.Newf("%s must be added through EncodeSpan", key.Kind())
     340           1 :         case base.InternalKeyKindMerge:
     341           1 :                 if w.opts.IsStrictObsolete {
     342           0 :                         return errors.Errorf("MERGE not supported in a strict-obsolete sstable")
     343           0 :                 }
     344             :         }
     345             : 
     346           1 :         eval, err := w.evaluatePoint(key, len(value))
     347           1 :         if err != nil {
     348           1 :                 return err
     349           1 :         }
     350           1 :         eval.isObsolete = eval.isObsolete || forceObsolete
     351           1 :         w.prevPointKey.trailer = key.Trailer
     352           1 :         w.prevPointKey.isObsolete = eval.isObsolete
     353           1 : 
     354           1 :         var valuePrefix block.ValuePrefix
     355           1 :         var valueStoredWithKey []byte
     356           1 :         if eval.writeToValueBlock {
     357           1 :                 vh, err := w.valueBlock.AddValue(value)
     358           1 :                 if err != nil {
     359           0 :                         return err
     360           0 :                 }
     361           1 :                 n := valblk.EncodeHandle(w.tmp[:], vh)
     362           1 :                 valueStoredWithKey = w.tmp[:n]
     363           1 :                 var attribute base.ShortAttribute
     364           1 :                 if w.opts.ShortAttributeExtractor != nil {
     365           1 :                         // TODO(sumeer): for compactions, it is possible that the input sstable
     366           1 :                         // already has this value in the value section and so we have already
     367           1 :                         // extracted the ShortAttribute. Avoid extracting it again. This will
     368           1 :                         // require changing the RawWriter.Add interface.
     369           1 :                         if attribute, err = w.opts.ShortAttributeExtractor(
     370           1 :                                 key.UserKey, int(eval.kcmp.PrefixLen), value); err != nil {
     371           0 :                                 return err
     372           0 :                         }
     373             :                 }
     374           1 :                 valuePrefix = block.ValueHandlePrefix(eval.kcmp.PrefixEqual(), attribute)
     375           1 :         } else {
     376           1 :                 valueStoredWithKey = value
     377           1 :                 if len(value) > 0 {
     378           1 :                         valuePrefix = block.InPlaceValuePrefix(eval.kcmp.PrefixEqual())
     379           1 :                 }
     380             :         }
     381             : 
     382             :         // Append the key to the data block. We have NOT yet committed to
     383             :         // including the key in the block. The data block writer permits us to
     384             :         // finish the block excluding the last-appended KV.
     385           1 :         entriesWithoutKV := w.dataBlock.Rows()
     386           1 :         w.dataBlock.Add(key, valueStoredWithKey, valuePrefix, eval.kcmp, eval.isObsolete)
     387           1 : 
     388           1 :         // Now that we've appended the KV pair, we can compute the exact size of the
     389           1 :         // block with this key-value pair included. Check to see if we should flush
     390           1 :         // the current block, either with or without the added key-value pair.
     391           1 :         size := w.dataBlock.Size()
     392           1 :         if shouldFlushWithoutLatestKV(size, w.pendingDataBlockSize, entriesWithoutKV, &w.dataFlush) {
     393           1 :                 // Flush the data block excluding the key we just added.
     394           1 :                 if err := w.flushDataBlockWithoutNextKey(key.UserKey); err != nil {
     395           0 :                         w.err = err
     396           0 :                         return err
     397           0 :                 }
     398             :                 // flushDataBlockWithoutNextKey reset the data block builder, and we can
     399             :                 // add the key to this next block now.
     400           1 :                 w.dataBlock.Add(key, valueStoredWithKey, valuePrefix, eval.kcmp, eval.isObsolete)
     401           1 :                 w.pendingDataBlockSize = w.dataBlock.Size()
     402           1 :         } else {
     403           1 :                 // We're not flushing the data block, and we're committing to including
     404           1 :                 // the current KV in the block. Remember the new size of the data block
     405           1 :                 // with the current KV.
     406           1 :                 w.pendingDataBlockSize = size
     407           1 :         }
     408             : 
     409           1 :         for i := range w.blockPropCollectors {
     410           1 :                 v := value
     411           1 :                 if key.Kind() == base.InternalKeyKindSet {
     412           1 :                         // Values for SET are not required to be in-place, and in the future
     413           1 :                         // may not even be read by the compaction, so pass nil values. Block
     414           1 :                         // property collectors in such Pebble DB's must not look at the
     415           1 :                         // value.
     416           1 :                         v = nil
     417           1 :                 }
     418           1 :                 if err := w.blockPropCollectors[i].AddPointKey(key, v); err != nil {
     419           0 :                         w.err = err
     420           0 :                         return err
     421           0 :                 }
     422             :         }
     423           1 :         w.obsoleteCollector.AddPoint(eval.isObsolete)
     424           1 :         if w.filterBlock != nil {
     425           1 :                 w.filterBlock.addKey(key.UserKey[:eval.kcmp.PrefixLen])
     426           1 :         }
     427           1 :         w.meta.updateSeqNum(key.SeqNum())
     428           1 :         if !w.meta.HasPointKeys {
     429           1 :                 w.meta.SetSmallestPointKey(key.Clone())
     430           1 :         }
     431             : 
     432           1 :         w.props.NumEntries++
     433           1 :         switch key.Kind() {
     434           1 :         case InternalKeyKindDelete, InternalKeyKindSingleDelete:
     435           1 :                 w.props.NumDeletions++
     436           1 :                 w.props.RawPointTombstoneKeySize += uint64(len(key.UserKey))
     437           1 :                 w.dataBlock.numDeletions++
     438           1 :                 w.dataBlock.deletionSize += len(key.UserKey)
     439           1 :         case InternalKeyKindDeleteSized:
     440           1 :                 var size uint64
     441           1 :                 if len(value) > 0 {
     442           1 :                         var n int
     443           1 :                         size, n = binary.Uvarint(value)
     444           1 :                         if n <= 0 {
     445           0 :                                 return errors.Newf("%s key's value (%x) does not parse as uvarint",
     446           0 :                                         errors.Safe(key.Kind().String()), value)
     447           0 :                         }
     448             :                 }
     449           1 :                 w.props.NumDeletions++
     450           1 :                 w.props.NumSizedDeletions++
     451           1 :                 w.props.RawPointTombstoneKeySize += uint64(len(key.UserKey))
     452           1 :                 w.props.RawPointTombstoneValueSize += size
     453           1 :                 w.dataBlock.numDeletions++
     454           1 :                 w.dataBlock.deletionSize += len(key.UserKey)
     455           1 :         case InternalKeyKindMerge:
     456           1 :                 w.props.NumMergeOperands++
     457             :         }
     458           1 :         w.props.RawKeySize += uint64(key.Size())
     459           1 :         w.props.RawValueSize += uint64(len(value))
     460           1 :         return nil
     461             : }
     462             : 
// pointKeyEvaluation carries the decisions evaluatePoint makes about a single
// point key before it is written.
type pointKeyEvaluation struct {
	// kcmp is the comparison of the key's user key against the previous key,
	// as returned by the data block KeyWriter's ComparePrev.
	kcmp              colblk.KeyComparison
	// isObsolete is set when evaluatePoint determines the key is obsolete.
	isObsolete        bool
	// writeToValueBlock is set when evaluatePoint determines the key's value
	// qualifies for storage in a value block.
	writeToValueBlock bool
}
     468             : 
     469             : // evaluatePoint takes information about a point key being written to the
     470             : // sstable and decides how the point should be represented, where its value
     471             : // should be stored, etc.
     472             : func (w *RawColumnWriter) evaluatePoint(
     473             :         key base.InternalKey, valueLen int,
     474           1 : ) (eval pointKeyEvaluation, err error) {
     475           1 :         eval.kcmp = w.dataBlock.KeyWriter.ComparePrev(key.UserKey)
     476           1 : 
     477           1 :         // When invariants are enabled, validate kcmp.
     478           1 :         if invariants.Enabled {
     479           1 :                 colblk.AssertKeyCompare(w.comparer, key.UserKey, w.previousUserKey.Get(), eval.kcmp)
     480           1 :                 w.previousUserKey.Set(append(w.previousUserKey.Get()[:0], key.UserKey...))
     481           1 :         }
     482             : 
     483           1 :         if !w.meta.HasPointKeys {
     484           1 :                 return eval, nil
     485           1 :         }
     486           1 :         keyKind := key.Kind()
     487           1 :         // Ensure that no one adds a point key kind without considering the obsolete
     488           1 :         // handling for that kind.
     489           1 :         switch keyKind {
     490             :         case InternalKeyKindSet, InternalKeyKindSetWithDelete, InternalKeyKindMerge,
     491           1 :                 InternalKeyKindDelete, InternalKeyKindSingleDelete, InternalKeyKindDeleteSized:
     492           0 :         default:
     493           0 :                 panic(errors.AssertionFailedf("unexpected key kind %s", keyKind.String()))
     494             :         }
     495           1 :         prevKeyKind := w.prevPointKey.trailer.Kind()
     496           1 :         // If same user key, then the current key is obsolete if any of the
     497           1 :         // following is true:
     498           1 :         // C1 The prev key was obsolete.
     499           1 :         // C2 The prev key was not a MERGE. When the previous key is a MERGE we must
     500           1 :         //    preserve SET* and MERGE since their values will be merged into the
     501           1 :         //    previous key. We also must preserve DEL* since there may be an older
     502           1 :         //    SET*/MERGE in a lower level that must not be merged with the MERGE --
     503           1 :         //    if we omit the DEL* that lower SET*/MERGE will become visible.
     504           1 :         //
     505           1 :         // Regardless of whether it is the same user key or not
     506           1 :         // C3 The current key is some kind of point delete, and we are writing to
     507           1 :         //    the lowest level, then it is also obsolete. The correctness of this
     508           1 :         //    relies on the same user key not spanning multiple sstables in a level.
     509           1 :         //
     510           1 :         // C1 ensures that for a user key there is at most one transition from
     511           1 :         // !obsolete to obsolete. Consider a user key k, for which the first n keys
     512           1 :         // are not obsolete. We consider the various value of n:
     513           1 :         //
     514           1 :         // n = 0: This happens due to forceObsolete being set by the caller, or due
     515           1 :         // to C3. forceObsolete must only be set due a RANGEDEL, and that RANGEDEL
     516           1 :         // must also delete all the lower seqnums for the same user key. C3 triggers
     517           1 :         // due to a point delete and that deletes all the lower seqnums for the same
     518           1 :         // user key.
     519           1 :         //
     520           1 :         // n = 1: This is the common case. It happens when the first key is not a
     521           1 :         // MERGE, or the current key is some kind of point delete.
     522           1 :         //
     523           1 :         // n > 1: This is due to a sequence of MERGE keys, potentially followed by a
     524           1 :         // single non-MERGE key.
     525           1 :         isObsoleteC1AndC2 := eval.kcmp.UserKeyComparison == 0 &&
     526           1 :                 (w.prevPointKey.isObsolete || prevKeyKind != InternalKeyKindMerge)
     527           1 :         isObsoleteC3 := w.opts.WritingToLowestLevel &&
     528           1 :                 (keyKind == InternalKeyKindDelete || keyKind == InternalKeyKindSingleDelete ||
     529           1 :                         keyKind == InternalKeyKindDeleteSized)
     530           1 :         eval.isObsolete = isObsoleteC1AndC2 || isObsoleteC3
     531           1 :         // TODO(sumeer): storing isObsolete SET and SETWITHDEL in value blocks is
     532           1 :         // possible, but requires some care in documenting and checking invariants.
     533           1 :         // There is code that assumes nothing in value blocks because of single MVCC
     534           1 :         // version (those should be ok). We have to ensure setHasSamePrefix is
     535           1 :         // correctly initialized here etc.
     536           1 : 
     537           1 :         if !w.disableKeyOrderChecks && (eval.kcmp.UserKeyComparison < 0 ||
     538           1 :                 (eval.kcmp.UserKeyComparison == 0 && w.prevPointKey.trailer <= key.Trailer)) {
     539           1 :                 previousKey := base.InternalKey{
     540           1 :                         UserKey: w.dataBlock.MaterializeLastUserKey(nil),
     541           1 :                         Trailer: w.prevPointKey.trailer,
     542           1 :                 }
     543           1 :                 return eval, errors.Errorf(
     544           1 :                         "pebble: keys must be added in strictly increasing order: %s, %s",
     545           1 :                         previousKey.Pretty(w.comparer.FormatKey),
     546           1 :                         key.Pretty(w.comparer.FormatKey))
     547           1 :         }
     548             : 
     549             :         // We might want to write this key's value to a value block if it has the
     550             :         // same prefix.
     551             :         //
     552             :         // We require:
     553             :         //  . Value blocks to be enabled.
     554             :         //  . The current key to have the same prefix as the previous key.
     555             :         //  . The previous key to be a SET.
     556             :         //  . The current key to be a SET.
     557             :         //  . If there are bounds requiring some keys' values to be in-place, the
     558             :         //    key must not fall within those bounds.
     559             :         //  . The value to be sufficiently large. (Currently we simply require a
     560             :         //    non-zero length, so all non-empty values are eligible for storage
     561             :         //    out-of-band in a value block.)
     562             :         //
     563             :         // Use of 0 here is somewhat arbitrary. Given the minimum 3 byte encoding of
     564             :         // valueHandle, this should be > 3. But tiny values are common in test and
     565             :         // unlikely in production, so we use 0 here for better test coverage.
     566           1 :         const tinyValueThreshold = 0
     567           1 :         useValueBlock := !w.opts.DisableValueBlocks &&
     568           1 :                 eval.kcmp.PrefixEqual() &&
     569           1 :                 prevKeyKind == InternalKeyKindSet &&
     570           1 :                 keyKind == InternalKeyKindSet &&
     571           1 :                 valueLen > tinyValueThreshold &&
     572           1 :                 w.valueBlock != nil
     573           1 :         if !useValueBlock {
     574           1 :                 return eval, nil
     575           1 :         }
     576             :         // NB: it is possible that eval.kcmp.UserKeyComparison == 0, i.e., these two
     577             :         // SETs have identical user keys (because of an open snapshot). This should
     578             :         // be the rare case.
     579             : 
     580             :         // If there are bounds requiring some keys' values to be in-place, compare
     581             :         // the prefix against the bounds.
     582           1 :         if !w.opts.RequiredInPlaceValueBound.IsEmpty() {
     583           1 :                 if w.comparer.Compare(w.opts.RequiredInPlaceValueBound.Upper, key.UserKey[:eval.kcmp.PrefixLen]) <= 0 {
     584           1 :                         // Common case for CockroachDB. Make it empty since all future keys
     585           1 :                         // will be >= this key.
     586           1 :                         w.opts.RequiredInPlaceValueBound = UserKeyPrefixBound{}
     587           1 :                 } else if w.comparer.Compare(key.UserKey[:eval.kcmp.PrefixLen], w.opts.RequiredInPlaceValueBound.Lower) >= 0 {
     588           1 :                         // Don't write to value block if the key is within the bounds.
     589           1 :                         return eval, nil
     590           1 :                 }
     591             :         }
     592           1 :         eval.writeToValueBlock = true
     593           1 :         return eval, nil
     594             : }
     595             : 
     596             : var compressedBlockPool = sync.Pool{
     597           1 :         New: func() interface{} {
     598           1 :                 return new(compressedBlock)
     599           1 :         },
     600             : }
     601             : 
// compressedBlock is the unit sent through the write queue: a finished
// physical data block together with the buffers used to produce it. Values
// are obtained from and returned to compressedBlockPool.
type compressedBlock struct {
	// physical is the block as produced by block.CompressAndChecksum (or a
	// clone of it when the block was left uncompressed).
	physical block.PhysicalBlock
	// blockBuf holds the data buffer and checksummer used while producing
	// physical.
	blockBuf blockBuf
}
     606             : 
     607           1 : func (w *RawColumnWriter) flushDataBlockWithoutNextKey(nextKey []byte) error {
     608           1 :         serializedBlock, lastKey := w.dataBlock.Finish(w.dataBlock.Rows()-1, w.pendingDataBlockSize)
     609           1 :         w.maybeIncrementTombstoneDenseBlocks(len(serializedBlock))
     610           1 :         // Compute the separator that will be written to the index block alongside
     611           1 :         // this data block's end offset. It is the separator between the last key in
     612           1 :         // the finished block and the [nextKey] that was excluded from the block.
     613           1 :         w.separatorBuf = w.comparer.Separator(w.separatorBuf[:0], lastKey.UserKey, nextKey)
     614           1 :         if err := w.enqueueDataBlock(serializedBlock, lastKey, w.separatorBuf); err != nil {
     615           0 :                 return err
     616           0 :         }
     617           1 :         w.dataBlock.Reset()
     618           1 :         w.pendingDataBlockSize = 0
     619           1 :         return nil
     620             : }
     621             : 
     622             : // maybeIncrementTombstoneDenseBlocks increments the number of tombstone dense
     623             : // blocks if the number of deletions in the data block exceeds a threshold or
     624             : // the deletion size exceeds a threshold. It should be called after the
     625             : // data block has been finished.
     626             : // Invariant: w.dataBlockBuf.uncompressed must already be populated.
     627           1 : func (w *RawColumnWriter) maybeIncrementTombstoneDenseBlocks(uncompressedLen int) {
     628           1 :         minSize := w.opts.DeletionSizeRatioThreshold * float32(uncompressedLen)
     629           1 :         if w.dataBlock.numDeletions > w.opts.NumDeletionsThreshold || float32(w.dataBlock.deletionSize) > minSize {
     630           1 :                 w.props.NumTombstoneDenseBlocks++
     631           1 :         }
     632           1 :         w.dataBlock.numDeletions = 0
     633           1 :         w.dataBlock.deletionSize = 0
     634             : }
     635             : 
     636             : // enqueueDataBlock compresses and checksums the provided data block and sends
     637             : // it to the write queue to be asynchronously written to the underlying storage.
     638             : // It also adds the block's index block separator to the pending index block,
     639             : // possibly triggering the index block to be finished and buffered.
     640             : func (w *RawColumnWriter) enqueueDataBlock(
     641             :         serializedBlock []byte, lastKey base.InternalKey, separator []byte,
     642           1 : ) error {
     643           1 :         w.lastKeyBuf = append(w.lastKeyBuf[:0], lastKey.UserKey...)
     644           1 :         w.meta.SetLargestPointKey(base.InternalKey{
     645           1 :                 UserKey: w.lastKeyBuf,
     646           1 :                 Trailer: lastKey.Trailer,
     647           1 :         })
     648           1 : 
     649           1 :         if invariants.Enabled {
     650           1 :                 var dec colblk.DataBlockDecoder
     651           1 :                 dec.Init(w.opts.KeySchema, serializedBlock)
     652           1 :                 if err := dec.Validate(w.comparer, w.opts.KeySchema); err != nil {
     653           0 :                         panic(err)
     654             :                 }
     655             :         }
     656             : 
     657             :         // Serialize the data block, compress it and send it to the write queue.
     658           1 :         cb := compressedBlockPool.Get().(*compressedBlock)
     659           1 :         cb.blockBuf.checksummer.Type = w.opts.Checksum
     660           1 :         cb.physical = block.CompressAndChecksum(
     661           1 :                 &cb.blockBuf.dataBuf,
     662           1 :                 serializedBlock,
     663           1 :                 w.opts.Compression,
     664           1 :                 &cb.blockBuf.checksummer,
     665           1 :         )
     666           1 :         if !cb.physical.IsCompressed() {
     667           1 :                 // If the block isn't compressed, cb.physical's underlying data points
     668           1 :                 // directly into a buffer owned by w.dataBlock. Clone it before passing
     669           1 :                 // it to the write queue to be asynchronously written to disk.
     670           1 :                 // TODO(jackson): Should we try to avoid this clone by tracking the
     671           1 :                 // lifetime of the DataBlockWriters?
     672           1 :                 cb.physical, cb.blockBuf.dataBuf = cb.physical.CloneUsingBuf(cb.blockBuf.dataBuf)
     673           1 :         }
     674           1 :         return w.enqueuePhysicalBlock(cb, separator)
     675             : }
     676             : 
     677           1 : func (w *RawColumnWriter) enqueuePhysicalBlock(cb *compressedBlock, separator []byte) error {
     678           1 :         dataBlockHandle := block.Handle{
     679           1 :                 Offset: w.queuedDataSize,
     680           1 :                 Length: uint64(cb.physical.LengthWithoutTrailer()),
     681           1 :         }
     682           1 :         w.queuedDataSize += dataBlockHandle.Length + block.TrailerLen
     683           1 :         w.writeQueue.ch <- cb
     684           1 : 
     685           1 :         var err error
     686           1 :         w.blockPropsEncoder.resetProps()
     687           1 :         for i := range w.blockPropCollectors {
     688           1 :                 scratch := w.blockPropsEncoder.getScratchForProp()
     689           1 :                 if scratch, err = w.blockPropCollectors[i].FinishDataBlock(scratch); err != nil {
     690           0 :                         return err
     691           0 :                 }
     692           1 :                 w.blockPropsEncoder.addProp(shortID(i), scratch)
     693             :         }
     694           1 :         dataBlockProps := w.blockPropsEncoder.unsafeProps()
     695           1 : 
     696           1 :         // Add the separator to the index block. This might trigger a flush of the
     697           1 :         // index block too.
     698           1 :         i := w.indexBlock.AddBlockHandle(separator, dataBlockHandle, dataBlockProps)
     699           1 :         sizeWithEntry := w.indexBlock.Size()
     700           1 :         if shouldFlushWithoutLatestKV(sizeWithEntry, w.indexBlockSize, i, &w.indexFlush) {
     701           1 :                 // NB: finishIndexBlock will use blockPropsEncoder, so we must clone the
     702           1 :                 // data block's props first.
     703           1 :                 dataBlockProps = slices.Clone(dataBlockProps)
     704           1 : 
     705           1 :                 if err = w.finishIndexBlock(w.indexBlock.Rows() - 1); err != nil {
     706           0 :                         return err
     707           0 :                 }
     708             :                 // finishIndexBlock reset the index block builder, and we can
     709             :                 // add the block handle to this new index block.
     710           1 :                 _ = w.indexBlock.AddBlockHandle(separator, dataBlockHandle, dataBlockProps)
     711           1 :                 w.indexBlockSize = w.indexBlock.Size()
     712           1 :         } else {
     713           1 :                 w.indexBlockSize = sizeWithEntry
     714           1 :         }
     715             :         // Incorporate the finished data block's property into the index block, now
     716             :         // that we've flushed the index block without the new separator if
     717             :         // necessary.
     718           1 :         for i := range w.blockPropCollectors {
     719           1 :                 w.blockPropCollectors[i].AddPrevDataBlockToIndexBlock()
     720           1 :         }
     721           1 :         return nil
     722             : }
     723             : 
     724             : // finishIndexBlock finishes the currently pending index block with the first
     725             : // [rows] rows. In practice, [rows] is always w.indexBlock.Rows() or
     726             : // w.indexBlock.Rows()-1.
     727             : //
     728             : // The finished index block is buffered until the writer is closed.
     729           1 : func (w *RawColumnWriter) finishIndexBlock(rows int) error {
     730           1 :         defer w.indexBlock.Reset()
     731           1 :         w.blockPropsEncoder.resetProps()
     732           1 :         for i := range w.blockPropCollectors {
     733           1 :                 scratch := w.blockPropsEncoder.getScratchForProp()
     734           1 :                 var err error
     735           1 :                 if scratch, err = w.blockPropCollectors[i].FinishIndexBlock(scratch); err != nil {
     736           0 :                         return err
     737           0 :                 }
     738           1 :                 w.blockPropsEncoder.addProp(shortID(i), scratch)
     739             :         }
     740           1 :         indexProps := w.blockPropsEncoder.props()
     741           1 :         bib := bufferedIndexBlock{nEntries: rows, properties: indexProps}
     742           1 : 
     743           1 :         // Copy the last (greatest) separator key in the index block into bib.sep.
     744           1 :         // It'll be the separator on the entry in the top-level index block.
     745           1 :         //
     746           1 :         // TODO(jackson): bib.sep.Trailer is unused within the columnar-block
     747           1 :         // sstable writer. Its existence is a code artifact of reuse of the
     748           1 :         // bufferedIndexBlock type between colblk and rowblk writers. This can be
     749           1 :         // cleaned up.
     750           1 :         bib.sep.Trailer = base.MakeTrailer(base.SeqNumMax, base.InternalKeyKindSeparator)
     751           1 :         w.indexBuffering.sepAlloc, bib.sep.UserKey = w.indexBuffering.sepAlloc.Copy(
     752           1 :                 w.indexBlock.UnsafeSeparator(rows - 1))
     753           1 : 
     754           1 :         // Finish the index block and copy it so that w.indexBlock may be reused.
     755           1 :         block := w.indexBlock.Finish(rows)
     756           1 :         if len(w.indexBuffering.blockAlloc) < len(block) {
     757           1 :                 // Allocate enough bytes for approximately 16 index blocks.
     758           1 :                 w.indexBuffering.blockAlloc = make([]byte, len(block)*16)
     759           1 :         }
     760           1 :         n := copy(w.indexBuffering.blockAlloc, block)
     761           1 :         bib.block = w.indexBuffering.blockAlloc[:n:n]
     762           1 :         w.indexBuffering.blockAlloc = w.indexBuffering.blockAlloc[n:]
     763           1 : 
     764           1 :         w.indexBuffering.partitions = append(w.indexBuffering.partitions, bib)
     765           1 :         return nil
     766             : }
     767             : 
     768             : // flushBufferedIndexBlocks writes all index blocks, including the top-level
     769             : // index block if necessary, to the underlying writable. It returns the block
     770             : // handle of the top index (either the only index block or the top-level index
     771             : // if two-level).
     772           1 : func (w *RawColumnWriter) flushBufferedIndexBlocks() (rootIndex block.Handle, err error) {
     773           1 :         // If there's a currently-pending index block, finish it.
     774           1 :         if w.indexBlock.Rows() > 0 || len(w.indexBuffering.partitions) == 0 {
     775           1 :                 w.finishIndexBlock(w.indexBlock.Rows())
     776           1 :         }
     777             :         // We've buffered all the index blocks. Typically there's just one index
     778             :         // block, in which case we're writing a "single-level" index. If we're
     779             :         // writing a large file or the index separators happen to be excessively
     780             :         // long, we may have several index blocks and need to construct a
     781             :         // "two-level" index structure.
     782           1 :         switch len(w.indexBuffering.partitions) {
     783           0 :         case 0:
     784           0 :                 // This is impossible because we'll flush the index block immediately
     785           0 :                 // above this switch statement if there are no buffered partitions
     786           0 :                 // (regardless of whether there are data block handles in the index
     787           0 :                 // block).
     788           0 :                 panic("unreachable")
     789           1 :         case 1:
     790           1 :                 // Single-level index.
     791           1 :                 rootIndex, err = w.layout.WriteIndexBlock(w.indexBuffering.partitions[0].block)
     792           1 :                 if err != nil {
     793           0 :                         return rootIndex, err
     794           0 :                 }
     795           1 :                 w.props.IndexSize = rootIndex.Length + block.TrailerLen
     796           1 :                 w.props.NumDataBlocks = uint64(w.indexBuffering.partitions[0].nEntries)
     797           1 :                 w.props.IndexType = binarySearchIndex
     798           1 :         default:
     799           1 :                 // Two-level index.
     800           1 :                 for _, part := range w.indexBuffering.partitions {
     801           1 :                         bh, err := w.layout.WriteIndexBlock(part.block)
     802           1 :                         if err != nil {
     803           0 :                                 return block.Handle{}, err
     804           0 :                         }
     805           1 :                         w.props.IndexSize += bh.Length + block.TrailerLen
     806           1 :                         w.props.NumDataBlocks += uint64(w.indexBuffering.partitions[0].nEntries)
     807           1 :                         w.topLevelIndexBlock.AddBlockHandle(part.sep.UserKey, bh, part.properties)
     808             :                 }
     809           1 :                 rootIndex, err = w.layout.WriteIndexBlock(w.topLevelIndexBlock.Finish(w.topLevelIndexBlock.Rows()))
     810           1 :                 if err != nil {
     811           0 :                         return block.Handle{}, err
     812           0 :                 }
     813           1 :                 w.props.IndexSize += rootIndex.Length + block.TrailerLen
     814           1 :                 w.props.IndexType = twoLevelIndex
     815           1 :                 w.props.IndexPartitions = uint64(len(w.indexBuffering.partitions))
     816             :         }
     817           1 :         return rootIndex, nil
     818             : }
     819             : 
     820             : // drainWriteQueue runs in its own goroutine and is responsible for writing
     821             : // finished, compressed data blocks to the writable. It reads from w.writeQueue
     822             : // until the channel is closed. All data blocks are written by this goroutine.
     823             : // Other blocks are written directly by the client goroutine. See Close.
     824           1 : func (w *RawColumnWriter) drainWriteQueue() {
     825           1 :         defer w.writeQueue.wg.Done()
     826           1 :         for cb := range w.writeQueue.ch {
     827           1 :                 if _, err := w.layout.WritePrecompressedDataBlock(cb.physical); err != nil {
     828           0 :                         w.writeQueue.err = err
     829           0 :                 }
     830           1 :                 cb.blockBuf.clear()
     831           1 :                 cb.physical = block.PhysicalBlock{}
     832           1 :                 compressedBlockPool.Put(cb)
     833             :         }
     834             : }
     835             : 
     836           1 : func (w *RawColumnWriter) Close() (err error) {
     837           1 :         defer func() {
     838           1 :                 if w.valueBlock != nil {
     839           1 :                         w.valueBlock.Release()
     840           1 :                         // Defensive code in case Close gets called again. We don't want to put
     841           1 :                         // the same object to a sync.Pool.
     842           1 :                         w.valueBlock = nil
     843           1 :                 }
     844           1 :                 w.layout.Abort()
     845           1 :                 // Record any error in the writer (so we can exit early if Close is called
     846           1 :                 // again).
     847           1 :                 if err != nil {
     848           1 :                         w.err = err
     849           1 :                 }
     850             :         }()
     851           1 :         if w.layout.writable == nil {
     852           1 :                 return w.err
     853           1 :         }
     854             : 
     855             :         // Finish the last data block and send it to the write queue if it contains
     856             :         // any pending KVs.
     857           1 :         if rows := w.dataBlock.Rows(); rows > 0 {
     858           1 :                 serializedBlock, lastKey := w.dataBlock.Finish(rows, w.pendingDataBlockSize)
     859           1 :                 w.separatorBuf = w.comparer.Successor(w.separatorBuf[:0], lastKey.UserKey)
     860           1 :                 w.err = errors.CombineErrors(w.err, w.enqueueDataBlock(serializedBlock, lastKey, w.separatorBuf))
     861           1 :                 w.maybeIncrementTombstoneDenseBlocks(len(serializedBlock))
     862           1 :         }
     863             :         // Close the write queue channel so that the goroutine responsible for
     864             :         // writing data blocks to disk knows to exit. Any subsequent blocks (eg,
     865             :         // index, metadata, range key, etc) will be written by the goroutine that
     866             :         // called Close.
     867           1 :         close(w.writeQueue.ch)
     868           1 :         w.writeQueue.wg.Wait()
     869           1 :         // If the write queue encountered any errors while writing out data blocks,
     870           1 :         // it's stored in w.writeQueue.err.
     871           1 :         w.err = firstError(w.err, w.writeQueue.err)
     872           1 :         if w.err != nil {
     873           1 :                 return w.err
     874           1 :         }
     875             : 
     876             :         // INVARIANT: w.queuedDataSize == w.layout.offset.
     877             :         // All data blocks have been written to disk. The queuedDataSize is the
     878             :         // cumulative size of all the data blocks we've sent to the write queue. Now
     879             :         // that they've all been flushed, queuedDataSize should match w.layout's
     880             :         // offset.
     881           1 :         if w.queuedDataSize != w.layout.offset {
     882           0 :                 panic(errors.AssertionFailedf("pebble: %d of queued data blocks but layout offset is %d",
     883           0 :                         w.queuedDataSize, w.layout.offset))
     884             :         }
     885           1 :         w.props.DataSize = w.layout.offset
     886           1 :         if _, err = w.flushBufferedIndexBlocks(); err != nil {
     887           0 :                 return err
     888           0 :         }
     889             : 
     890             :         // Write the filter block.
     891           1 :         if w.filterBlock != nil {
     892           1 :                 bh, err := w.layout.WriteFilterBlock(w.filterBlock)
     893           1 :                 if err != nil {
     894           0 :                         return err
     895           0 :                 }
     896           1 :                 w.props.FilterPolicyName = w.filterBlock.policyName()
     897           1 :                 w.props.FilterSize = bh.Length
     898             :         }
     899             : 
     900             :         // Write the range deletion block if non-empty.
     901           1 :         if w.rangeDelBlock.KeyCount() > 0 {
     902           1 :                 w.props.NumRangeDeletions = uint64(w.rangeDelBlock.KeyCount())
     903           1 :                 sm, la := w.rangeDelBlock.UnsafeBoundaryKeys()
     904           1 :                 w.meta.SetSmallestRangeDelKey(sm)
     905           1 :                 w.meta.SetLargestRangeDelKey(la)
     906           1 :                 if _, err := w.layout.WriteRangeDeletionBlock(w.rangeDelBlock.Finish()); err != nil {
     907           0 :                         return err
     908           0 :                 }
     909             :         }
     910             : 
     911             :         // Write the range key block if non-empty.
     912           1 :         if w.rangeKeyBlock.KeyCount() > 0 {
     913           1 :                 sm, la := w.rangeKeyBlock.UnsafeBoundaryKeys()
     914           1 :                 w.meta.SetSmallestRangeKey(sm)
     915           1 :                 w.meta.SetLargestRangeKey(la)
     916           1 :                 if _, err := w.layout.WriteRangeKeyBlock(w.rangeKeyBlock.Finish()); err != nil {
     917           0 :                         return err
     918           0 :                 }
     919             :         }
     920             : 
     921             :         // Write out the value block.
     922           1 :         if w.valueBlock != nil {
     923           1 :                 _, vbStats, err := w.valueBlock.Finish(&w.layout, w.layout.offset)
     924           1 :                 if err != nil {
     925           0 :                         return err
     926           0 :                 }
     927           1 :                 w.props.NumValueBlocks = vbStats.NumValueBlocks
     928           1 :                 w.props.NumValuesInValueBlocks = vbStats.NumValuesInValueBlocks
     929           1 :                 w.props.ValueBlocksSize = vbStats.ValueBlocksAndIndexSize
     930             :         }
     931             : 
     932             :         // Write the properties block.
     933           1 :         {
     934           1 :                 // Finish and record the prop collectors if props are not yet recorded.
     935           1 :                 // Pre-computed props might have been copied by specialized sst creators
     936           1 :                 // like suffix replacer.
     937           1 :                 if len(w.props.UserProperties) == 0 {
     938           1 :                         userProps := make(map[string]string)
     939           1 :                         for i := range w.blockPropCollectors {
     940           1 :                                 scratch := w.blockPropsEncoder.getScratchForProp()
     941           1 :                                 // Place the shortID in the first byte.
     942           1 :                                 scratch = append(scratch, byte(i))
     943           1 :                                 buf, err := w.blockPropCollectors[i].FinishTable(scratch)
     944           1 :                                 if err != nil {
     945           0 :                                         return err
     946           0 :                                 }
     947           1 :                                 var prop string
     948           1 :                                 if len(buf) > 0 {
     949           1 :                                         prop = string(buf)
     950           1 :                                 }
     951             :                                 // NB: The property is populated in the map even if it is the
     952             :                                 // empty string, since the presence in the map is what indicates
     953             :                                 // that the block property collector was used when writing.
     954           1 :                                 userProps[w.blockPropCollectors[i].Name()] = prop
     955             :                         }
     956           1 :                         if len(userProps) > 0 {
     957           1 :                                 w.props.UserProperties = userProps
     958           1 :                         }
     959             :                 }
     960             : 
     961           1 :                 var raw rowblk.Writer
     962           1 :                 // The restart interval is set to infinity because the properties block
     963           1 :                 // is always read sequentially and cached in a heap located object. This
     964           1 :                 // reduces table size without a significant impact on performance.
     965           1 :                 raw.RestartInterval = propertiesBlockRestartInterval
     966           1 :                 w.props.CompressionOptions = rocksDBCompressionOptions
     967           1 :                 w.props.save(w.opts.TableFormat, &raw)
     968           1 :                 if _, err := w.layout.WritePropertiesBlock(raw.Finish()); err != nil {
     969           0 :                         return err
     970           0 :                 }
     971             :         }
     972             : 
     973             :         // Write the table footer.
     974           1 :         w.meta.Size, err = w.layout.Finish()
     975           1 :         if err != nil {
     976           0 :                 return err
     977           0 :         }
     978           1 :         w.meta.Properties = w.props
     979           1 :         // Release any held memory and make any future calls error.
     980           1 :         *w = RawColumnWriter{meta: w.meta, err: errWriterClosed}
     981           1 :         return nil
     982             : }
     983             : 
     984             : // rewriteSuffixes implements RawWriter.
     985             : func (w *RawColumnWriter) rewriteSuffixes(
     986             :         r *Reader, sstBytes []byte, wo WriterOptions, from, to []byte, concurrency int,
     987           1 : ) error {
     988           1 :         for _, c := range w.blockPropCollectors {
     989           1 :                 if !c.SupportsSuffixReplacement() {
     990           0 :                         return errors.Errorf("block property collector %s does not support suffix replacement", c.Name())
     991           0 :                 }
     992             :         }
     993           1 :         l, err := r.Layout()
     994           1 :         if err != nil {
     995           0 :                 return errors.Wrap(err, "reading layout")
     996           0 :         }
     997             :         // Copy data blocks in parallel, rewriting suffixes as we go.
     998           1 :         blocks, err := rewriteDataBlocksInParallel(r, sstBytes, wo, l.Data, from, to, concurrency, func() blockRewriter {
     999           1 :                 return colblk.NewDataBlockRewriter(wo.KeySchema, w.comparer)
    1000           1 :         })
    1001           1 :         if err != nil {
    1002           1 :                 return errors.Wrap(err, "rewriting data blocks")
    1003           1 :         }
    1004             : 
    1005             :         // oldShortIDs maps the shortID for the block property collector in the old
    1006             :         // blocks to the shortID in the new blocks. Initialized once for the sstable.
    1007           1 :         oldShortIDs, n, err := getShortIDs(r, w.blockPropCollectors)
    1008           1 :         if err != nil {
    1009           0 :                 return errors.Wrap(err, "getting short IDs")
    1010           0 :         }
    1011           1 :         oldProps := make([][]byte, len(w.blockPropCollectors))
    1012           1 :         for i := range blocks {
    1013           1 :                 cb := compressedBlockPool.Get().(*compressedBlock)
    1014           1 :                 cb.physical = blocks[i].physical
    1015           1 : 
    1016           1 :                 // Load any previous values for our prop collectors into oldProps.
    1017           1 :                 for i := range oldProps {
    1018           1 :                         oldProps[i] = nil
    1019           1 :                 }
    1020           1 :                 decoder := makeBlockPropertiesDecoder(n, l.Data[i].Props)
    1021           1 :                 for !decoder.Done() {
    1022           1 :                         id, val, err := decoder.Next()
    1023           1 :                         if err != nil {
    1024           0 :                                 return err
    1025           0 :                         }
    1026           1 :                         if oldShortIDs[id].IsValid() {
    1027           1 :                                 oldProps[oldShortIDs[id]] = val
    1028           1 :                         }
    1029             :                 }
    1030           1 :                 for i, p := range w.blockPropCollectors {
    1031           1 :                         if err := p.AddCollectedWithSuffixReplacement(oldProps[i], from, to); err != nil {
    1032           0 :                                 return err
    1033           0 :                         }
    1034             :                 }
    1035           1 :                 var separator []byte
    1036           1 :                 if i+1 < len(blocks) {
    1037           1 :                         w.separatorBuf = w.comparer.Separator(w.separatorBuf[:0], blocks[i].end.UserKey, blocks[i+1].start.UserKey)
    1038           1 :                         separator = w.separatorBuf
    1039           1 :                 } else {
    1040           1 :                         w.separatorBuf = w.comparer.Successor(w.separatorBuf[:0], blocks[i].end.UserKey)
    1041           1 :                         separator = w.separatorBuf
    1042           1 :                 }
    1043           1 :                 w.enqueuePhysicalBlock(cb, separator)
    1044             :         }
    1045             : 
    1046           1 :         if len(blocks) > 0 {
    1047           1 :                 w.meta.updateSeqNum(blocks[0].start.SeqNum())
    1048           1 :                 w.props.NumEntries = r.Properties.NumEntries
    1049           1 :                 w.props.RawKeySize = r.Properties.RawKeySize
    1050           1 :                 w.props.RawValueSize = r.Properties.RawValueSize
    1051           1 :                 w.meta.SetSmallestPointKey(blocks[0].start)
    1052           1 :                 w.meta.SetLargestPointKey(blocks[len(blocks)-1].end)
    1053           1 :         }
    1054             : 
    1055             :         // Copy range key block, replacing suffixes if it exists.
    1056           1 :         if err := rewriteRangeKeyBlockToWriter(r, w, from, to); err != nil {
    1057           0 :                 return errors.Wrap(err, "rewriting range key blocks")
    1058           0 :         }
    1059             :         // Copy over the filter block if it exists.
    1060           1 :         if w.filterBlock != nil {
    1061           1 :                 if filterBlockBH, ok := l.FilterByName(w.filterBlock.metaName()); ok {
    1062           1 :                         filterBlock, _, err := readBlockBuf(sstBytes, filterBlockBH, r.checksumType, nil)
    1063           1 :                         if err != nil {
    1064           0 :                                 return errors.Wrap(err, "reading filter")
    1065           0 :                         }
    1066           1 :                         w.filterBlock = copyFilterWriter{
    1067           1 :                                 origPolicyName: w.filterBlock.policyName(),
    1068           1 :                                 origMetaName:   w.filterBlock.metaName(),
    1069           1 :                                 data:           filterBlock,
    1070           1 :                         }
    1071             :                 }
    1072             :         }
    1073           1 :         return nil
    1074             : }
    1075             : 
    1076             : func shouldFlushWithoutLatestKV(
    1077             :         sizeWithKV int, sizeWithoutKV int, entryCountWithoutKV int, flushGovernor *block.FlushGovernor,
    1078           1 : ) bool {
    1079           1 :         if entryCountWithoutKV == 0 {
    1080           1 :                 return false
    1081           1 :         }
    1082           1 :         if sizeWithoutKV < flushGovernor.LowWatermark() {
    1083           1 :                 // Fast path when the block is too small to flush.
    1084           1 :                 return false
    1085           1 :         }
    1086           1 :         return flushGovernor.ShouldFlush(sizeWithoutKV, sizeWithKV)
    1087             : }
    1088             : 
    1089             : // copyDataBlocks adds a range of blocks to the table as-is. These blocks could be
    1090             : // compressed. It's specifically used by the sstable copier that can copy parts
    1091             : // of an sstable to a new sstable, using CopySpan().
    1092             : func (w *RawColumnWriter) copyDataBlocks(
    1093             :         ctx context.Context, blocks []indexEntry, rh objstorage.ReadHandle,
    1094           1 : ) error {
    1095           1 :         const readSizeTarget = 256 << 10
    1096           1 :         readAndFlushBlocks := func(firstBlockIdx, lastBlockIdx int) error {
    1097           1 :                 if firstBlockIdx > lastBlockIdx {
    1098           0 :                         panic("pebble: readAndFlushBlocks called with invalid block range")
    1099             :                 }
    1100             :                 // We need to flush blocks[firstBlockIdx:lastBlockIdx+1] into the write queue.
    1101             :                 // We do this by issuing one big read from the read handle into the buffer, and
    1102             :                 // then enqueueing the writing of those blocks one-by-one.
    1103             :                 //
    1104             :                 // TODO(bilal): Consider refactoring the write queue to support writing multiple
    1105             :                 // blocks in one request.
    1106           1 :                 lastBH := blocks[lastBlockIdx].bh
    1107           1 :                 blocksToReadLen := lastBH.Offset + lastBH.Length + block.TrailerLen - blocks[firstBlockIdx].bh.Offset
    1108           1 :                 // We need to create a new buffer for each read, as w.enqueuePhysicalBlock passes
    1109           1 :                 // a pointer to the buffer to the write queue.
    1110           1 :                 buf := make([]byte, 0, blocksToReadLen)
    1111           1 :                 if err := rh.ReadAt(ctx, buf[:blocksToReadLen], int64(blocks[firstBlockIdx].bh.Offset)); err != nil {
    1112           0 :                         return err
    1113           0 :                 }
    1114           1 :                 for i := firstBlockIdx; i <= lastBlockIdx; i++ {
    1115           1 :                         offsetDiff := blocks[i].bh.Offset - blocks[firstBlockIdx].bh.Offset
    1116           1 :                         blockBuf := buf[offsetDiff : offsetDiff+blocks[i].bh.Length+block.TrailerLen]
    1117           1 :                         cb := compressedBlockPool.Get().(*compressedBlock)
    1118           1 :                         cb.physical = block.NewPhysicalBlock(blockBuf)
    1119           1 :                         if err := w.enqueuePhysicalBlock(cb, blocks[i].sep); err != nil {
    1120           0 :                                 return err
    1121           0 :                         }
    1122             :                 }
    1123           1 :                 return nil
    1124             :         }
    1125             :         // Iterate through blocks until we have enough to fill readSizeTarget. When we have more than
    1126             :         // one block in blocksToRead and adding the next block would exceed the target buffer capacity,
    1127             :         // we read and flush existing blocks in blocksToRead. This allows us to read as many
    1128             :         // blocks in one IO request as possible, while still utilizing the write queue in this
    1129             :         // writer.
    1130           1 :         lastBlockOffset := uint64(0)
    1131           1 :         for i := 0; i < len(blocks); {
    1132           1 :                 if blocks[i].bh.Offset < lastBlockOffset {
    1133           0 :                         panic("pebble: copyDataBlocks called with blocks out of order")
    1134             :                 }
    1135           1 :                 start := i
    1136           1 :                 // Note the i++ in the initializing condition; this means we will always flush at least
    1137           1 :                 // one block.
    1138           1 :                 for i++; i < len(blocks) && (blocks[i].bh.Length+blocks[i].bh.Offset+block.TrailerLen-blocks[start].bh.Offset) <= uint64(readSizeTarget); i++ {
    1139           1 :                 }
    1140             :                 // i points to one index past the last block we want to read.
    1141           1 :                 if err := readAndFlushBlocks(start, i-1); err != nil {
    1142           0 :                         return err
    1143           0 :                 }
    1144             :         }
    1145           1 :         return nil
    1146             : }
    1147             : 
    1148             : // addDataBlock adds a raw uncompressed data block to the table as-is. It's specifically used
    1149             : // by the sstable copier that can copy parts of an sstable to a new sstable,
    1150             : // using CopySpan().
    1151           1 : func (w *RawColumnWriter) addDataBlock(b, sep []byte, bhp block.HandleWithProperties) error {
    1152           1 :         // Serialize the data block, compress it and send it to the write queue.
    1153           1 :         cb := compressedBlockPool.Get().(*compressedBlock)
    1154           1 :         cb.blockBuf.checksummer.Type = w.opts.Checksum
    1155           1 :         cb.physical = block.CompressAndChecksum(
    1156           1 :                 &cb.blockBuf.dataBuf,
    1157           1 :                 b,
    1158           1 :                 w.opts.Compression,
    1159           1 :                 &cb.blockBuf.checksummer,
    1160           1 :         )
    1161           1 :         if !cb.physical.IsCompressed() {
    1162           1 :                 // If the block isn't compressed, cb.physical's underlying data points
    1163           1 :                 // directly into a buffer owned by w.dataBlock. Clone it before passing
    1164           1 :                 // it to the write queue to be asynchronously written to disk.
    1165           1 :                 // TODO(jackson): Should we try to avoid this clone by tracking the
    1166           1 :                 // lifetime of the DataBlockWriters?
    1167           1 :                 cb.physical, cb.blockBuf.dataBuf = cb.physical.CloneUsingBuf(cb.blockBuf.dataBuf)
    1168           1 :         }
    1169           1 :         if err := w.enqueuePhysicalBlock(cb, sep); err != nil {
    1170           0 :                 return err
    1171           0 :         }
    1172           1 :         return nil
    1173             : }
    1174             : 
    1175             : // copyFilter copies the specified filter to the table. It's specifically used
    1176             : // by the sstable copier that can copy parts of an sstable to a new sstable,
    1177             : // using CopySpan().
    1178           0 : func (w *RawColumnWriter) copyFilter(filter []byte, filterName string) error {
    1179           0 :         if w.filterBlock != nil && filterName != w.filterBlock.policyName() {
    1180           0 :                 return errors.New("mismatched filters")
    1181           0 :         }
    1182           0 :         w.filterBlock = copyFilterWriter{
    1183           0 :                 origPolicyName: w.filterBlock.policyName(), origMetaName: w.filterBlock.metaName(), data: filter,
    1184           0 :         }
    1185           0 :         return nil
    1186             : }
    1187             : 
    1188             : // copyProperties copies properties from the specified props, and resets others
    1189             : // to prepare for copying data blocks from another sstable, using the copy/addDataBlock(s)
    1190             : // methods above. It's specifically used by the sstable copier that can copy parts of an
    1191             : // sstable to a new sstable, using CopySpan().
    1192           1 : func (w *RawColumnWriter) copyProperties(props Properties) {
    1193           1 :         w.props = props
    1194           1 :         // Remove all user properties to disable block properties, which we do not
    1195           1 :         // calculate for CopySpan.
    1196           1 :         w.props.UserProperties = nil
    1197           1 :         // Reset props that we'll re-derive as we build our own index.
    1198           1 :         w.props.IndexPartitions = 0
    1199           1 :         w.props.TopLevelIndexSize = 0
    1200           1 :         w.props.IndexSize = 0
    1201           1 :         w.props.IndexType = 0
    1202           1 : }

Generated by: LCOV version 1.14