LCOV - code coverage report
Current view: top level - pebble/sstable - colblk_writer.go (source / functions) Hit Total Coverage
Test: 2024-10-30 08:17Z 405b4e12 - tests only.lcov Lines: 698 804 86.8 %
Date: 2024-10-30 08:18:59 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package sstable
       6             : 
       7             : import (
       8             :         "bytes"
       9             :         "context"
      10             :         "encoding/binary"
      11             :         "fmt"
      12             :         "math"
      13             :         "slices"
      14             :         "sync"
      15             : 
      16             :         "github.com/cockroachdb/errors"
      17             :         "github.com/cockroachdb/pebble/internal/base"
      18             :         "github.com/cockroachdb/pebble/internal/bytealloc"
      19             :         "github.com/cockroachdb/pebble/internal/invariants"
      20             :         "github.com/cockroachdb/pebble/internal/keyspan"
      21             :         "github.com/cockroachdb/pebble/objstorage"
      22             :         "github.com/cockroachdb/pebble/sstable/block"
      23             :         "github.com/cockroachdb/pebble/sstable/colblk"
      24             :         "github.com/cockroachdb/pebble/sstable/rowblk"
      25             : )
      26             : 
      27             : // RawColumnWriter is a sstable RawWriter that writes sstables with
      28             : // column-oriented blocks. All table formats TableFormatPebblev5 and later write
      29             : // column-oriented blocks and use RawColumnWriter.
      30             : type RawColumnWriter struct {
      31             :         comparer *base.Comparer
      32             :         meta     WriterMetadata
      33             :         opts     WriterOptions
      34             :         err      error
      35             : 
      36             :         dataFlush           block.FlushGovernor
      37             :         indexFlush          block.FlushGovernor
      38             :         blockPropCollectors []BlockPropertyCollector
      39             :         blockPropsEncoder   blockPropertiesEncoder
      40             :         obsoleteCollector   obsoleteKeyBlockPropertyCollector
      41             :         props               Properties
      42             :         // block writers buffering unflushed data.
      43             :         dataBlock struct {
      44             :                 colblk.DataBlockEncoder
      45             :                 // numDeletions stores the count of point tombstones in this data block.
      46             :                 // It's used to determine if this data block is considered
      47             :                 // tombstone-dense for the purposes of compaction.
      48             :                 numDeletions int
      49             :                 // deletionSize stores the raw size of point tombstones in this data
      50             :                 // block. It's used to determine if this data block is considered
      51             :                 // tombstone-dense for the purposes of compaction.
      52             :                 deletionSize int
      53             :         }
      54             :         indexBlock         colblk.IndexBlockWriter
      55             :         topLevelIndexBlock colblk.IndexBlockWriter
      56             :         rangeDelBlock      colblk.KeyspanBlockWriter
      57             :         rangeKeyBlock      colblk.KeyspanBlockWriter
      58             :         valueBlock         *valueBlockWriter // nil iff WriterOptions.DisableValueBlocks=true
      59             :         // filter accumulates the filter block. If populated, the filter ingests
      60             :         // either the output of w.split (i.e. a prefix extractor) if w.split is not
      61             :         // nil, or the full keys otherwise.
      62             :         filterBlock  filterWriter
      63             :         prevPointKey struct {
      64             :                 trailer    base.InternalKeyTrailer
      65             :                 isObsolete bool
      66             :         }
      67             :         pendingDataBlockSize int
      68             :         indexBlockSize       int
      69             :         queuedDataSize       uint64
      70             : 
      71             :         // indexBuffering holds finished index blocks as they're completed while
      72             :         // building the sstable. If an index block grows sufficiently large
      73             :         // (IndexBlockSize) while an sstable is still being constructed, the sstable
      74             :         // writer will create a two-level index structure. As index blocks are
      75             :         // completed, they're finished and buffered in-memory until the table is
      76             :         // finished. When the table is finished, the buffered index blocks are
      77             :         // flushed in order after all the data blocks, and the top-level index block
      78             :         // is constructed to point to all the individual index blocks.
      79             :         indexBuffering struct {
      80             :                 // partitions holds all the completed index blocks.
      81             :                 partitions []bufferedIndexBlock
      82             :                 // blockAlloc is used to bulk-allocate byte slices used to store index
      83             :                 // blocks in partitions. These live until the sstable is finished.
      84             :                 blockAlloc []byte
      85             :                 // sepAlloc is used to bulk-allocate index block separator slices stored
      86             :                 // in partitions. These live until the sstable is finished.
      87             :                 sepAlloc bytealloc.A
      88             :         }
      89             : 
      90             :         writeQueue struct {
      91             :                 wg  sync.WaitGroup
      92             :                 ch  chan *compressedBlock
      93             :                 err error
      94             :         }
      95             :         layout layoutWriter
      96             : 
      97             :         separatorBuf          []byte
      98             :         tmp                   [blockHandleLikelyMaxLen]byte
      99             :         previousUserKey       invariants.Value[[]byte]
     100             :         disableKeyOrderChecks bool
     101             : }
     102             : 
     103             : // Assert that *RawColumnWriter implements RawWriter.
     104             : var _ RawWriter = (*RawColumnWriter)(nil)
     105             : 
     106           1 : func newColumnarWriter(writable objstorage.Writable, o WriterOptions) *RawColumnWriter {
     107           1 :         if writable == nil {
     108           0 :                 panic("pebble: nil writable")
     109             :         }
     110           1 :         if !o.TableFormat.BlockColumnar() {
     111           0 :                 panic(errors.AssertionFailedf("newColumnarWriter cannot create sstables with %s format", o.TableFormat))
     112             :         }
     113           1 :         o = o.ensureDefaults()
     114           1 :         w := &RawColumnWriter{
     115           1 :                 comparer: o.Comparer,
     116           1 :                 meta: WriterMetadata{
     117           1 :                         SmallestSeqNum: math.MaxUint64,
     118           1 :                 },
     119           1 :                 opts:                  o,
     120           1 :                 layout:                makeLayoutWriter(writable, o),
     121           1 :                 disableKeyOrderChecks: o.internal.DisableKeyOrderChecks,
     122           1 :         }
     123           1 :         w.dataFlush = block.MakeFlushGovernor(o.BlockSize, o.BlockSizeThreshold, o.SizeClassAwareThreshold, o.AllocatorSizeClasses)
     124           1 :         w.indexFlush = block.MakeFlushGovernor(o.IndexBlockSize, o.BlockSizeThreshold, o.SizeClassAwareThreshold, o.AllocatorSizeClasses)
     125           1 :         w.dataBlock.Init(o.KeySchema)
     126           1 :         w.indexBlock.Init()
     127           1 :         w.topLevelIndexBlock.Init()
     128           1 :         w.rangeDelBlock.Init(w.comparer.Equal)
     129           1 :         w.rangeKeyBlock.Init(w.comparer.Equal)
     130           1 :         if !o.DisableValueBlocks {
     131           1 :                 w.valueBlock = newValueBlockWriter(
     132           1 :                         block.MakeFlushGovernor(o.BlockSize, o.BlockSizeThreshold, o.SizeClassAwareThreshold, o.AllocatorSizeClasses),
     133           1 :                         w.opts.Compression, w.opts.Checksum, func(compressedSize int) {})
     134             :         }
     135           1 :         if o.FilterPolicy != nil {
     136           1 :                 switch o.FilterType {
     137           1 :                 case TableFilter:
     138           1 :                         w.filterBlock = newTableFilterWriter(o.FilterPolicy)
     139           0 :                 default:
     140           0 :                         panic(fmt.Sprintf("unknown filter type: %v", o.FilterType))
     141             :                 }
     142             :         }
     143             : 
     144           1 :         numBlockPropertyCollectors := len(o.BlockPropertyCollectors)
     145           1 :         if !o.disableObsoleteCollector {
     146           1 :                 numBlockPropertyCollectors++
     147           1 :         }
     148           1 :         if numBlockPropertyCollectors > maxPropertyCollectors {
     149           0 :                 panic(errors.New("pebble: too many block property collectors"))
     150             :         }
     151           1 :         w.blockPropCollectors = make([]BlockPropertyCollector, 0, numBlockPropertyCollectors)
     152           1 :         for _, constructFn := range o.BlockPropertyCollectors {
     153           1 :                 w.blockPropCollectors = append(w.blockPropCollectors, constructFn())
     154           1 :         }
     155           1 :         if !o.disableObsoleteCollector {
     156           1 :                 w.blockPropCollectors = append(w.blockPropCollectors, &w.obsoleteCollector)
     157           1 :         }
     158           1 :         var buf bytes.Buffer
     159           1 :         buf.WriteString("[")
     160           1 :         for i := range w.blockPropCollectors {
     161           1 :                 if i > 0 {
     162           1 :                         buf.WriteString(",")
     163           1 :                 }
     164           1 :                 buf.WriteString(w.blockPropCollectors[i].Name())
     165             :         }
     166           1 :         buf.WriteString("]")
     167           1 :         w.props.PropertyCollectorNames = buf.String()
     168           1 : 
     169           1 :         w.props.ComparerName = o.Comparer.Name
     170           1 :         w.props.CompressionName = o.Compression.String()
     171           1 :         w.props.KeySchemaName = o.KeySchema.Name
     172           1 :         w.props.MergerName = o.MergerName
     173           1 : 
     174           1 :         w.writeQueue.ch = make(chan *compressedBlock)
     175           1 :         w.writeQueue.wg.Add(1)
     176           1 :         go w.drainWriteQueue()
     177           1 :         return w
     178             : }
     179             : 
     180             : // Error returns the current accumulated error if any.
     181           1 : func (w *RawColumnWriter) Error() error {
     182           1 :         return w.err
     183           1 : }
     184             : 
     185             : // EstimatedSize returns the estimated size of the sstable being written if
     186             : // a call to Close() was made without adding additional keys.
     187           1 : func (w *RawColumnWriter) EstimatedSize() uint64 {
     188           1 :         sz := rocksDBFooterLen + w.queuedDataSize
     189           1 :         // TODO(jackson): Avoid iterating over partitions by incrementally
     190           1 :         // maintaining the size contribution of all buffered partitions.
     191           1 :         for _, bib := range w.indexBuffering.partitions {
     192           0 :                 // We include the separator user key to account for its bytes in the
     193           0 :                 // top-level index block.
     194           0 :                 //
     195           0 :                 // TODO(jackson): We could incrementally build the top-level index block
     196           0 :                 // and produce an exact calculation of the current top-level index
     197           0 :                 // block's size.
     198           0 :                 sz += uint64(len(bib.block) + block.TrailerLen + len(bib.sep.UserKey))
     199           0 :         }
     200           1 :         if w.rangeDelBlock.KeyCount() > 0 {
     201           1 :                 sz += uint64(w.rangeDelBlock.Size())
     202           1 :         }
     203           1 :         if w.rangeKeyBlock.KeyCount() > 0 {
     204           1 :                 sz += uint64(w.rangeKeyBlock.Size())
     205           1 :         }
     206           1 :         for _, blk := range w.valueBlock.blocks {
     207           0 :                 sz += uint64(blk.block.LengthWithTrailer())
     208           0 :         }
     209           1 :         if w.valueBlock.buf != nil {
     210           1 :                 sz += uint64(len(w.valueBlock.buf.b))
     211           1 :         }
     212             :         // TODO(jackson): Include an estimate of the properties, filter and meta
     213             :         // index blocks sizes.
     214           1 :         return sz
     215             : }
     216             : 
     217             : // ComparePrev compares the provided user to the last point key written to the
     218             : // writer. The returned value is equivalent to Compare(key, prevKey) where
     219             : // prevKey is the last point key written to the writer.
     220             : //
     221             : // If no key has been written yet, ComparePrev returns +1.
     222             : //
     223             : // Must not be called after Writer is closed.
     224           1 : func (w *RawColumnWriter) ComparePrev(k []byte) int {
     225           1 :         if w == nil || w.dataBlock.Rows() == 0 {
     226           1 :                 return +1
     227           1 :         }
     228           1 :         return int(w.dataBlock.KeyWriter.ComparePrev(k).UserKeyComparison)
     229             : }
     230             : 
     231             : // SetSnapshotPinnedProperties sets the properties for pinned keys. Should only
     232             : // be used internally by Pebble.
     233             : func (w *RawColumnWriter) SetSnapshotPinnedProperties(
     234             :         pinnedKeyCount, pinnedKeySize, pinnedValueSize uint64,
     235           1 : ) {
     236           1 :         w.props.SnapshotPinnedKeys = pinnedKeyCount
     237           1 :         w.props.SnapshotPinnedKeySize = pinnedKeySize
     238           1 :         w.props.SnapshotPinnedValueSize = pinnedValueSize
     239           1 : }
     240             : 
     241             : // Metadata returns the metadata for the finished sstable. Only valid to call
     242             : // after the sstable has been finished.
     243           1 : func (w *RawColumnWriter) Metadata() (*WriterMetadata, error) {
     244           1 :         if !w.layout.IsFinished() {
     245           0 :                 return nil, errors.New("pebble: writer is not closed")
     246           0 :         }
     247           1 :         return &w.meta, nil
     248             : }
     249             : 
     250             : // EncodeSpan encodes the keys in the given span. The span can contain either
     251             : // only RANGEDEL keys or only range keys.
     252           1 : func (w *RawColumnWriter) EncodeSpan(span keyspan.Span) error {
     253           1 :         if span.Empty() {
     254           1 :                 return nil
     255           1 :         }
     256           1 :         for _, k := range span.Keys {
     257           1 :                 w.meta.updateSeqNum(k.SeqNum())
     258           1 :         }
     259             : 
     260           1 :         blockWriter := &w.rangeKeyBlock
     261           1 :         if span.Keys[0].Kind() == base.InternalKeyKindRangeDelete {
     262           1 :                 blockWriter = &w.rangeDelBlock
     263           1 :                 // Update range delete properties.
     264           1 :                 // NB: These properties are computed differently than the rowblk sstable
     265           1 :                 // writer because this writer does not flatten them into row key-value
     266           1 :                 // pairs.
     267           1 :                 w.props.RawKeySize += uint64(len(span.Start) + len(span.End))
     268           1 :                 count := uint64(len(span.Keys))
     269           1 :                 w.props.NumEntries += count
     270           1 :                 w.props.NumDeletions += count
     271           1 :                 w.props.NumRangeDeletions += count
     272           1 :         } else {
     273           1 :                 // Update range key properties.
     274           1 :                 // NB: These properties are computed differently than the rowblk sstable
     275           1 :                 // writer because this writer does not flatten them into row key-value
     276           1 :                 // pairs.
     277           1 :                 w.props.RawRangeKeyKeySize += uint64(len(span.Start) + len(span.End))
     278           1 :                 for _, k := range span.Keys {
     279           1 :                         w.props.RawRangeKeyValueSize += uint64(len(k.Value))
     280           1 :                         switch k.Kind() {
     281           1 :                         case base.InternalKeyKindRangeKeyDelete:
     282           1 :                                 w.props.NumRangeKeyDels++
     283           1 :                         case base.InternalKeyKindRangeKeySet:
     284           1 :                                 w.props.NumRangeKeySets++
     285           1 :                         case base.InternalKeyKindRangeKeyUnset:
     286           1 :                                 w.props.NumRangeKeyUnsets++
     287           0 :                         default:
     288           0 :                                 panic(errors.Errorf("pebble: invalid range key type: %s", k.Kind()))
     289             :                         }
     290             :                 }
     291           1 :                 for i := range w.blockPropCollectors {
     292           1 :                         if err := w.blockPropCollectors[i].AddRangeKeys(span); err != nil {
     293           0 :                                 return err
     294           0 :                         }
     295             :                 }
     296             :         }
     297           1 :         if !w.disableKeyOrderChecks && blockWriter.KeyCount() > 0 {
     298           1 :                 // Check that spans are being added in fragmented order. If the two
     299           1 :                 // tombstones overlap, their start and end keys must be identical.
     300           1 :                 prevStart, prevEnd, prevTrailer := blockWriter.UnsafeLastSpan()
     301           1 :                 if w.opts.Comparer.Equal(prevStart, span.Start) && w.opts.Comparer.Equal(prevEnd, span.End) {
     302           1 :                         if prevTrailer < span.Keys[0].Trailer {
     303           1 :                                 w.err = errors.Errorf("pebble: keys must be added in order: %s-%s:{(#%s)}, %s",
     304           1 :                                         w.opts.Comparer.FormatKey(prevStart),
     305           1 :                                         w.opts.Comparer.FormatKey(prevEnd),
     306           1 :                                         prevTrailer, span.Pretty(w.opts.Comparer.FormatKey))
     307           1 :                         }
     308           1 :                 } else if c := w.opts.Comparer.Compare(prevEnd, span.Start); c > 0 {
     309           1 :                         w.err = errors.Errorf("pebble: keys must be added in order: %s-%s:{(#%s)}, %s",
     310           1 :                                 w.opts.Comparer.FormatKey(prevStart),
     311           1 :                                 w.opts.Comparer.FormatKey(prevEnd),
     312           1 :                                 prevTrailer, span.Pretty(w.opts.Comparer.FormatKey))
     313           1 :                         return w.err
     314           1 :                 }
     315             :         }
     316           1 :         blockWriter.AddSpan(span)
     317           1 :         return nil
     318             : }
     319             : 
     320             : // AddWithForceObsolete adds a point key/value pair when writing a
     321             : // strict-obsolete sstable. For a given Writer, the keys passed to Add must be
     322             : // in increasing order. Span keys (range deletions, range keys) must be added
     323             : // through EncodeSpan.
     324             : //
     325             : // forceObsolete indicates whether the caller has determined that this key is
     326             : // obsolete even though it may be the latest point key for this userkey. This
     327             : // should be set to true for keys obsoleted by RANGEDELs, and is required for
     328             : // strict-obsolete sstables.
     329             : //
     330             : // Note that there are two properties, S1 and S2 (see comment in format.go)
     331             : // that strict-obsolete ssts must satisfy. S2, due to RANGEDELs, is solely the
     332             : // responsibility of the caller. S1 is solely the responsibility of the
     333             : // callee.
     334             : func (w *RawColumnWriter) AddWithForceObsolete(
     335             :         key InternalKey, value []byte, forceObsolete bool,
     336           1 : ) error {
     337           1 :         switch key.Kind() {
     338             :         case base.InternalKeyKindRangeDelete, base.InternalKeyKindRangeKeySet,
     339           1 :                 base.InternalKeyKindRangeKeyUnset, base.InternalKeyKindRangeKeyDelete:
     340           1 :                 return errors.Newf("%s must be added through EncodeSpan", key.Kind())
     341           1 :         case base.InternalKeyKindMerge:
     342           1 :                 if w.opts.IsStrictObsolete {
     343           0 :                         return errors.Errorf("MERGE not supported in a strict-obsolete sstable")
     344           0 :                 }
     345             :         }
     346             : 
     347           1 :         eval, err := w.evaluatePoint(key, len(value))
     348           1 :         if err != nil {
     349           1 :                 return err
     350           1 :         }
     351           1 :         eval.isObsolete = eval.isObsolete || forceObsolete
     352           1 :         w.prevPointKey.trailer = key.Trailer
     353           1 :         w.prevPointKey.isObsolete = eval.isObsolete
     354           1 : 
     355           1 :         var valuePrefix block.ValuePrefix
     356           1 :         var valueStoredWithKey []byte
     357           1 :         if eval.writeToValueBlock {
     358           1 :                 vh, err := w.valueBlock.addValue(value)
     359           1 :                 if err != nil {
     360           0 :                         return err
     361           0 :                 }
     362           1 :                 n := encodeValueHandle(w.tmp[:], vh)
     363           1 :                 valueStoredWithKey = w.tmp[:n]
     364           1 :                 var attribute base.ShortAttribute
     365           1 :                 if w.opts.ShortAttributeExtractor != nil {
     366           1 :                         // TODO(sumeer): for compactions, it is possible that the input sstable
     367           1 :                         // already has this value in the value section and so we have already
     368           1 :                         // extracted the ShortAttribute. Avoid extracting it again. This will
     369           1 :                         // require changing the RawWriter.Add interface.
     370           1 :                         if attribute, err = w.opts.ShortAttributeExtractor(
     371           1 :                                 key.UserKey, int(eval.kcmp.PrefixLen), value); err != nil {
     372           0 :                                 return err
     373           0 :                         }
     374             :                 }
     375           1 :                 valuePrefix = block.ValueHandlePrefix(eval.kcmp.PrefixEqual(), attribute)
     376           1 :         } else {
     377           1 :                 valueStoredWithKey = value
     378           1 :                 if len(value) > 0 {
     379           1 :                         valuePrefix = block.InPlaceValuePrefix(eval.kcmp.PrefixEqual())
     380           1 :                 }
     381             :         }
     382             : 
     383             :         // Append the key to the data block. We have NOT yet committed to
     384             :         // including the key in the block. The data block writer permits us to
     385             :         // finish the block excluding the last-appended KV.
     386           1 :         entriesWithoutKV := w.dataBlock.Rows()
     387           1 :         w.dataBlock.Add(key, valueStoredWithKey, valuePrefix, eval.kcmp, eval.isObsolete)
     388           1 : 
     389           1 :         // Now that we've appended the KV pair, we can compute the exact size of the
     390           1 :         // block with this key-value pair included. Check to see if we should flush
     391           1 :         // the current block, either with or without the added key-value pair.
     392           1 :         size := w.dataBlock.Size()
     393           1 :         if shouldFlushWithoutLatestKV(size, w.pendingDataBlockSize, entriesWithoutKV, &w.dataFlush) {
     394           1 :                 // Flush the data block excluding the key we just added.
     395           1 :                 w.flushDataBlockWithoutNextKey(key.UserKey)
     396           1 :                 // flushDataBlockWithoutNextKey reset the data block builder, and we can
     397           1 :                 // add the key to this next block now.
     398           1 :                 w.dataBlock.Add(key, valueStoredWithKey, valuePrefix, eval.kcmp, eval.isObsolete)
     399           1 :                 w.pendingDataBlockSize = w.dataBlock.Size()
     400           1 :         } else {
     401           1 :                 // We're not flushing the data block, and we're committing to including
     402           1 :                 // the current KV in the block. Remember the new size of the data block
     403           1 :                 // with the current KV.
     404           1 :                 w.pendingDataBlockSize = size
     405           1 :         }
     406             : 
     407           1 :         for i := range w.blockPropCollectors {
     408           1 :                 v := value
     409           1 :                 if key.Kind() == base.InternalKeyKindSet {
     410           1 :                         // Values for SET are not required to be in-place, and in the future
     411           1 :                         // may not even be read by the compaction, so pass nil values. Block
     412           1 :                         // property collectors in such Pebble DB's must not look at the
     413           1 :                         // value.
     414           1 :                         v = nil
     415           1 :                 }
     416           1 :                 if err := w.blockPropCollectors[i].AddPointKey(key, v); err != nil {
     417           0 :                         w.err = err
     418           0 :                         return err
     419           0 :                 }
     420             :         }
     421           1 :         w.obsoleteCollector.AddPoint(eval.isObsolete)
     422           1 :         if w.filterBlock != nil {
     423           1 :                 w.filterBlock.addKey(key.UserKey[:eval.kcmp.PrefixLen])
     424           1 :         }
     425           1 :         w.meta.updateSeqNum(key.SeqNum())
     426           1 :         if !w.meta.HasPointKeys {
     427           1 :                 w.meta.SetSmallestPointKey(key.Clone())
     428           1 :         }
     429             : 
     430           1 :         w.props.NumEntries++
     431           1 :         switch key.Kind() {
     432           1 :         case InternalKeyKindDelete, InternalKeyKindSingleDelete:
     433           1 :                 w.props.NumDeletions++
     434           1 :                 w.props.RawPointTombstoneKeySize += uint64(len(key.UserKey))
     435           1 :                 w.dataBlock.numDeletions++
     436           1 :                 w.dataBlock.deletionSize += len(key.UserKey)
     437           1 :         case InternalKeyKindDeleteSized:
     438           1 :                 var size uint64
     439           1 :                 if len(value) > 0 {
     440           1 :                         var n int
     441           1 :                         size, n = binary.Uvarint(value)
     442           1 :                         if n <= 0 {
     443           0 :                                 return errors.Newf("%s key's value (%x) does not parse as uvarint",
     444           0 :                                         errors.Safe(key.Kind().String()), value)
     445           0 :                         }
     446             :                 }
     447           1 :                 w.props.NumDeletions++
     448           1 :                 w.props.NumSizedDeletions++
     449           1 :                 w.props.RawPointTombstoneKeySize += uint64(len(key.UserKey))
     450           1 :                 w.props.RawPointTombstoneValueSize += size
     451           1 :                 w.dataBlock.numDeletions++
     452           1 :                 w.dataBlock.deletionSize += len(key.UserKey)
     453           1 :         case InternalKeyKindMerge:
     454           1 :                 w.props.NumMergeOperands++
     455             :         }
     456           1 :         w.props.RawKeySize += uint64(key.Size())
     457           1 :         w.props.RawValueSize += uint64(len(value))
     458           1 :         return nil
     459             : }
     460             : 
// pointKeyEvaluation carries the decisions evaluatePoint makes about a point
// key that is about to be written to the sstable.
type pointKeyEvaluation struct {
	// kcmp is the result of comparing the key's user key against the previous
	// key held by the data block's key writer (KeyWriter.ComparePrev).
	kcmp colblk.KeyComparison
	// isObsolete is true if evaluatePoint determined that the key is obsolete.
	isObsolete bool
	// writeToValueBlock is true if evaluatePoint decided the key's value is
	// eligible to be stored in a value block rather than in place.
	writeToValueBlock bool
}
     466             : 
     467             : // evaluatePoint takes information about a point key being written to the
     468             : // sstable and decides how the point should be represented, where its value
     469             : // should be stored, etc.
     470             : func (w *RawColumnWriter) evaluatePoint(
     471             :         key base.InternalKey, valueLen int,
     472           1 : ) (eval pointKeyEvaluation, err error) {
     473           1 :         eval.kcmp = w.dataBlock.KeyWriter.ComparePrev(key.UserKey)
     474           1 : 
     475           1 :         // When invariants are enabled, validate kcmp.
     476           1 :         if invariants.Enabled {
     477           1 :                 colblk.AssertKeyCompare(w.comparer, key.UserKey, w.previousUserKey.Get(), eval.kcmp)
     478           1 :                 w.previousUserKey.Store(append(w.previousUserKey.Get()[:0], key.UserKey...))
     479           1 :         }
     480             : 
     481           1 :         if !w.meta.HasPointKeys {
     482           1 :                 return eval, nil
     483           1 :         }
     484           1 :         keyKind := key.Kind()
     485           1 :         // Ensure that no one adds a point key kind without considering the obsolete
     486           1 :         // handling for that kind.
     487           1 :         switch keyKind {
     488             :         case InternalKeyKindSet, InternalKeyKindSetWithDelete, InternalKeyKindMerge,
     489           1 :                 InternalKeyKindDelete, InternalKeyKindSingleDelete, InternalKeyKindDeleteSized:
     490           0 :         default:
     491           0 :                 panic(errors.AssertionFailedf("unexpected key kind %s", keyKind.String()))
     492             :         }
     493           1 :         prevKeyKind := w.prevPointKey.trailer.Kind()
     494           1 :         // If same user key, then the current key is obsolete if any of the
     495           1 :         // following is true:
     496           1 :         // C1 The prev key was obsolete.
     497           1 :         // C2 The prev key was not a MERGE. When the previous key is a MERGE we must
     498           1 :         //    preserve SET* and MERGE since their values will be merged into the
     499           1 :         //    previous key. We also must preserve DEL* since there may be an older
     500           1 :         //    SET*/MERGE in a lower level that must not be merged with the MERGE --
     501           1 :         //    if we omit the DEL* that lower SET*/MERGE will become visible.
     502           1 :         //
     503           1 :         // Regardless of whether it is the same user key or not
     504           1 :         // C3 The current key is some kind of point delete, and we are writing to
     505           1 :         //    the lowest level, then it is also obsolete. The correctness of this
     506           1 :         //    relies on the same user key not spanning multiple sstables in a level.
     507           1 :         //
     508           1 :         // C1 ensures that for a user key there is at most one transition from
     509           1 :         // !obsolete to obsolete. Consider a user key k, for which the first n keys
     510           1 :         // are not obsolete. We consider the various value of n:
     511           1 :         //
     512           1 :         // n = 0: This happens due to forceObsolete being set by the caller, or due
     513           1 :         // to C3. forceObsolete must only be set due a RANGEDEL, and that RANGEDEL
     514           1 :         // must also delete all the lower seqnums for the same user key. C3 triggers
     515           1 :         // due to a point delete and that deletes all the lower seqnums for the same
     516           1 :         // user key.
     517           1 :         //
     518           1 :         // n = 1: This is the common case. It happens when the first key is not a
     519           1 :         // MERGE, or the current key is some kind of point delete.
     520           1 :         //
     521           1 :         // n > 1: This is due to a sequence of MERGE keys, potentially followed by a
     522           1 :         // single non-MERGE key.
     523           1 :         isObsoleteC1AndC2 := eval.kcmp.UserKeyComparison == 0 &&
     524           1 :                 (w.prevPointKey.isObsolete || prevKeyKind != InternalKeyKindMerge)
     525           1 :         isObsoleteC3 := w.opts.WritingToLowestLevel &&
     526           1 :                 (keyKind == InternalKeyKindDelete || keyKind == InternalKeyKindSingleDelete ||
     527           1 :                         keyKind == InternalKeyKindDeleteSized)
     528           1 :         eval.isObsolete = isObsoleteC1AndC2 || isObsoleteC3
     529           1 :         // TODO(sumeer): storing isObsolete SET and SETWITHDEL in value blocks is
     530           1 :         // possible, but requires some care in documenting and checking invariants.
     531           1 :         // There is code that assumes nothing in value blocks because of single MVCC
     532           1 :         // version (those should be ok). We have to ensure setHasSamePrefix is
     533           1 :         // correctly initialized here etc.
     534           1 : 
     535           1 :         if !w.disableKeyOrderChecks && (eval.kcmp.UserKeyComparison < 0 ||
     536           1 :                 (eval.kcmp.UserKeyComparison == 0 && w.prevPointKey.trailer <= key.Trailer)) {
     537           1 :                 return eval, errors.Errorf(
     538           1 :                         "pebble: keys must be added in strictly increasing order: %s",
     539           1 :                         key.Pretty(w.comparer.FormatKey))
     540           1 :         }
     541             : 
     542             :         // We might want to write this key's value to a value block if it has the
     543             :         // same prefix.
     544             :         //
     545             :         // We require:
     546             :         //  . Value blocks to be enabled.
     547             :         //  . The current key to have the same prefix as the previous key.
     548             :         //  . The previous key to be a SET.
     549             :         //  . The current key to be a SET.
     550             :         //  . If there are bounds requiring some keys' values to be in-place, the
     551             :         //    key must not fall within those bounds.
     552             :         //  . The value to be sufficiently large. (Currently we simply require a
     553             :         //    non-zero length, so all non-empty values are eligible for storage
     554             :         //    out-of-band in a value block.)
     555             :         //
     556             :         // Use of 0 here is somewhat arbitrary. Given the minimum 3 byte encoding of
     557             :         // valueHandle, this should be > 3. But tiny values are common in test and
     558             :         // unlikely in production, so we use 0 here for better test coverage.
     559           1 :         const tinyValueThreshold = 0
     560           1 :         useValueBlock := !w.opts.DisableValueBlocks &&
     561           1 :                 eval.kcmp.PrefixEqual() &&
     562           1 :                 prevKeyKind == InternalKeyKindSet &&
     563           1 :                 keyKind == InternalKeyKindSet &&
     564           1 :                 valueLen > tinyValueThreshold &&
     565           1 :                 w.valueBlock != nil
     566           1 :         if !useValueBlock {
     567           1 :                 return eval, nil
     568           1 :         }
     569             :         // NB: it is possible that eval.kcmp.UserKeyComparison == 0, i.e., these two
     570             :         // SETs have identical user keys (because of an open snapshot). This should
     571             :         // be the rare case.
     572             : 
     573             :         // If there are bounds requiring some keys' values to be in-place, compare
     574             :         // the prefix against the bounds.
     575           1 :         if !w.opts.RequiredInPlaceValueBound.IsEmpty() {
     576           1 :                 if w.comparer.Compare(w.opts.RequiredInPlaceValueBound.Upper, key.UserKey[:eval.kcmp.PrefixLen]) <= 0 {
     577           1 :                         // Common case for CockroachDB. Make it empty since all future keys
     578           1 :                         // will be >= this key.
     579           1 :                         w.opts.RequiredInPlaceValueBound = UserKeyPrefixBound{}
     580           1 :                 } else if w.comparer.Compare(key.UserKey[:eval.kcmp.PrefixLen], w.opts.RequiredInPlaceValueBound.Lower) >= 0 {
     581           1 :                         // Don't write to value block if the key is within the bounds.
     582           1 :                         return eval, nil
     583           1 :                 }
     584             :         }
     585           1 :         eval.writeToValueBlock = true
     586           1 :         return eval, nil
     587             : }
     588             : 
     589             : var compressedBlockPool = sync.Pool{
     590           1 :         New: func() interface{} {
     591           1 :                 return new(compressedBlock)
     592           1 :         },
     593             : }
     594             : 
// compressedBlock is a data block that has been compressed and checksummed and
// is passed through the write queue to be written out.
type compressedBlock struct {
	// physical is the block produced by block.CompressAndChecksum.
	physical block.PhysicalBlock
	// blockBuf supplies the compression buffer and checksummer used when
	// producing physical.
	blockBuf blockBuf
}
     599             : 
     600           1 : func (w *RawColumnWriter) flushDataBlockWithoutNextKey(nextKey []byte) {
     601           1 :         serializedBlock, lastKey := w.dataBlock.Finish(w.dataBlock.Rows()-1, w.pendingDataBlockSize)
     602           1 :         w.maybeIncrementTombstoneDenseBlocks(len(serializedBlock))
     603           1 :         // Compute the separator that will be written to the index block alongside
     604           1 :         // this data block's end offset. It is the separator between the last key in
     605           1 :         // the finished block and the [nextKey] that was excluded from the block.
     606           1 :         w.separatorBuf = w.comparer.Separator(w.separatorBuf[:0], lastKey.UserKey, nextKey)
     607           1 :         w.enqueueDataBlock(serializedBlock, lastKey, w.separatorBuf)
     608           1 :         w.dataBlock.Reset()
     609           1 :         w.pendingDataBlockSize = 0
     610           1 : }
     611             : 
     612             : // maybeIncrementTombstoneDenseBlocks increments the number of tombstone dense
     613             : // blocks if the number of deletions in the data block exceeds a threshold or
     614             : // the deletion size exceeds a threshold. It should be called after the
     615             : // data block has been finished.
     616             : // Invariant: w.dataBlockBuf.uncompressed must already be populated.
     617           1 : func (w *RawColumnWriter) maybeIncrementTombstoneDenseBlocks(uncompressedLen int) {
     618           1 :         minSize := w.opts.DeletionSizeRatioThreshold * float32(uncompressedLen)
     619           1 :         if w.dataBlock.numDeletions > w.opts.NumDeletionsThreshold || float32(w.dataBlock.deletionSize) > minSize {
     620           1 :                 w.props.NumTombstoneDenseBlocks++
     621           1 :         }
     622           1 :         w.dataBlock.numDeletions = 0
     623           1 :         w.dataBlock.deletionSize = 0
     624             : }
     625             : 
     626             : // enqueueDataBlock compresses and checksums the provided data block and sends
     627             : // it to the write queue to be asynchronously written to the underlying storage.
     628             : // It also adds the block's index block separator to the pending index block,
     629             : // possibly triggering the index block to be finished and buffered.
     630             : func (w *RawColumnWriter) enqueueDataBlock(
     631             :         serializedBlock []byte, lastKey base.InternalKey, separator []byte,
     632           1 : ) error {
     633           1 :         // TODO(jackson): Avoid allocating the largest point user key every time we
     634           1 :         // set the largest point key. This is what the rowblk writer does too, but
     635           1 :         // it's unnecessary.
     636           1 :         w.meta.SetLargestPointKey(lastKey.Clone())
     637           1 : 
     638           1 :         if invariants.Enabled {
     639           1 :                 var dec colblk.DataBlockDecoder
     640           1 :                 dec.Init(w.opts.KeySchema, serializedBlock)
     641           1 :                 if err := dec.Validate(w.comparer, w.opts.KeySchema); err != nil {
     642           0 :                         panic(err)
     643             :                 }
     644             :         }
     645             : 
     646             :         // Serialize the data block, compress it and send it to the write queue.
     647           1 :         cb := compressedBlockPool.Get().(*compressedBlock)
     648           1 :         cb.blockBuf.checksummer.Type = w.opts.Checksum
     649           1 :         cb.physical = block.CompressAndChecksum(
     650           1 :                 &cb.blockBuf.compressedBuf,
     651           1 :                 serializedBlock,
     652           1 :                 w.opts.Compression,
     653           1 :                 &cb.blockBuf.checksummer,
     654           1 :         )
     655           1 :         if !cb.physical.IsCompressed() {
     656           1 :                 // If the block isn't compressed, cb.physical's underlying data points
     657           1 :                 // directly into a buffer owned by w.dataBlock. Clone it before passing
     658           1 :                 // it to the write queue to be asynchronously written to disk.
     659           1 :                 // TODO(jackson): Should we try to avoid this clone by tracking the
     660           1 :                 // lifetime of the DataBlockWriters?
     661           1 :                 cb.physical = cb.physical.Clone()
     662           1 :         }
     663           1 :         return w.enqueuePhysicalBlock(cb, separator)
     664             : }
     665             : 
     666           1 : func (w *RawColumnWriter) enqueuePhysicalBlock(cb *compressedBlock, separator []byte) error {
     667           1 :         dataBlockHandle := block.Handle{
     668           1 :                 Offset: w.queuedDataSize,
     669           1 :                 Length: uint64(cb.physical.LengthWithoutTrailer()),
     670           1 :         }
     671           1 :         w.queuedDataSize += dataBlockHandle.Length + block.TrailerLen
     672           1 :         w.writeQueue.ch <- cb
     673           1 : 
     674           1 :         var err error
     675           1 :         w.blockPropsEncoder.resetProps()
     676           1 :         for i := range w.blockPropCollectors {
     677           1 :                 scratch := w.blockPropsEncoder.getScratchForProp()
     678           1 :                 if scratch, err = w.blockPropCollectors[i].FinishDataBlock(scratch); err != nil {
     679           0 :                         return err
     680           0 :                 }
     681           1 :                 w.blockPropsEncoder.addProp(shortID(i), scratch)
     682             :         }
     683           1 :         dataBlockProps := w.blockPropsEncoder.unsafeProps()
     684           1 : 
     685           1 :         // Add the separator to the index block. This might trigger a flush of the
     686           1 :         // index block too.
     687           1 :         i := w.indexBlock.AddBlockHandle(separator, dataBlockHandle, dataBlockProps)
     688           1 :         sizeWithEntry := w.indexBlock.Size()
     689           1 :         if shouldFlushWithoutLatestKV(sizeWithEntry, w.indexBlockSize, i, &w.indexFlush) {
     690           1 :                 // NB: finishIndexBlock will use blockPropsEncoder, so we must clone the
     691           1 :                 // data block's props first.
     692           1 :                 dataBlockProps = slices.Clone(dataBlockProps)
     693           1 : 
     694           1 :                 if err = w.finishIndexBlock(w.indexBlock.Rows() - 1); err != nil {
     695           0 :                         return err
     696           0 :                 }
     697             :                 // finishIndexBlock reset the index block builder, and we can
     698             :                 // add the block handle to this new index block.
     699           1 :                 _ = w.indexBlock.AddBlockHandle(separator, dataBlockHandle, dataBlockProps)
     700           1 :                 w.indexBlockSize = w.indexBlock.Size()
     701           1 :         } else {
     702           1 :                 w.indexBlockSize = sizeWithEntry
     703           1 :         }
     704             :         // Incorporate the finished data block's property into the index block, now
     705             :         // that we've flushed the index block without the new separator if
     706             :         // necessary.
     707           1 :         for i := range w.blockPropCollectors {
     708           1 :                 w.blockPropCollectors[i].AddPrevDataBlockToIndexBlock()
     709           1 :         }
     710           1 :         return nil
     711             : }
     712             : 
     713             : // finishIndexBlock finishes the currently pending index block with the first
     714             : // [rows] rows. In practice, [rows] is always w.indexBlock.Rows() or
     715             : // w.indexBlock.Rows()-1.
     716             : //
     717             : // The finished index block is buffered until the writer is closed.
     718           1 : func (w *RawColumnWriter) finishIndexBlock(rows int) error {
     719           1 :         defer w.indexBlock.Reset()
     720           1 :         w.blockPropsEncoder.resetProps()
     721           1 :         for i := range w.blockPropCollectors {
     722           1 :                 scratch := w.blockPropsEncoder.getScratchForProp()
     723           1 :                 var err error
     724           1 :                 if scratch, err = w.blockPropCollectors[i].FinishIndexBlock(scratch); err != nil {
     725           0 :                         return err
     726           0 :                 }
     727           1 :                 w.blockPropsEncoder.addProp(shortID(i), scratch)
     728             :         }
     729           1 :         indexProps := w.blockPropsEncoder.props()
     730           1 :         bib := bufferedIndexBlock{nEntries: rows, properties: indexProps}
     731           1 : 
     732           1 :         // Copy the last (greatest) separator key in the index block into bib.sep.
     733           1 :         // It'll be the separator on the entry in the top-level index block.
     734           1 :         //
     735           1 :         // TODO(jackson): bib.sep.Trailer is unused within the columnar-block
     736           1 :         // sstable writer. Its existence is a code artifact of reuse of the
     737           1 :         // bufferedIndexBlock type between colblk and rowblk writers. This can be
     738           1 :         // cleaned up.
     739           1 :         bib.sep.Trailer = base.MakeTrailer(base.SeqNumMax, base.InternalKeyKindSeparator)
     740           1 :         w.indexBuffering.sepAlloc, bib.sep.UserKey = w.indexBuffering.sepAlloc.Copy(
     741           1 :                 w.indexBlock.UnsafeSeparator(rows - 1))
     742           1 : 
     743           1 :         // Finish the index block and copy it so that w.indexBlock may be reused.
     744           1 :         block := w.indexBlock.Finish(rows)
     745           1 :         if len(w.indexBuffering.blockAlloc) < len(block) {
     746           1 :                 // Allocate enough bytes for approximately 16 index blocks.
     747           1 :                 w.indexBuffering.blockAlloc = make([]byte, len(block)*16)
     748           1 :         }
     749           1 :         n := copy(w.indexBuffering.blockAlloc, block)
     750           1 :         bib.block = w.indexBuffering.blockAlloc[:n:n]
     751           1 :         w.indexBuffering.blockAlloc = w.indexBuffering.blockAlloc[n:]
     752           1 : 
     753           1 :         w.indexBuffering.partitions = append(w.indexBuffering.partitions, bib)
     754           1 :         return nil
     755             : }
     756             : 
     757             : // flushBufferedIndexBlocks writes all index blocks, including the top-level
     758             : // index block if necessary, to the underlying writable. It returns the block
     759             : // handle of the top index (either the only index block or the top-level index
     760             : // if two-level).
     761           1 : func (w *RawColumnWriter) flushBufferedIndexBlocks() (rootIndex block.Handle, err error) {
     762           1 :         // If there's a currently-pending index block, finish it.
     763           1 :         if w.indexBlock.Rows() > 0 || len(w.indexBuffering.partitions) == 0 {
     764           1 :                 w.finishIndexBlock(w.indexBlock.Rows())
     765           1 :         }
     766             :         // We've buffered all the index blocks. Typically there's just one index
     767             :         // block, in which case we're writing a "single-level" index. If we're
     768             :         // writing a large file or the index separators happen to be excessively
     769             :         // long, we may have several index blocks and need to construct a
     770             :         // "two-level" index structure.
     771           1 :         switch len(w.indexBuffering.partitions) {
     772           0 :         case 0:
     773           0 :                 // This is impossible because we'll flush the index block immediately
     774           0 :                 // above this switch statement if there are no buffered partitions
     775           0 :                 // (regardless of whether there are data block handles in the index
     776           0 :                 // block).
     777           0 :                 panic("unreachable")
     778           1 :         case 1:
     779           1 :                 // Single-level index.
     780           1 :                 rootIndex, err = w.layout.WriteIndexBlock(w.indexBuffering.partitions[0].block)
     781           1 :                 if err != nil {
     782           0 :                         return rootIndex, err
     783           0 :                 }
     784           1 :                 w.props.IndexSize = rootIndex.Length + block.TrailerLen
     785           1 :                 w.props.NumDataBlocks = uint64(w.indexBuffering.partitions[0].nEntries)
     786           1 :                 w.props.IndexType = binarySearchIndex
     787           1 :         default:
     788           1 :                 // Two-level index.
     789           1 :                 for _, part := range w.indexBuffering.partitions {
     790           1 :                         bh, err := w.layout.WriteIndexBlock(part.block)
     791           1 :                         if err != nil {
     792           0 :                                 return block.Handle{}, err
     793           0 :                         }
     794           1 :                         w.props.IndexSize += bh.Length + block.TrailerLen
     795           1 :                         w.props.NumDataBlocks += uint64(w.indexBuffering.partitions[0].nEntries)
     796           1 :                         w.topLevelIndexBlock.AddBlockHandle(part.sep.UserKey, bh, part.properties)
     797             :                 }
     798           1 :                 rootIndex, err = w.layout.WriteIndexBlock(w.topLevelIndexBlock.Finish(w.topLevelIndexBlock.Rows()))
     799           1 :                 if err != nil {
     800           0 :                         return block.Handle{}, err
     801           0 :                 }
     802           1 :                 w.props.IndexSize += rootIndex.Length + block.TrailerLen
     803           1 :                 w.props.IndexType = twoLevelIndex
     804           1 :                 w.props.IndexPartitions = uint64(len(w.indexBuffering.partitions))
     805             :         }
     806           1 :         return rootIndex, nil
     807             : }
     808             : 
     809             : // drainWriteQueue runs in its own goroutine and is responsible for writing
     810             : // finished, compressed data blocks to the writable. It reads from w.writeQueue
     811             : // until the channel is closed. All data blocks are written by this goroutine.
     812             : // Other blocks are written directly by the client goroutine. See Close.
     813           1 : func (w *RawColumnWriter) drainWriteQueue() {
     814           1 :         defer w.writeQueue.wg.Done()
     815           1 :         for cb := range w.writeQueue.ch {
     816           1 :                 if _, err := w.layout.WritePrecompressedDataBlock(cb.physical); err != nil {
     817           0 :                         w.writeQueue.err = err
     818           0 :                 }
     819           1 :                 cb.blockBuf.clear()
     820           1 :                 cb.physical = block.PhysicalBlock{}
     821           1 :                 compressedBlockPool.Put(cb)
     822             :         }
     823             : }
     824             : 
     825           1 : func (w *RawColumnWriter) Close() (err error) {
     826           1 :         defer func() {
     827           1 :                 if w.valueBlock != nil {
     828           1 :                         releaseValueBlockWriter(w.valueBlock)
     829           1 :                         // Defensive code in case Close gets called again. We don't want to put
     830           1 :                         // the same object to a sync.Pool.
     831           1 :                         w.valueBlock = nil
     832           1 :                 }
     833           1 :                 w.layout.Abort()
     834           1 :                 // Record any error in the writer (so we can exit early if Close is called
     835           1 :                 // again).
     836           1 :                 if err != nil {
     837           1 :                         w.err = err
     838           1 :                 }
     839             :         }()
     840           1 :         if w.layout.writable == nil {
     841           1 :                 return w.err
     842           1 :         }
     843             : 
     844             :         // Finish the last data block and send it to the write queue if it contains
     845             :         // any pending KVs.
     846           1 :         if rows := w.dataBlock.Rows(); rows > 0 {
     847           1 :                 serializedBlock, lastKey := w.dataBlock.Finish(rows, w.pendingDataBlockSize)
     848           1 :                 w.separatorBuf = w.comparer.Successor(w.separatorBuf[:0], lastKey.UserKey)
     849           1 :                 w.err = errors.CombineErrors(w.err, w.enqueueDataBlock(serializedBlock, lastKey, w.separatorBuf))
     850           1 :                 w.maybeIncrementTombstoneDenseBlocks(len(serializedBlock))
     851           1 :         }
     852             :         // Close the write queue channel so that the goroutine responsible for
     853             :         // writing data blocks to disk knows to exit. Any subsequent blocks (eg,
     854             :         // index, metadata, range key, etc) will be written by the goroutine that
     855             :         // called Close.
     856           1 :         close(w.writeQueue.ch)
     857           1 :         w.writeQueue.wg.Wait()
     858           1 :         // If the write queue encountered any errors while writing out data blocks,
     859           1 :         // it's stored in w.writeQueue.err.
     860           1 :         w.err = firstError(w.err, w.writeQueue.err)
     861           1 :         if w.err != nil {
     862           1 :                 return w.err
     863           1 :         }
     864             : 
     865             :         // INVARIANT: w.queuedDataSize == w.layout.offset.
     866             :         // All data blocks have been written to disk. The queuedDataSize is the
     867             :         // cumulative size of all the data blocks we've sent to the write queue. Now
     868             :         // that they've all been flushed, queuedDataSize should match w.layout's
     869             :         // offset.
     870           1 :         if w.queuedDataSize != w.layout.offset {
     871           0 :                 panic(errors.AssertionFailedf("pebble: %d of queued data blocks but layout offset is %d",
     872           0 :                         w.queuedDataSize, w.layout.offset))
     873             :         }
     874           1 :         w.props.DataSize = w.layout.offset
     875           1 :         if _, err = w.flushBufferedIndexBlocks(); err != nil {
     876           0 :                 return err
     877           0 :         }
     878             : 
     879             :         // Write the filter block.
     880           1 :         if w.filterBlock != nil {
     881           1 :                 bh, err := w.layout.WriteFilterBlock(w.filterBlock)
     882           1 :                 if err != nil {
     883           0 :                         return err
     884           0 :                 }
     885           1 :                 w.props.FilterPolicyName = w.filterBlock.policyName()
     886           1 :                 w.props.FilterSize = bh.Length
     887             :         }
     888             : 
     889             :         // Write the range deletion block if non-empty.
     890           1 :         if w.rangeDelBlock.KeyCount() > 0 {
     891           1 :                 w.props.NumRangeDeletions = uint64(w.rangeDelBlock.KeyCount())
     892           1 :                 sm, la := w.rangeDelBlock.UnsafeBoundaryKeys()
     893           1 :                 w.meta.SetSmallestRangeDelKey(sm)
     894           1 :                 w.meta.SetLargestRangeDelKey(la)
     895           1 :                 if _, err := w.layout.WriteRangeDeletionBlock(w.rangeDelBlock.Finish()); err != nil {
     896           0 :                         return err
     897           0 :                 }
     898             :         }
     899             : 
     900             :         // Write the range key block if non-empty.
     901           1 :         if w.rangeKeyBlock.KeyCount() > 0 {
     902           1 :                 sm, la := w.rangeKeyBlock.UnsafeBoundaryKeys()
     903           1 :                 w.meta.SetSmallestRangeKey(sm)
     904           1 :                 w.meta.SetLargestRangeKey(la)
     905           1 :                 if _, err := w.layout.WriteRangeKeyBlock(w.rangeKeyBlock.Finish()); err != nil {
     906           0 :                         return err
     907           0 :                 }
     908             :         }
     909             : 
     910             :         // Write out the value block.
     911           1 :         if w.valueBlock != nil {
     912           1 :                 _, vbStats, err := w.valueBlock.finish(&w.layout, w.layout.offset)
     913           1 :                 if err != nil {
     914           0 :                         return err
     915           0 :                 }
     916           1 :                 w.props.NumValueBlocks = vbStats.numValueBlocks
     917           1 :                 w.props.NumValuesInValueBlocks = vbStats.numValuesInValueBlocks
     918           1 :                 w.props.ValueBlocksSize = vbStats.valueBlocksAndIndexSize
     919             :         }
     920             : 
     921             :         // Write the properties block.
     922           1 :         {
     923           1 :                 // Finish and record the prop collectors if props are not yet recorded.
     924           1 :                 // Pre-computed props might have been copied by specialized sst creators
     925           1 :                 // like suffix replacer.
     926           1 :                 if len(w.props.UserProperties) == 0 {
     927           1 :                         userProps := make(map[string]string)
     928           1 :                         for i := range w.blockPropCollectors {
     929           1 :                                 scratch := w.blockPropsEncoder.getScratchForProp()
     930           1 :                                 // Place the shortID in the first byte.
     931           1 :                                 scratch = append(scratch, byte(i))
     932           1 :                                 buf, err := w.blockPropCollectors[i].FinishTable(scratch)
     933           1 :                                 if err != nil {
     934           0 :                                         return err
     935           0 :                                 }
     936           1 :                                 var prop string
     937           1 :                                 if len(buf) > 0 {
     938           1 :                                         prop = string(buf)
     939           1 :                                 }
     940             :                                 // NB: The property is populated in the map even if it is the
     941             :                                 // empty string, since the presence in the map is what indicates
     942             :                                 // that the block property collector was used when writing.
     943           1 :                                 userProps[w.blockPropCollectors[i].Name()] = prop
     944             :                         }
     945           1 :                         if len(userProps) > 0 {
     946           1 :                                 w.props.UserProperties = userProps
     947           1 :                         }
     948             :                 }
     949             : 
     950           1 :                 var raw rowblk.Writer
     951           1 :                 // The restart interval is set to infinity because the properties block
     952           1 :                 // is always read sequentially and cached in a heap located object. This
     953           1 :                 // reduces table size without a significant impact on performance.
     954           1 :                 raw.RestartInterval = propertiesBlockRestartInterval
     955           1 :                 w.props.CompressionOptions = rocksDBCompressionOptions
     956           1 :                 w.props.save(w.opts.TableFormat, &raw)
     957           1 :                 if _, err := w.layout.WritePropertiesBlock(raw.Finish()); err != nil {
     958           0 :                         return err
     959           0 :                 }
     960             :         }
     961             : 
     962             :         // Write the table footer.
     963           1 :         w.meta.Size, err = w.layout.Finish()
     964           1 :         if err != nil {
     965           1 :                 return err
     966           1 :         }
     967           1 :         w.meta.Properties = w.props
     968           1 :         // Release any held memory and make any future calls error.
     969           1 :         // TODO(jackson): Ensure other calls error appropriately if the writer is
     970           1 :         // cleared.
     971           1 :         *w = RawColumnWriter{meta: w.meta}
     972           1 :         return nil
     973             : }
     974             : 
     975             : // rewriteSuffixes implements RawWriter.
     976             : func (w *RawColumnWriter) rewriteSuffixes(
     977             :         r *Reader, wo WriterOptions, from, to []byte, concurrency int,
     978           1 : ) error {
     979           1 :         for _, c := range w.blockPropCollectors {
     980           1 :                 if !c.SupportsSuffixReplacement() {
     981           0 :                         return errors.Errorf("block property collector %s does not support suffix replacement", c.Name())
     982           0 :                 }
     983             :         }
     984           1 :         l, err := r.Layout()
     985           1 :         if err != nil {
     986           0 :                 return errors.Wrap(err, "reading layout")
     987           0 :         }
     988             :         // Copy data blocks in parallel, rewriting suffixes as we go.
     989           1 :         blocks, err := rewriteDataBlocksInParallel(r, wo, l.Data, from, to, concurrency, func() blockRewriter {
     990           1 :                 return colblk.NewDataBlockRewriter(wo.KeySchema, w.comparer.Compare, w.comparer.Split)
     991           1 :         })
     992           1 :         if err != nil {
     993           1 :                 return errors.Wrap(err, "rewriting data blocks")
     994           1 :         }
     995             : 
     996             :         // oldShortIDs maps the shortID for the block property collector in the old
     997             :         // blocks to the shortID in the new blocks. Initialized once for the sstable.
     998           1 :         oldShortIDs, n, err := getShortIDs(r, w.blockPropCollectors)
     999           1 :         if err != nil {
    1000           0 :                 return errors.Wrap(err, "getting short IDs")
    1001           0 :         }
    1002           1 :         oldProps := make([][]byte, len(w.blockPropCollectors))
    1003           1 :         for i := range blocks {
    1004           1 :                 cb := compressedBlockPool.Get().(*compressedBlock)
    1005           1 :                 cb.physical = blocks[i].physical
    1006           1 : 
    1007           1 :                 // Load any previous values for our prop collectors into oldProps.
    1008           1 :                 for i := range oldProps {
    1009           1 :                         oldProps[i] = nil
    1010           1 :                 }
    1011           1 :                 decoder := makeBlockPropertiesDecoder(n, l.Data[i].Props)
    1012           1 :                 for !decoder.Done() {
    1013           1 :                         id, val, err := decoder.Next()
    1014           1 :                         if err != nil {
    1015           0 :                                 return err
    1016           0 :                         }
    1017           1 :                         if oldShortIDs[id].IsValid() {
    1018           1 :                                 oldProps[oldShortIDs[id]] = val
    1019           1 :                         }
    1020             :                 }
    1021           1 :                 for i, p := range w.blockPropCollectors {
    1022           1 :                         if err := p.AddCollectedWithSuffixReplacement(oldProps[i], from, to); err != nil {
    1023           0 :                                 return err
    1024           0 :                         }
    1025             :                 }
    1026           1 :                 var separator []byte
    1027           1 :                 if i+1 < len(blocks) {
    1028           1 :                         w.separatorBuf = w.comparer.Separator(w.separatorBuf[:0], blocks[i].end.UserKey, blocks[i+1].start.UserKey)
    1029           1 :                         separator = w.separatorBuf
    1030           1 :                 } else {
    1031           1 :                         w.separatorBuf = w.comparer.Successor(w.separatorBuf[:0], blocks[i].end.UserKey)
    1032           1 :                         separator = w.separatorBuf
    1033           1 :                 }
    1034           1 :                 w.enqueuePhysicalBlock(cb, separator)
    1035             :         }
    1036             : 
    1037           1 :         if len(blocks) > 0 {
    1038           1 :                 w.meta.updateSeqNum(blocks[0].start.SeqNum())
    1039           1 :                 w.props.NumEntries = r.Properties.NumEntries
    1040           1 :                 w.props.RawKeySize = r.Properties.RawKeySize
    1041           1 :                 w.props.RawValueSize = r.Properties.RawValueSize
    1042           1 :                 w.meta.SetSmallestPointKey(blocks[0].start)
    1043           1 :                 w.meta.SetLargestPointKey(blocks[len(blocks)-1].end)
    1044           1 :         }
    1045             : 
    1046             :         // Copy range key block, replacing suffixes if it exists.
    1047           1 :         if err := rewriteRangeKeyBlockToWriter(r, w, from, to); err != nil {
    1048           0 :                 return errors.Wrap(err, "rewriting range key blocks")
    1049           0 :         }
    1050             :         // Copy over the filter block if it exists.
    1051           1 :         if w.filterBlock != nil {
    1052           1 :                 if filterBlockBH, ok := l.FilterByName(w.filterBlock.metaName()); ok {
    1053           1 :                         filterBlock, _, err := readBlockBuf(r, filterBlockBH, nil)
    1054           1 :                         if err != nil {
    1055           0 :                                 return errors.Wrap(err, "reading filter")
    1056           0 :                         }
    1057           1 :                         w.filterBlock = copyFilterWriter{
    1058           1 :                                 origPolicyName: w.filterBlock.policyName(),
    1059           1 :                                 origMetaName:   w.filterBlock.metaName(),
    1060           1 :                                 data:           filterBlock,
    1061           1 :                         }
    1062             :                 }
    1063             :         }
    1064           1 :         return nil
    1065             : }
    1066             : 
    1067             : func shouldFlushWithoutLatestKV(
    1068             :         sizeWithKV int, sizeWithoutKV int, entryCountWithoutKV int, flushGovernor *block.FlushGovernor,
    1069           1 : ) bool {
    1070           1 :         if entryCountWithoutKV == 0 {
    1071           1 :                 return false
    1072           1 :         }
    1073           1 :         if sizeWithoutKV < flushGovernor.LowWatermark() {
    1074           1 :                 // Fast path when the block is too small to flush.
    1075           1 :                 return false
    1076           1 :         }
    1077           1 :         return flushGovernor.ShouldFlush(sizeWithoutKV, sizeWithKV)
    1078             : }
    1079             : 
    1080             : // copyDataBlocks adds a range of blocks to the table as-is. These blocks could be
    1081             : // compressed. It's specifically used by the sstable copier that can copy parts
    1082             : // of an sstable to a new sstable, using CopySpan().
    1083             : func (w *RawColumnWriter) copyDataBlocks(
    1084             :         ctx context.Context, blocks []indexEntry, rh objstorage.ReadHandle,
    1085           1 : ) error {
    1086           1 :         buf := make([]byte, 0, 256<<10)
    1087           1 :         readAndFlushBlocks := func(firstBlockIdx, lastBlockIdx int) error {
    1088           1 :                 if firstBlockIdx > lastBlockIdx {
    1089           0 :                         panic("pebble: readAndFlushBlocks called with invalid block range")
    1090             :                 }
    1091             :                 // We need to flush blocks[firstBlockIdx:lastBlockIdx+1] into the write queue.
    1092             :                 // We do this by issuing one big read from the read handle into the buffer, and
    1093             :                 // then enqueueing the writing of those blocks one-by-one.
    1094             :                 //
    1095             :                 // TODO(bilal): Consider refactoring the write queue to support writing multiple
    1096             :                 // blocks in one request.
    1097           1 :                 lastBH := blocks[lastBlockIdx].bh
    1098           1 :                 blocksToReadLen := lastBH.Offset + lastBH.Length + block.TrailerLen - blocks[firstBlockIdx].bh.Offset
    1099           1 :                 if blocksToReadLen > uint64(cap(buf)) {
    1100           0 :                         buf = make([]byte, 0, blocksToReadLen)
    1101           0 :                 }
    1102           1 :                 if err := rh.ReadAt(ctx, buf[:blocksToReadLen], int64(blocks[firstBlockIdx].bh.Offset)); err != nil {
    1103           0 :                         return err
    1104           0 :                 }
    1105           1 :                 for i := firstBlockIdx; i <= lastBlockIdx; i++ {
    1106           1 :                         offsetDiff := blocks[i].bh.Offset - blocks[firstBlockIdx].bh.Offset
    1107           1 :                         blockBuf := buf[offsetDiff : offsetDiff+blocks[i].bh.Length+block.TrailerLen]
    1108           1 :                         cb := compressedBlockPool.Get().(*compressedBlock)
    1109           1 :                         cb.physical = block.NewPhysicalBlock(blockBuf)
    1110           1 :                         if err := w.enqueuePhysicalBlock(cb, blocks[i].sep); err != nil {
    1111           0 :                                 return err
    1112           0 :                         }
    1113             :                 }
    1114           1 :                 return nil
    1115             :         }
    1116             :         // Iterate through blocks until we have enough to fill cap(buf). When we have more than
    1117             :         // one block in blocksToRead and adding the next block would exceed the buffer capacity,
    1118             :         // we read and flush existing blocks in blocksToRead. This allows us to read as many
    1119             :         // blocks in one IO request as possible, while still utilizing the write queue in this
    1120             :         // writer.
    1121           1 :         lastBlockOffset := uint64(0)
    1122           1 :         for i := 0; i < len(blocks); {
    1123           1 :                 if blocks[i].bh.Offset < lastBlockOffset {
    1124           0 :                         panic("pebble: copyDataBlocks called with blocks out of order")
    1125             :                 }
    1126           1 :                 start := i
    1127           1 :                 // Note the i++ in the initializing condition; this means we will always flush at least
    1128           1 :                 // one block.
    1129           1 :                 for i++; i < len(blocks) && (blocks[i].bh.Length+blocks[i].bh.Offset+block.TrailerLen-blocks[start].bh.Offset) <= uint64(cap(buf)); i++ {
    1130           1 :                 }
    1131             :                 // i points to one index past the last block we want to read.
    1132           1 :                 if err := readAndFlushBlocks(start, i-1); err != nil {
    1133           0 :                         return err
    1134           0 :                 }
    1135             :         }
    1136           1 :         return nil
    1137             : }
    1138             : 
    1139             : // addDataBlock adds a raw uncompressed data block to the table as-is. It's specifically used
    1140             : // by the sstable copier that can copy parts of an sstable to a new sstable,
    1141             : // using CopySpan().
    1142           1 : func (w *RawColumnWriter) addDataBlock(b, sep []byte, bhp block.HandleWithProperties) error {
    1143           1 :         // Serialize the data block, compress it and send it to the write queue.
    1144           1 :         cb := compressedBlockPool.Get().(*compressedBlock)
    1145           1 :         cb.blockBuf.checksummer.Type = w.opts.Checksum
    1146           1 :         cb.physical = block.CompressAndChecksum(
    1147           1 :                 &cb.blockBuf.compressedBuf,
    1148           1 :                 b,
    1149           1 :                 w.opts.Compression,
    1150           1 :                 &cb.blockBuf.checksummer,
    1151           1 :         )
    1152           1 :         if !cb.physical.IsCompressed() {
    1153           1 :                 // If the block isn't compressed, cb.physical's underlying data points
    1154           1 :                 // directly into a buffer owned by w.dataBlock. Clone it before passing
    1155           1 :                 // it to the write queue to be asynchronously written to disk.
    1156           1 :                 // TODO(jackson): Should we try to avoid this clone by tracking the
    1157           1 :                 // lifetime of the DataBlockWriters?
    1158           1 :                 cb.physical = cb.physical.Clone()
    1159           1 :         }
    1160           1 :         if err := w.enqueuePhysicalBlock(cb, sep); err != nil {
    1161           0 :                 return err
    1162           0 :         }
    1163           1 :         return nil
    1164             : }
    1165             : 
    1166             : // copyFilter copies the specified filter to the table. It's specifically used
    1167             : // by the sstable copier that can copy parts of an sstable to a new sstable,
    1168             : // using CopySpan().
    1169           0 : func (w *RawColumnWriter) copyFilter(filter []byte, filterName string) error {
    1170           0 :         if w.filterBlock != nil && filterName != w.filterBlock.policyName() {
    1171           0 :                 return errors.New("mismatched filters")
    1172           0 :         }
    1173           0 :         w.filterBlock = copyFilterWriter{
    1174           0 :                 origPolicyName: w.filterBlock.policyName(), origMetaName: w.filterBlock.metaName(), data: filter,
    1175           0 :         }
    1176           0 :         return nil
    1177             : }
    1178             : 
    1179             : // copyProperties copies properties from the specified props, and resets others
    1180             : // to prepare for copying data blocks from another sstable, using the copy/addDataBlock(s)
    1181             : // methods above. It's specifically used by the sstable copier that can copy parts of an
    1182             : // sstable to a new sstable, using CopySpan().
    1183           1 : func (w *RawColumnWriter) copyProperties(props Properties) {
    1184           1 :         w.props = props
    1185           1 :         // Remove all user properties to disable block properties, which we do not
    1186           1 :         // calculate for CopySpan.
    1187           1 :         w.props.UserProperties = nil
    1188           1 :         // Reset props that we'll re-derive as we build our own index.
    1189           1 :         w.props.IndexPartitions = 0
    1190           1 :         w.props.TopLevelIndexSize = 0
    1191           1 :         w.props.IndexSize = 0
    1192           1 :         w.props.IndexType = 0
    1193           1 : }

Generated by: LCOV version 1.14