LCOV - code coverage report
Current view: top level - pebble/sstable - colblk_writer.go (source / functions) Hit Total Coverage
Test: 2024-11-16 08:16Z 9ed54bc4 - meta test only.lcov Lines: 603 806 74.8 %
Date: 2024-11-16 08:17:36 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package sstable
       6             : 
       7             : import (
       8             :         "bytes"
       9             :         "context"
      10             :         "encoding/binary"
      11             :         "fmt"
      12             :         "math"
      13             :         "slices"
      14             :         "sync"
      15             : 
      16             :         "github.com/cockroachdb/errors"
      17             :         "github.com/cockroachdb/pebble/internal/base"
      18             :         "github.com/cockroachdb/pebble/internal/bytealloc"
      19             :         "github.com/cockroachdb/pebble/internal/invariants"
      20             :         "github.com/cockroachdb/pebble/internal/keyspan"
      21             :         "github.com/cockroachdb/pebble/objstorage"
      22             :         "github.com/cockroachdb/pebble/sstable/block"
      23             :         "github.com/cockroachdb/pebble/sstable/colblk"
      24             :         "github.com/cockroachdb/pebble/sstable/rowblk"
      25             :         "github.com/cockroachdb/pebble/sstable/valblk"
      26             : )
      27             : 
// RawColumnWriter is a sstable RawWriter that writes sstables with
// column-oriented blocks. All table formats TableFormatPebblev5 and later write
// column-oriented blocks and use RawColumnWriter.
//
// A RawColumnWriter is constructed by newColumnarWriter, which also starts a
// goroutine running drainWriteQueue.
type RawColumnWriter struct {
	// comparer is the Comparer taken from the writer options.
	comparer *base.Comparer
	// meta is the metadata handed out by Metadata once the layout is finished.
	meta WriterMetadata
	// opts holds the writer options after ensureDefaults has been applied.
	opts WriterOptions
	// err is the accumulated error reported by Error.
	err error

	// dataFlush is the flush governor built from WriterOptions.BlockSize; it
	// is consulted when deciding whether to flush the pending data block.
	dataFlush block.FlushGovernor
	// indexFlush is the flush governor built from WriterOptions.IndexBlockSize.
	indexFlush block.FlushGovernor
	// blockPropCollectors holds one collector per configured
	// BlockPropertyCollectors constructor, followed by &obsoleteCollector
	// unless the obsolete collector is disabled.
	blockPropCollectors []BlockPropertyCollector
	blockPropsEncoder   blockPropertiesEncoder
	obsoleteCollector   obsoleteKeyBlockPropertyCollector
	// props accumulates the table properties as keys and spans are added.
	props Properties
	// block writers buffering unflushed data.
	dataBlock struct {
		colblk.DataBlockEncoder
		// numDeletions stores the count of point tombstones in this data block.
		// It's used to determine if this data block is considered
		// tombstone-dense for the purposes of compaction.
		numDeletions int
		// deletionSize stores the raw size of point tombstones in this data
		// block. It's used to determine if this data block is considered
		// tombstone-dense for the purposes of compaction.
		deletionSize int
	}
	indexBlock         colblk.IndexBlockWriter
	topLevelIndexBlock colblk.IndexBlockWriter
	// rangeDelBlock buffers spans whose first key is a RANGEDEL.
	rangeDelBlock colblk.KeyspanBlockWriter
	// rangeKeyBlock buffers all other spans passed to EncodeSpan.
	rangeKeyBlock colblk.KeyspanBlockWriter
	valueBlock    *valblk.Writer // nil iff WriterOptions.DisableValueBlocks=true
	// filterBlock accumulates the filter block. It is nil unless
	// WriterOptions.FilterPolicy is set. If populated, the filter ingests the
	// prefix of each added point key's user key (UserKey[:PrefixLen]).
	filterBlock filterWriter
	// prevPointKey records the trailer and obsolete bit of the most recently
	// added point key.
	prevPointKey struct {
		trailer    base.InternalKeyTrailer
		isObsolete bool
	}
	// pendingDataBlockSize is the size of the pending data block including
	// only the key-value pairs that have been committed to it.
	pendingDataBlockSize int
	indexBlockSize       int
	// queuedDataSize contributes to the estimate returned by EstimatedSize.
	queuedDataSize uint64

	// indexBuffering holds finished index blocks as they're completed while
	// building the sstable. If an index block grows sufficiently large
	// (IndexBlockSize) while an sstable is still being constructed, the sstable
	// writer will create a two-level index structure. As index blocks are
	// completed, they're finished and buffered in-memory until the table is
	// finished. When the table is finished, the buffered index blocks are
	// flushed in order after all the data blocks, and the top-level index block
	// is constructed to point to all the individual index blocks.
	indexBuffering struct {
		// partitions holds all the completed index blocks.
		partitions []bufferedIndexBlock
		// blockAlloc is used to bulk-allocate byte slices used to store index
		// blocks in partitions. These live until the sstable is finished.
		blockAlloc []byte
		// sepAlloc is used to bulk-allocate index block separator slices stored
		// in partitions. These live until the sstable is finished.
		sepAlloc bytealloc.A
	}

	// writeQueue carries compressed blocks to the drainWriteQueue goroutine
	// started by newColumnarWriter. ch is created unbuffered and wg is
	// incremented once for that goroutine.
	// NOTE(review): err presumably records a failure observed by that
	// goroutine — confirm against drainWriteQueue.
	writeQueue struct {
		wg  sync.WaitGroup
		ch  chan *compressedBlock
		err error
	}
	// layout is created from the destination writable; Metadata is only
	// valid once layout reports that it is finished.
	layout layoutWriter

	separatorBuf []byte
	// tmp is scratch space used to encode value handles for values stored in
	// value blocks.
	tmp [blockHandleLikelyMaxLen]byte
	// previousUserKey holds a copy of the last point user key; it is only
	// read and written when invariants are enabled.
	previousUserKey invariants.Value[[]byte]
	// disableKeyOrderChecks, when true, skips the span ordering validation in
	// EncodeSpan.
	disableKeyOrderChecks bool
}
     103             : 
// Compile-time assertion that *RawColumnWriter implements RawWriter.
var _ RawWriter = (*RawColumnWriter)(nil)
     106             : 
     107           1 : func newColumnarWriter(writable objstorage.Writable, o WriterOptions) *RawColumnWriter {
     108           1 :         if writable == nil {
     109           0 :                 panic("pebble: nil writable")
     110             :         }
     111           1 :         if !o.TableFormat.BlockColumnar() {
     112           0 :                 panic(errors.AssertionFailedf("newColumnarWriter cannot create sstables with %s format", o.TableFormat))
     113             :         }
     114           1 :         o = o.ensureDefaults()
     115           1 :         w := &RawColumnWriter{
     116           1 :                 comparer: o.Comparer,
     117           1 :                 meta: WriterMetadata{
     118           1 :                         SmallestSeqNum: math.MaxUint64,
     119           1 :                 },
     120           1 :                 opts:                  o,
     121           1 :                 layout:                makeLayoutWriter(writable, o),
     122           1 :                 disableKeyOrderChecks: o.internal.DisableKeyOrderChecks,
     123           1 :         }
     124           1 :         w.dataFlush = block.MakeFlushGovernor(o.BlockSize, o.BlockSizeThreshold, o.SizeClassAwareThreshold, o.AllocatorSizeClasses)
     125           1 :         w.indexFlush = block.MakeFlushGovernor(o.IndexBlockSize, o.BlockSizeThreshold, o.SizeClassAwareThreshold, o.AllocatorSizeClasses)
     126           1 :         w.dataBlock.Init(o.KeySchema)
     127           1 :         w.indexBlock.Init()
     128           1 :         w.topLevelIndexBlock.Init()
     129           1 :         w.rangeDelBlock.Init(w.comparer.Equal)
     130           1 :         w.rangeKeyBlock.Init(w.comparer.Equal)
     131           1 :         if !o.DisableValueBlocks {
     132           1 :                 w.valueBlock = valblk.NewWriter(
     133           1 :                         block.MakeFlushGovernor(o.BlockSize, o.BlockSizeThreshold, o.SizeClassAwareThreshold, o.AllocatorSizeClasses),
     134           1 :                         w.opts.Compression, w.opts.Checksum, func(compressedSize int) {})
     135             :         }
     136           1 :         if o.FilterPolicy != nil {
     137           1 :                 switch o.FilterType {
     138           1 :                 case TableFilter:
     139           1 :                         w.filterBlock = newTableFilterWriter(o.FilterPolicy)
     140           0 :                 default:
     141           0 :                         panic(fmt.Sprintf("unknown filter type: %v", o.FilterType))
     142             :                 }
     143             :         }
     144             : 
     145           1 :         numBlockPropertyCollectors := len(o.BlockPropertyCollectors)
     146           1 :         if !o.disableObsoleteCollector {
     147           1 :                 numBlockPropertyCollectors++
     148           1 :         }
     149           1 :         if numBlockPropertyCollectors > maxPropertyCollectors {
     150           0 :                 panic(errors.New("pebble: too many block property collectors"))
     151             :         }
     152           1 :         w.blockPropCollectors = make([]BlockPropertyCollector, 0, numBlockPropertyCollectors)
     153           1 :         for _, constructFn := range o.BlockPropertyCollectors {
     154           1 :                 w.blockPropCollectors = append(w.blockPropCollectors, constructFn())
     155           1 :         }
     156           1 :         if !o.disableObsoleteCollector {
     157           1 :                 w.blockPropCollectors = append(w.blockPropCollectors, &w.obsoleteCollector)
     158           1 :         }
     159           1 :         var buf bytes.Buffer
     160           1 :         buf.WriteString("[")
     161           1 :         for i := range w.blockPropCollectors {
     162           1 :                 if i > 0 {
     163           1 :                         buf.WriteString(",")
     164           1 :                 }
     165           1 :                 buf.WriteString(w.blockPropCollectors[i].Name())
     166             :         }
     167           1 :         buf.WriteString("]")
     168           1 :         w.props.PropertyCollectorNames = buf.String()
     169           1 : 
     170           1 :         w.props.ComparerName = o.Comparer.Name
     171           1 :         w.props.CompressionName = o.Compression.String()
     172           1 :         w.props.KeySchemaName = o.KeySchema.Name
     173           1 :         w.props.MergerName = o.MergerName
     174           1 : 
     175           1 :         w.writeQueue.ch = make(chan *compressedBlock)
     176           1 :         w.writeQueue.wg.Add(1)
     177           1 :         go w.drainWriteQueue()
     178           1 :         return w
     179             : }
     180             : 
// Error returns the error accumulated by the writer so far, or nil if no
// error has been recorded.
func (w *RawColumnWriter) Error() error {
	return w.err
}
     185             : 
     186             : // EstimatedSize returns the estimated size of the sstable being written if
     187             : // a call to Close() was made without adding additional keys.
     188           1 : func (w *RawColumnWriter) EstimatedSize() uint64 {
     189           1 :         sz := rocksDBFooterLen + w.queuedDataSize
     190           1 :         // TODO(jackson): Avoid iterating over partitions by incrementally
     191           1 :         // maintaining the size contribution of all buffered partitions.
     192           1 :         for _, bib := range w.indexBuffering.partitions {
     193           1 :                 // We include the separator user key to account for its bytes in the
     194           1 :                 // top-level index block.
     195           1 :                 //
     196           1 :                 // TODO(jackson): We could incrementally build the top-level index block
     197           1 :                 // and produce an exact calculation of the current top-level index
     198           1 :                 // block's size.
     199           1 :                 sz += uint64(len(bib.block) + block.TrailerLen + len(bib.sep.UserKey))
     200           1 :         }
     201           1 :         if w.rangeDelBlock.KeyCount() > 0 {
     202           1 :                 sz += uint64(w.rangeDelBlock.Size())
     203           1 :         }
     204           1 :         if w.rangeKeyBlock.KeyCount() > 0 {
     205           1 :                 sz += uint64(w.rangeKeyBlock.Size())
     206           1 :         }
     207           1 :         if w.valueBlock != nil {
     208           1 :                 sz += w.valueBlock.Size()
     209           1 :         }
     210             :         // TODO(jackson): Include an estimate of the properties, filter and meta
     211             :         // index blocks sizes.
     212           1 :         return sz
     213             : }
     214             : 
     215             : // ComparePrev compares the provided user to the last point key written to the
     216             : // writer. The returned value is equivalent to Compare(key, prevKey) where
     217             : // prevKey is the last point key written to the writer.
     218             : //
     219             : // If no key has been written yet, ComparePrev returns +1.
     220             : //
     221             : // Must not be called after Writer is closed.
     222           1 : func (w *RawColumnWriter) ComparePrev(k []byte) int {
     223           1 :         if w == nil || w.dataBlock.Rows() == 0 {
     224           1 :                 return +1
     225           1 :         }
     226           1 :         return int(w.dataBlock.KeyWriter.ComparePrev(k).UserKeyComparison)
     227             : }
     228             : 
     229             : // SetSnapshotPinnedProperties sets the properties for pinned keys. Should only
     230             : // be used internally by Pebble.
     231             : func (w *RawColumnWriter) SetSnapshotPinnedProperties(
     232             :         pinnedKeyCount, pinnedKeySize, pinnedValueSize uint64,
     233           1 : ) {
     234           1 :         w.props.SnapshotPinnedKeys = pinnedKeyCount
     235           1 :         w.props.SnapshotPinnedKeySize = pinnedKeySize
     236           1 :         w.props.SnapshotPinnedValueSize = pinnedValueSize
     237           1 : }
     238             : 
     239             : // Metadata returns the metadata for the finished sstable. Only valid to call
     240             : // after the sstable has been finished.
     241           1 : func (w *RawColumnWriter) Metadata() (*WriterMetadata, error) {
     242           1 :         if !w.layout.IsFinished() {
     243           0 :                 return nil, errors.New("pebble: writer is not closed")
     244           0 :         }
     245           1 :         return &w.meta, nil
     246             : }
     247             : 
     248             : // EncodeSpan encodes the keys in the given span. The span can contain either
     249             : // only RANGEDEL keys or only range keys.
     250           1 : func (w *RawColumnWriter) EncodeSpan(span keyspan.Span) error {
     251           1 :         if span.Empty() {
     252           1 :                 return nil
     253           1 :         }
     254           1 :         for _, k := range span.Keys {
     255           1 :                 w.meta.updateSeqNum(k.SeqNum())
     256           1 :         }
     257             : 
     258           1 :         blockWriter := &w.rangeKeyBlock
     259           1 :         if span.Keys[0].Kind() == base.InternalKeyKindRangeDelete {
     260           1 :                 blockWriter = &w.rangeDelBlock
     261           1 :                 // Update range delete properties.
     262           1 :                 // NB: These properties are computed differently than the rowblk sstable
     263           1 :                 // writer because this writer does not flatten them into row key-value
     264           1 :                 // pairs.
     265           1 :                 w.props.RawKeySize += uint64(len(span.Start) + len(span.End))
     266           1 :                 count := uint64(len(span.Keys))
     267           1 :                 w.props.NumEntries += count
     268           1 :                 w.props.NumDeletions += count
     269           1 :                 w.props.NumRangeDeletions += count
     270           1 :         } else {
     271           1 :                 // Update range key properties.
     272           1 :                 // NB: These properties are computed differently than the rowblk sstable
     273           1 :                 // writer because this writer does not flatten them into row key-value
     274           1 :                 // pairs.
     275           1 :                 w.props.RawRangeKeyKeySize += uint64(len(span.Start) + len(span.End))
     276           1 :                 for _, k := range span.Keys {
     277           1 :                         w.props.RawRangeKeyValueSize += uint64(len(k.Value))
     278           1 :                         switch k.Kind() {
     279           1 :                         case base.InternalKeyKindRangeKeyDelete:
     280           1 :                                 w.props.NumRangeKeyDels++
     281           1 :                         case base.InternalKeyKindRangeKeySet:
     282           1 :                                 w.props.NumRangeKeySets++
     283           1 :                         case base.InternalKeyKindRangeKeyUnset:
     284           1 :                                 w.props.NumRangeKeyUnsets++
     285           0 :                         default:
     286           0 :                                 panic(errors.Errorf("pebble: invalid range key type: %s", k.Kind()))
     287             :                         }
     288             :                 }
     289           1 :                 for i := range w.blockPropCollectors {
     290           1 :                         if err := w.blockPropCollectors[i].AddRangeKeys(span); err != nil {
     291           0 :                                 return err
     292           0 :                         }
     293             :                 }
     294             :         }
     295           1 :         if !w.disableKeyOrderChecks && blockWriter.KeyCount() > 0 {
     296           1 :                 // Check that spans are being added in fragmented order. If the two
     297           1 :                 // tombstones overlap, their start and end keys must be identical.
     298           1 :                 prevStart, prevEnd, prevTrailer := blockWriter.UnsafeLastSpan()
     299           1 :                 if w.opts.Comparer.Equal(prevStart, span.Start) && w.opts.Comparer.Equal(prevEnd, span.End) {
     300           0 :                         if prevTrailer < span.Keys[0].Trailer {
     301           0 :                                 w.err = errors.Errorf("pebble: keys must be added in order: %s-%s:{(#%s)}, %s",
     302           0 :                                         w.opts.Comparer.FormatKey(prevStart),
     303           0 :                                         w.opts.Comparer.FormatKey(prevEnd),
     304           0 :                                         prevTrailer, span.Pretty(w.opts.Comparer.FormatKey))
     305           0 :                         }
     306           1 :                 } else if c := w.opts.Comparer.Compare(prevEnd, span.Start); c > 0 {
     307           0 :                         w.err = errors.Errorf("pebble: keys must be added in order: %s-%s:{(#%s)}, %s",
     308           0 :                                 w.opts.Comparer.FormatKey(prevStart),
     309           0 :                                 w.opts.Comparer.FormatKey(prevEnd),
     310           0 :                                 prevTrailer, span.Pretty(w.opts.Comparer.FormatKey))
     311           0 :                         return w.err
     312           0 :                 }
     313             :         }
     314           1 :         blockWriter.AddSpan(span)
     315           1 :         return nil
     316             : }
     317             : 
// AddWithForceObsolete adds a point key/value pair when writing a
// strict-obsolete sstable. For a given Writer, the keys passed to Add must be
// in increasing order. Span keys (range deletions, range keys) must be added
// through EncodeSpan.
//
// forceObsolete indicates whether the caller has determined that this key is
// obsolete even though it may be the latest point key for this userkey. This
// should be set to true for keys obsoleted by RANGEDELs, and is required for
// strict-obsolete sstables.
//
// Note that there are two properties, S1 and S2 (see comment in format.go)
// that strict-obsolete ssts must satisfy. S2, due to RANGEDELs, is solely the
// responsibility of the caller. S1 is solely the responsibility of the
// callee.
//
// An error is returned if the key is a span kind (RANGEDEL or a range key
// kind), if the key is a MERGE and the sstable is strict-obsolete, if
// evaluating the point, storing its value in a value block or extracting its
// short attribute fails, if a block property collector rejects the key, or if
// a DELSIZED key carries a non-empty value that does not parse as a uvarint.
//
// NOTE(review): the DELSIZED uvarint check runs after the key has already
// been appended to the data block, handed to the collectors and counted in
// NumEntries, and that error is not recorded in w.err — confirm this is
// intended.
func (w *RawColumnWriter) AddWithForceObsolete(
	key InternalKey, value []byte, forceObsolete bool,
) error {
	// Reject kinds that this method does not accept.
	switch key.Kind() {
	case base.InternalKeyKindRangeDelete, base.InternalKeyKindRangeKeySet,
		base.InternalKeyKindRangeKeyUnset, base.InternalKeyKindRangeKeyDelete:
		return errors.Newf("%s must be added through EncodeSpan", key.Kind())
	case base.InternalKeyKindMerge:
		if w.opts.IsStrictObsolete {
			return errors.Errorf("MERGE not supported in a strict-obsolete sstable")
		}
	}

	// Decide how the key relates to the previous key, whether it is obsolete
	// and whether its value belongs in a value block.
	eval, err := w.evaluatePoint(key, len(value))
	if err != nil {
		return err
	}
	eval.isObsolete = eval.isObsolete || forceObsolete
	// Remember this key's trailer and obsolete bit for the next call.
	w.prevPointKey.trailer = key.Trailer
	w.prevPointKey.isObsolete = eval.isObsolete

	var valuePrefix block.ValuePrefix
	var valueStoredWithKey []byte
	if eval.writeToValueBlock {
		// The value goes to the value block; the data block stores the
		// encoded value handle (held in the w.tmp scratch buffer) instead.
		vh, err := w.valueBlock.AddValue(value)
		if err != nil {
			return err
		}
		n := valblk.EncodeHandle(w.tmp[:], vh)
		valueStoredWithKey = w.tmp[:n]
		var attribute base.ShortAttribute
		if w.opts.ShortAttributeExtractor != nil {
			// TODO(sumeer): for compactions, it is possible that the input sstable
			// already has this value in the value section and so we have already
			// extracted the ShortAttribute. Avoid extracting it again. This will
			// require changing the RawWriter.Add interface.
			if attribute, err = w.opts.ShortAttributeExtractor(
				key.UserKey, int(eval.kcmp.PrefixLen), value); err != nil {
				return err
			}
		}
		valuePrefix = block.ValueHandlePrefix(eval.kcmp.PrefixEqual(), attribute)
	} else {
		// The value is stored in place alongside the key. An empty value
		// leaves valuePrefix at its zero value.
		valueStoredWithKey = value
		if len(value) > 0 {
			valuePrefix = block.InPlaceValuePrefix(eval.kcmp.PrefixEqual())
		}
	}

	// Append the key to the data block. We have NOT yet committed to
	// including the key in the block. The data block writer permits us to
	// finish the block excluding the last-appended KV.
	entriesWithoutKV := w.dataBlock.Rows()
	w.dataBlock.Add(key, valueStoredWithKey, valuePrefix, eval.kcmp, eval.isObsolete)

	// Now that we've appended the KV pair, we can compute the exact size of the
	// block with this key-value pair included. Check to see if we should flush
	// the current block, either with or without the added key-value pair.
	size := w.dataBlock.Size()
	if shouldFlushWithoutLatestKV(size, w.pendingDataBlockSize, entriesWithoutKV, &w.dataFlush) {
		// Flush the data block excluding the key we just added.
		w.flushDataBlockWithoutNextKey(key.UserKey)
		// flushDataBlockWithoutNextKey reset the data block builder, and we can
		// add the key to this next block now.
		w.dataBlock.Add(key, valueStoredWithKey, valuePrefix, eval.kcmp, eval.isObsolete)
		w.pendingDataBlockSize = w.dataBlock.Size()
	} else {
		// We're not flushing the data block, and we're committing to including
		// the current KV in the block. Remember the new size of the data block
		// with the current KV.
		w.pendingDataBlockSize = size
	}

	// Feed the key to every block property collector. A collector failure is
	// recorded in w.err as well as returned.
	for i := range w.blockPropCollectors {
		v := value
		if key.Kind() == base.InternalKeyKindSet {
			// Values for SET are not required to be in-place, and in the future
			// may not even be read by the compaction, so pass nil values. Block
			// property collectors in such Pebble DB's must not look at the
			// value.
			v = nil
		}
		if err := w.blockPropCollectors[i].AddPointKey(key, v); err != nil {
			w.err = err
			return err
		}
	}
	w.obsoleteCollector.AddPoint(eval.isObsolete)
	if w.filterBlock != nil {
		// The filter ingests only the first PrefixLen bytes of the user key.
		w.filterBlock.addKey(key.UserKey[:eval.kcmp.PrefixLen])
	}
	w.meta.updateSeqNum(key.SeqNum())
	if !w.meta.HasPointKeys {
		// Record a clone of the first point key as the smallest point key.
		w.meta.SetSmallestPointKey(key.Clone())
	}

	// Update table properties and the per-block tombstone statistics.
	w.props.NumEntries++
	switch key.Kind() {
	case InternalKeyKindDelete, InternalKeyKindSingleDelete:
		w.props.NumDeletions++
		w.props.RawPointTombstoneKeySize += uint64(len(key.UserKey))
		w.dataBlock.numDeletions++
		w.dataBlock.deletionSize += len(key.UserKey)
	case InternalKeyKindDeleteSized:
		// A non-empty DELSIZED value is parsed as a uvarint and added to
		// RawPointTombstoneValueSize; an empty value contributes zero.
		var size uint64
		if len(value) > 0 {
			var n int
			size, n = binary.Uvarint(value)
			if n <= 0 {
				return errors.Newf("%s key's value (%x) does not parse as uvarint",
					errors.Safe(key.Kind().String()), value)
			}
		}
		w.props.NumDeletions++
		w.props.NumSizedDeletions++
		w.props.RawPointTombstoneKeySize += uint64(len(key.UserKey))
		w.props.RawPointTombstoneValueSize += size
		w.dataBlock.numDeletions++
		w.dataBlock.deletionSize += len(key.UserKey)
	case InternalKeyKindMerge:
		w.props.NumMergeOperands++
	}
	// Raw sizes use the original value length, regardless of where the value
	// was stored.
	w.props.RawKeySize += uint64(key.Size())
	w.props.RawValueSize += uint64(len(value))
	return nil
}
     458             : 
// pointKeyEvaluation is the result of evaluatePoint: the decisions made about
// how a point key being added to the sstable should be written.
type pointKeyEvaluation struct {
	// kcmp is the comparison of the key's user key against the previous key,
	// as computed by the data block's KeyWriter.
	kcmp colblk.KeyComparison
	// isObsolete reports whether the key is written as obsolete.
	isObsolete bool
	// writeToValueBlock reports whether the value is stored in a value block
	// (with a value handle stored alongside the key) rather than in place.
	writeToValueBlock bool
}
     464             : 
     465             : // evaluatePoint takes information about a point key being written to the
     466             : // sstable and decides how the point should be represented, where its value
     467             : // should be stored, etc.
     468             : func (w *RawColumnWriter) evaluatePoint(
     469             :         key base.InternalKey, valueLen int,
     470           1 : ) (eval pointKeyEvaluation, err error) {
     471           1 :         eval.kcmp = w.dataBlock.KeyWriter.ComparePrev(key.UserKey)
     472           1 : 
     473           1 :         // When invariants are enabled, validate kcmp.
     474           1 :         if invariants.Enabled {
     475           1 :                 colblk.AssertKeyCompare(w.comparer, key.UserKey, w.previousUserKey.Get(), eval.kcmp)
     476           1 :                 w.previousUserKey.Set(append(w.previousUserKey.Get()[:0], key.UserKey...))
     477           1 :         }
     478             : 
     479           1 :         if !w.meta.HasPointKeys {
     480           1 :                 return eval, nil
     481           1 :         }
     482           1 :         keyKind := key.Kind()
     483           1 :         // Ensure that no one adds a point key kind without considering the obsolete
     484           1 :         // handling for that kind.
     485           1 :         switch keyKind {
     486             :         case InternalKeyKindSet, InternalKeyKindSetWithDelete, InternalKeyKindMerge,
     487           1 :                 InternalKeyKindDelete, InternalKeyKindSingleDelete, InternalKeyKindDeleteSized:
     488           0 :         default:
     489           0 :                 panic(errors.AssertionFailedf("unexpected key kind %s", keyKind.String()))
     490             :         }
     491           1 :         prevKeyKind := w.prevPointKey.trailer.Kind()
     492           1 :         // If same user key, then the current key is obsolete if any of the
     493           1 :         // following is true:
     494           1 :         // C1 The prev key was obsolete.
     495           1 :         // C2 The prev key was not a MERGE. When the previous key is a MERGE we must
     496           1 :         //    preserve SET* and MERGE since their values will be merged into the
     497           1 :         //    previous key. We also must preserve DEL* since there may be an older
     498           1 :         //    SET*/MERGE in a lower level that must not be merged with the MERGE --
     499           1 :         //    if we omit the DEL* that lower SET*/MERGE will become visible.
     500           1 :         //
     501           1 :         // Regardless of whether it is the same user key or not
     502           1 :         // C3 The current key is some kind of point delete, and we are writing to
     503           1 :         //    the lowest level, then it is also obsolete. The correctness of this
     504           1 :         //    relies on the same user key not spanning multiple sstables in a level.
     505           1 :         //
     506           1 :         // C1 ensures that for a user key there is at most one transition from
     507           1 :         // !obsolete to obsolete. Consider a user key k, for which the first n keys
     508           1 :         // are not obsolete. We consider the various value of n:
     509           1 :         //
     510           1 :         // n = 0: This happens due to forceObsolete being set by the caller, or due
     511           1 :         // to C3. forceObsolete must only be set due a RANGEDEL, and that RANGEDEL
     512           1 :         // must also delete all the lower seqnums for the same user key. C3 triggers
     513           1 :         // due to a point delete and that deletes all the lower seqnums for the same
     514           1 :         // user key.
     515           1 :         //
     516           1 :         // n = 1: This is the common case. It happens when the first key is not a
     517           1 :         // MERGE, or the current key is some kind of point delete.
     518           1 :         //
     519           1 :         // n > 1: This is due to a sequence of MERGE keys, potentially followed by a
     520           1 :         // single non-MERGE key.
     521           1 :         isObsoleteC1AndC2 := eval.kcmp.UserKeyComparison == 0 &&
     522           1 :                 (w.prevPointKey.isObsolete || prevKeyKind != InternalKeyKindMerge)
     523           1 :         isObsoleteC3 := w.opts.WritingToLowestLevel &&
     524           1 :                 (keyKind == InternalKeyKindDelete || keyKind == InternalKeyKindSingleDelete ||
     525           1 :                         keyKind == InternalKeyKindDeleteSized)
     526           1 :         eval.isObsolete = isObsoleteC1AndC2 || isObsoleteC3
     527           1 :         // TODO(sumeer): storing isObsolete SET and SETWITHDEL in value blocks is
     528           1 :         // possible, but requires some care in documenting and checking invariants.
     529           1 :         // There is code that assumes nothing in value blocks because of single MVCC
     530           1 :         // version (those should be ok). We have to ensure setHasSamePrefix is
     531           1 :         // correctly initialized here etc.
     532           1 : 
     533           1 :         if !w.disableKeyOrderChecks && (eval.kcmp.UserKeyComparison < 0 ||
     534           1 :                 (eval.kcmp.UserKeyComparison == 0 && w.prevPointKey.trailer <= key.Trailer)) {
     535           0 :                 previousKey := base.InternalKey{
     536           0 :                         UserKey: w.dataBlock.MaterializeLastUserKey(nil),
     537           0 :                         Trailer: w.prevPointKey.trailer,
     538           0 :                 }
     539           0 :                 return eval, errors.Errorf(
     540           0 :                         "pebble: keys must be added in strictly increasing order: %s, %s",
     541           0 :                         previousKey.Pretty(w.comparer.FormatKey),
     542           0 :                         key.Pretty(w.comparer.FormatKey))
     543           0 :         }
     544             : 
     545             :         // We might want to write this key's value to a value block if it has the
     546             :         // same prefix.
     547             :         //
     548             :         // We require:
     549             :         //  . Value blocks to be enabled.
     550             :         //  . The current key to have the same prefix as the previous key.
     551             :         //  . The previous key to be a SET.
     552             :         //  . The current key to be a SET.
     553             :         //  . If there are bounds requiring some keys' values to be in-place, the
     554             :         //    key must not fall within those bounds.
     555             :         //  . The value to be sufficiently large. (Currently we simply require a
     556             :         //    non-zero length, so all non-empty values are eligible for storage
     557             :         //    out-of-band in a value block.)
     558             :         //
     559             :         // Use of 0 here is somewhat arbitrary. Given the minimum 3 byte encoding of
     560             :         // valueHandle, this should be > 3. But tiny values are common in test and
     561             :         // unlikely in production, so we use 0 here for better test coverage.
     562           1 :         const tinyValueThreshold = 0
     563           1 :         useValueBlock := !w.opts.DisableValueBlocks &&
     564           1 :                 eval.kcmp.PrefixEqual() &&
     565           1 :                 prevKeyKind == InternalKeyKindSet &&
     566           1 :                 keyKind == InternalKeyKindSet &&
     567           1 :                 valueLen > tinyValueThreshold &&
     568           1 :                 w.valueBlock != nil
     569           1 :         if !useValueBlock {
     570           1 :                 return eval, nil
     571           1 :         }
     572             :         // NB: it is possible that eval.kcmp.UserKeyComparison == 0, i.e., these two
     573             :         // SETs have identical user keys (because of an open snapshot). This should
     574             :         // be the rare case.
     575             : 
     576             :         // If there are bounds requiring some keys' values to be in-place, compare
     577             :         // the prefix against the bounds.
     578           1 :         if !w.opts.RequiredInPlaceValueBound.IsEmpty() {
     579           0 :                 if w.comparer.Compare(w.opts.RequiredInPlaceValueBound.Upper, key.UserKey[:eval.kcmp.PrefixLen]) <= 0 {
     580           0 :                         // Common case for CockroachDB. Make it empty since all future keys
     581           0 :                         // will be >= this key.
     582           0 :                         w.opts.RequiredInPlaceValueBound = UserKeyPrefixBound{}
     583           0 :                 } else if w.comparer.Compare(key.UserKey[:eval.kcmp.PrefixLen], w.opts.RequiredInPlaceValueBound.Lower) >= 0 {
     584           0 :                         // Don't write to value block if the key is within the bounds.
     585           0 :                         return eval, nil
     586           0 :                 }
     587             :         }
     588           1 :         eval.writeToValueBlock = true
     589           1 :         return eval, nil
     590             : }
     591             : 
     592             : var compressedBlockPool = sync.Pool{
     593           1 :         New: func() interface{} {
     594           1 :                 return new(compressedBlock)
     595           1 :         },
     596             : }
     597             : 
// compressedBlock pairs a physical block with the buffers used to produce it.
// Values are obtained from and returned to compressedBlockPool.
type compressedBlock struct {
	// physical is the block as returned by block.CompressAndChecksum (or a
	// clone of it when the block was left uncompressed).
	physical block.PhysicalBlock
	// blockBuf provides the compression buffer and checksummer passed to
	// block.CompressAndChecksum.
	blockBuf blockBuf
}
     602             : 
     603           1 : func (w *RawColumnWriter) flushDataBlockWithoutNextKey(nextKey []byte) {
     604           1 :         serializedBlock, lastKey := w.dataBlock.Finish(w.dataBlock.Rows()-1, w.pendingDataBlockSize)
     605           1 :         w.maybeIncrementTombstoneDenseBlocks(len(serializedBlock))
     606           1 :         // Compute the separator that will be written to the index block alongside
     607           1 :         // this data block's end offset. It is the separator between the last key in
     608           1 :         // the finished block and the [nextKey] that was excluded from the block.
     609           1 :         w.separatorBuf = w.comparer.Separator(w.separatorBuf[:0], lastKey.UserKey, nextKey)
     610           1 :         w.enqueueDataBlock(serializedBlock, lastKey, w.separatorBuf)
     611           1 :         w.dataBlock.Reset()
     612           1 :         w.pendingDataBlockSize = 0
     613           1 : }
     614             : 
     615             : // maybeIncrementTombstoneDenseBlocks increments the number of tombstone dense
     616             : // blocks if the number of deletions in the data block exceeds a threshold or
     617             : // the deletion size exceeds a threshold. It should be called after the
     618             : // data block has been finished.
     619             : // Invariant: w.dataBlockBuf.uncompressed must already be populated.
     620           1 : func (w *RawColumnWriter) maybeIncrementTombstoneDenseBlocks(uncompressedLen int) {
     621           1 :         minSize := w.opts.DeletionSizeRatioThreshold * float32(uncompressedLen)
     622           1 :         if w.dataBlock.numDeletions > w.opts.NumDeletionsThreshold || float32(w.dataBlock.deletionSize) > minSize {
     623           0 :                 w.props.NumTombstoneDenseBlocks++
     624           0 :         }
     625           1 :         w.dataBlock.numDeletions = 0
     626           1 :         w.dataBlock.deletionSize = 0
     627             : }
     628             : 
     629             : // enqueueDataBlock compresses and checksums the provided data block and sends
     630             : // it to the write queue to be asynchronously written to the underlying storage.
     631             : // It also adds the block's index block separator to the pending index block,
     632             : // possibly triggering the index block to be finished and buffered.
     633             : func (w *RawColumnWriter) enqueueDataBlock(
     634             :         serializedBlock []byte, lastKey base.InternalKey, separator []byte,
     635           1 : ) error {
     636           1 :         // TODO(jackson): Avoid allocating the largest point user key every time we
     637           1 :         // set the largest point key. This is what the rowblk writer does too, but
     638           1 :         // it's unnecessary.
     639           1 :         w.meta.SetLargestPointKey(lastKey.Clone())
     640           1 : 
     641           1 :         if invariants.Enabled {
     642           1 :                 var dec colblk.DataBlockDecoder
     643           1 :                 dec.Init(w.opts.KeySchema, serializedBlock)
     644           1 :                 if err := dec.Validate(w.comparer, w.opts.KeySchema); err != nil {
     645           0 :                         panic(err)
     646             :                 }
     647             :         }
     648             : 
     649             :         // Serialize the data block, compress it and send it to the write queue.
     650           1 :         cb := compressedBlockPool.Get().(*compressedBlock)
     651           1 :         cb.blockBuf.checksummer.Type = w.opts.Checksum
     652           1 :         cb.physical = block.CompressAndChecksum(
     653           1 :                 &cb.blockBuf.compressedBuf,
     654           1 :                 serializedBlock,
     655           1 :                 w.opts.Compression,
     656           1 :                 &cb.blockBuf.checksummer,
     657           1 :         )
     658           1 :         if !cb.physical.IsCompressed() {
     659           1 :                 // If the block isn't compressed, cb.physical's underlying data points
     660           1 :                 // directly into a buffer owned by w.dataBlock. Clone it before passing
     661           1 :                 // it to the write queue to be asynchronously written to disk.
     662           1 :                 // TODO(jackson): Should we try to avoid this clone by tracking the
     663           1 :                 // lifetime of the DataBlockWriters?
     664           1 :                 cb.physical = cb.physical.Clone()
     665           1 :         }
     666           1 :         return w.enqueuePhysicalBlock(cb, separator)
     667             : }
     668             : 
     669           1 : func (w *RawColumnWriter) enqueuePhysicalBlock(cb *compressedBlock, separator []byte) error {
     670           1 :         dataBlockHandle := block.Handle{
     671           1 :                 Offset: w.queuedDataSize,
     672           1 :                 Length: uint64(cb.physical.LengthWithoutTrailer()),
     673           1 :         }
     674           1 :         w.queuedDataSize += dataBlockHandle.Length + block.TrailerLen
     675           1 :         w.writeQueue.ch <- cb
     676           1 : 
     677           1 :         var err error
     678           1 :         w.blockPropsEncoder.resetProps()
     679           1 :         for i := range w.blockPropCollectors {
     680           1 :                 scratch := w.blockPropsEncoder.getScratchForProp()
     681           1 :                 if scratch, err = w.blockPropCollectors[i].FinishDataBlock(scratch); err != nil {
     682           0 :                         return err
     683           0 :                 }
     684           1 :                 w.blockPropsEncoder.addProp(shortID(i), scratch)
     685             :         }
     686           1 :         dataBlockProps := w.blockPropsEncoder.unsafeProps()
     687           1 : 
     688           1 :         // Add the separator to the index block. This might trigger a flush of the
     689           1 :         // index block too.
     690           1 :         i := w.indexBlock.AddBlockHandle(separator, dataBlockHandle, dataBlockProps)
     691           1 :         sizeWithEntry := w.indexBlock.Size()
     692           1 :         if shouldFlushWithoutLatestKV(sizeWithEntry, w.indexBlockSize, i, &w.indexFlush) {
     693           1 :                 // NB: finishIndexBlock will use blockPropsEncoder, so we must clone the
     694           1 :                 // data block's props first.
     695           1 :                 dataBlockProps = slices.Clone(dataBlockProps)
     696           1 : 
     697           1 :                 if err = w.finishIndexBlock(w.indexBlock.Rows() - 1); err != nil {
     698           0 :                         return err
     699           0 :                 }
     700             :                 // finishIndexBlock reset the index block builder, and we can
     701             :                 // add the block handle to this new index block.
     702           1 :                 _ = w.indexBlock.AddBlockHandle(separator, dataBlockHandle, dataBlockProps)
     703           1 :                 w.indexBlockSize = w.indexBlock.Size()
     704           1 :         } else {
     705           1 :                 w.indexBlockSize = sizeWithEntry
     706           1 :         }
     707             :         // Incorporate the finished data block's property into the index block, now
     708             :         // that we've flushed the index block without the new separator if
     709             :         // necessary.
     710           1 :         for i := range w.blockPropCollectors {
     711           1 :                 w.blockPropCollectors[i].AddPrevDataBlockToIndexBlock()
     712           1 :         }
     713           1 :         return nil
     714             : }
     715             : 
     716             : // finishIndexBlock finishes the currently pending index block with the first
     717             : // [rows] rows. In practice, [rows] is always w.indexBlock.Rows() or
     718             : // w.indexBlock.Rows()-1.
     719             : //
     720             : // The finished index block is buffered until the writer is closed.
     721           1 : func (w *RawColumnWriter) finishIndexBlock(rows int) error {
     722           1 :         defer w.indexBlock.Reset()
     723           1 :         w.blockPropsEncoder.resetProps()
     724           1 :         for i := range w.blockPropCollectors {
     725           1 :                 scratch := w.blockPropsEncoder.getScratchForProp()
     726           1 :                 var err error
     727           1 :                 if scratch, err = w.blockPropCollectors[i].FinishIndexBlock(scratch); err != nil {
     728           0 :                         return err
     729           0 :                 }
     730           1 :                 w.blockPropsEncoder.addProp(shortID(i), scratch)
     731             :         }
     732           1 :         indexProps := w.blockPropsEncoder.props()
     733           1 :         bib := bufferedIndexBlock{nEntries: rows, properties: indexProps}
     734           1 : 
     735           1 :         // Copy the last (greatest) separator key in the index block into bib.sep.
     736           1 :         // It'll be the separator on the entry in the top-level index block.
     737           1 :         //
     738           1 :         // TODO(jackson): bib.sep.Trailer is unused within the columnar-block
     739           1 :         // sstable writer. Its existence is a code artifact of reuse of the
     740           1 :         // bufferedIndexBlock type between colblk and rowblk writers. This can be
     741           1 :         // cleaned up.
     742           1 :         bib.sep.Trailer = base.MakeTrailer(base.SeqNumMax, base.InternalKeyKindSeparator)
     743           1 :         w.indexBuffering.sepAlloc, bib.sep.UserKey = w.indexBuffering.sepAlloc.Copy(
     744           1 :                 w.indexBlock.UnsafeSeparator(rows - 1))
     745           1 : 
     746           1 :         // Finish the index block and copy it so that w.indexBlock may be reused.
     747           1 :         block := w.indexBlock.Finish(rows)
     748           1 :         if len(w.indexBuffering.blockAlloc) < len(block) {
     749           1 :                 // Allocate enough bytes for approximately 16 index blocks.
     750           1 :                 w.indexBuffering.blockAlloc = make([]byte, len(block)*16)
     751           1 :         }
     752           1 :         n := copy(w.indexBuffering.blockAlloc, block)
     753           1 :         bib.block = w.indexBuffering.blockAlloc[:n:n]
     754           1 :         w.indexBuffering.blockAlloc = w.indexBuffering.blockAlloc[n:]
     755           1 : 
     756           1 :         w.indexBuffering.partitions = append(w.indexBuffering.partitions, bib)
     757           1 :         return nil
     758             : }
     759             : 
     760             : // flushBufferedIndexBlocks writes all index blocks, including the top-level
     761             : // index block if necessary, to the underlying writable. It returns the block
     762             : // handle of the top index (either the only index block or the top-level index
     763             : // if two-level).
     764           1 : func (w *RawColumnWriter) flushBufferedIndexBlocks() (rootIndex block.Handle, err error) {
     765           1 :         // If there's a currently-pending index block, finish it.
     766           1 :         if w.indexBlock.Rows() > 0 || len(w.indexBuffering.partitions) == 0 {
     767           1 :                 w.finishIndexBlock(w.indexBlock.Rows())
     768           1 :         }
     769             :         // We've buffered all the index blocks. Typically there's just one index
     770             :         // block, in which case we're writing a "single-level" index. If we're
     771             :         // writing a large file or the index separators happen to be excessively
     772             :         // long, we may have several index blocks and need to construct a
     773             :         // "two-level" index structure.
     774           1 :         switch len(w.indexBuffering.partitions) {
     775           0 :         case 0:
     776           0 :                 // This is impossible because we'll flush the index block immediately
     777           0 :                 // above this switch statement if there are no buffered partitions
     778           0 :                 // (regardless of whether there are data block handles in the index
     779           0 :                 // block).
     780           0 :                 panic("unreachable")
     781           1 :         case 1:
     782           1 :                 // Single-level index.
     783           1 :                 rootIndex, err = w.layout.WriteIndexBlock(w.indexBuffering.partitions[0].block)
     784           1 :                 if err != nil {
     785           0 :                         return rootIndex, err
     786           0 :                 }
     787           1 :                 w.props.IndexSize = rootIndex.Length + block.TrailerLen
     788           1 :                 w.props.NumDataBlocks = uint64(w.indexBuffering.partitions[0].nEntries)
     789           1 :                 w.props.IndexType = binarySearchIndex
     790           1 :         default:
     791           1 :                 // Two-level index.
     792           1 :                 for _, part := range w.indexBuffering.partitions {
     793           1 :                         bh, err := w.layout.WriteIndexBlock(part.block)
     794           1 :                         if err != nil {
     795           0 :                                 return block.Handle{}, err
     796           0 :                         }
     797           1 :                         w.props.IndexSize += bh.Length + block.TrailerLen
     798           1 :                         w.props.NumDataBlocks += uint64(w.indexBuffering.partitions[0].nEntries)
     799           1 :                         w.topLevelIndexBlock.AddBlockHandle(part.sep.UserKey, bh, part.properties)
     800             :                 }
     801           1 :                 rootIndex, err = w.layout.WriteIndexBlock(w.topLevelIndexBlock.Finish(w.topLevelIndexBlock.Rows()))
     802           1 :                 if err != nil {
     803           0 :                         return block.Handle{}, err
     804           0 :                 }
     805           1 :                 w.props.IndexSize += rootIndex.Length + block.TrailerLen
     806           1 :                 w.props.IndexType = twoLevelIndex
     807           1 :                 w.props.IndexPartitions = uint64(len(w.indexBuffering.partitions))
     808             :         }
     809           1 :         return rootIndex, nil
     810             : }
     811             : 
     812             : // drainWriteQueue runs in its own goroutine and is responsible for writing
     813             : // finished, compressed data blocks to the writable. It reads from w.writeQueue
     814             : // until the channel is closed. All data blocks are written by this goroutine.
     815             : // Other blocks are written directly by the client goroutine. See Close.
     816           1 : func (w *RawColumnWriter) drainWriteQueue() {
     817           1 :         defer w.writeQueue.wg.Done()
     818           1 :         for cb := range w.writeQueue.ch {
     819           1 :                 if _, err := w.layout.WritePrecompressedDataBlock(cb.physical); err != nil {
     820           0 :                         w.writeQueue.err = err
     821           0 :                 }
     822           1 :                 cb.blockBuf.clear()
     823           1 :                 cb.physical = block.PhysicalBlock{}
     824           1 :                 compressedBlockPool.Put(cb)
     825             :         }
     826             : }
     827             : 
     828           1 : func (w *RawColumnWriter) Close() (err error) {
     829           1 :         defer func() {
     830           1 :                 if w.valueBlock != nil {
     831           0 :                         w.valueBlock.Release()
     832           0 :                         // Defensive code in case Close gets called again. We don't want to put
     833           0 :                         // the same object to a sync.Pool.
     834           0 :                         w.valueBlock = nil
     835           0 :                 }
     836           1 :                 w.layout.Abort()
     837           1 :                 // Record any error in the writer (so we can exit early if Close is called
     838           1 :                 // again).
     839           1 :                 if err != nil {
     840           0 :                         w.err = err
     841           0 :                 }
     842             :         }()
     843           1 :         if w.layout.writable == nil {
     844           0 :                 return w.err
     845           0 :         }
     846             : 
     847             :         // Finish the last data block and send it to the write queue if it contains
     848             :         // any pending KVs.
     849           1 :         if rows := w.dataBlock.Rows(); rows > 0 {
     850           1 :                 serializedBlock, lastKey := w.dataBlock.Finish(rows, w.pendingDataBlockSize)
     851           1 :                 w.separatorBuf = w.comparer.Successor(w.separatorBuf[:0], lastKey.UserKey)
     852           1 :                 w.err = errors.CombineErrors(w.err, w.enqueueDataBlock(serializedBlock, lastKey, w.separatorBuf))
     853           1 :                 w.maybeIncrementTombstoneDenseBlocks(len(serializedBlock))
     854           1 :         }
     855             :         // Close the write queue channel so that the goroutine responsible for
     856             :         // writing data blocks to disk knows to exit. Any subsequent blocks (eg,
     857             :         // index, metadata, range key, etc) will be written by the goroutine that
     858             :         // called Close.
     859           1 :         close(w.writeQueue.ch)
     860           1 :         w.writeQueue.wg.Wait()
     861           1 :         // If the write queue encountered any errors while writing out data blocks,
     862           1 :         // it's stored in w.writeQueue.err.
     863           1 :         w.err = firstError(w.err, w.writeQueue.err)
     864           1 :         if w.err != nil {
     865           0 :                 return w.err
     866           0 :         }
     867             : 
     868             :         // INVARIANT: w.queuedDataSize == w.layout.offset.
     869             :         // All data blocks have been written to disk. The queuedDataSize is the
     870             :         // cumulative size of all the data blocks we've sent to the write queue. Now
     871             :         // that they've all been flushed, queuedDataSize should match w.layout's
     872             :         // offset.
     873           1 :         if w.queuedDataSize != w.layout.offset {
     874           0 :                 panic(errors.AssertionFailedf("pebble: %d of queued data blocks but layout offset is %d",
     875           0 :                         w.queuedDataSize, w.layout.offset))
     876             :         }
     877           1 :         w.props.DataSize = w.layout.offset
     878           1 :         if _, err = w.flushBufferedIndexBlocks(); err != nil {
     879           0 :                 return err
     880           0 :         }
     881             : 
     882             :         // Write the filter block.
     883           1 :         if w.filterBlock != nil {
     884           1 :                 bh, err := w.layout.WriteFilterBlock(w.filterBlock)
     885           1 :                 if err != nil {
     886           0 :                         return err
     887           0 :                 }
     888           1 :                 w.props.FilterPolicyName = w.filterBlock.policyName()
     889           1 :                 w.props.FilterSize = bh.Length
     890             :         }
     891             : 
     892             :         // Write the range deletion block if non-empty.
     893           1 :         if w.rangeDelBlock.KeyCount() > 0 {
     894           1 :                 w.props.NumRangeDeletions = uint64(w.rangeDelBlock.KeyCount())
     895           1 :                 sm, la := w.rangeDelBlock.UnsafeBoundaryKeys()
     896           1 :                 w.meta.SetSmallestRangeDelKey(sm)
     897           1 :                 w.meta.SetLargestRangeDelKey(la)
     898           1 :                 if _, err := w.layout.WriteRangeDeletionBlock(w.rangeDelBlock.Finish()); err != nil {
     899           0 :                         return err
     900           0 :                 }
     901             :         }
     902             : 
     903             :         // Write the range key block if non-empty.
     904           1 :         if w.rangeKeyBlock.KeyCount() > 0 {
     905           1 :                 sm, la := w.rangeKeyBlock.UnsafeBoundaryKeys()
     906           1 :                 w.meta.SetSmallestRangeKey(sm)
     907           1 :                 w.meta.SetLargestRangeKey(la)
     908           1 :                 if _, err := w.layout.WriteRangeKeyBlock(w.rangeKeyBlock.Finish()); err != nil {
     909           0 :                         return err
     910           0 :                 }
     911             :         }
     912             : 
     913             :         // Write out the value block.
     914           1 :         if w.valueBlock != nil {
     915           1 :                 _, vbStats, err := w.valueBlock.Finish(&w.layout, w.layout.offset)
     916           1 :                 if err != nil {
     917           0 :                         return err
     918           0 :                 }
     919           1 :                 w.props.NumValueBlocks = vbStats.NumValueBlocks
     920           1 :                 w.props.NumValuesInValueBlocks = vbStats.NumValuesInValueBlocks
     921           1 :                 w.props.ValueBlocksSize = vbStats.ValueBlocksAndIndexSize
     922             :         }
     923             : 
     924             :         // Write the properties block.
     925           1 :         {
     926           1 :                 // Finish and record the prop collectors if props are not yet recorded.
     927           1 :                 // Pre-computed props might have been copied by specialized sst creators
     928           1 :                 // like suffix replacer.
     929           1 :                 if len(w.props.UserProperties) == 0 {
     930           1 :                         userProps := make(map[string]string)
     931           1 :                         for i := range w.blockPropCollectors {
     932           1 :                                 scratch := w.blockPropsEncoder.getScratchForProp()
     933           1 :                                 // Place the shortID in the first byte.
     934           1 :                                 scratch = append(scratch, byte(i))
     935           1 :                                 buf, err := w.blockPropCollectors[i].FinishTable(scratch)
     936           1 :                                 if err != nil {
     937           0 :                                         return err
     938           0 :                                 }
     939           1 :                                 var prop string
     940           1 :                                 if len(buf) > 0 {
     941           1 :                                         prop = string(buf)
     942           1 :                                 }
     943             :                                 // NB: The property is populated in the map even if it is the
     944             :                                 // empty string, since the presence in the map is what indicates
     945             :                                 // that the block property collector was used when writing.
     946           1 :                                 userProps[w.blockPropCollectors[i].Name()] = prop
     947             :                         }
     948           1 :                         if len(userProps) > 0 {
     949           1 :                                 w.props.UserProperties = userProps
     950           1 :                         }
     951             :                 }
     952             : 
     953           1 :                 var raw rowblk.Writer
     954           1 :                 // The restart interval is set to infinity because the properties block
     955           1 :                 // is always read sequentially and cached in a heap located object. This
     956           1 :                 // reduces table size without a significant impact on performance.
     957           1 :                 raw.RestartInterval = propertiesBlockRestartInterval
     958           1 :                 w.props.CompressionOptions = rocksDBCompressionOptions
     959           1 :                 w.props.save(w.opts.TableFormat, &raw)
     960           1 :                 if _, err := w.layout.WritePropertiesBlock(raw.Finish()); err != nil {
     961           0 :                         return err
     962           0 :                 }
     963             :         }
     964             : 
     965             :         // Write the table footer.
     966           1 :         w.meta.Size, err = w.layout.Finish()
     967           1 :         if err != nil {
     968           0 :                 return err
     969           0 :         }
     970           1 :         w.meta.Properties = w.props
     971           1 :         // Release any held memory and make any future calls error.
     972           1 :         // TODO(jackson): Ensure other calls error appropriately if the writer is
     973           1 :         // cleared.
     974           1 :         *w = RawColumnWriter{meta: w.meta}
     975           1 :         return nil
     976             : }
     977             : 
     978             : // rewriteSuffixes implements RawWriter.
     979             : func (w *RawColumnWriter) rewriteSuffixes(
     980             :         r *Reader, wo WriterOptions, from, to []byte, concurrency int,
     981           0 : ) error {
     982           0 :         for _, c := range w.blockPropCollectors {
     983           0 :                 if !c.SupportsSuffixReplacement() {
     984           0 :                         return errors.Errorf("block property collector %s does not support suffix replacement", c.Name())
     985           0 :                 }
     986             :         }
     987           0 :         l, err := r.Layout()
     988           0 :         if err != nil {
     989           0 :                 return errors.Wrap(err, "reading layout")
     990           0 :         }
     991             :         // Copy data blocks in parallel, rewriting suffixes as we go.
     992           0 :         blocks, err := rewriteDataBlocksInParallel(r, wo, l.Data, from, to, concurrency, func() blockRewriter {
     993           0 :                 return colblk.NewDataBlockRewriter(wo.KeySchema, w.comparer.Compare, w.comparer.Split)
     994           0 :         })
     995           0 :         if err != nil {
     996           0 :                 return errors.Wrap(err, "rewriting data blocks")
     997           0 :         }
     998             : 
     999             :         // oldShortIDs maps the shortID for the block property collector in the old
    1000             :         // blocks to the shortID in the new blocks. Initialized once for the sstable.
    1001           0 :         oldShortIDs, n, err := getShortIDs(r, w.blockPropCollectors)
    1002           0 :         if err != nil {
    1003           0 :                 return errors.Wrap(err, "getting short IDs")
    1004           0 :         }
    1005           0 :         oldProps := make([][]byte, len(w.blockPropCollectors))
    1006           0 :         for i := range blocks {
    1007           0 :                 cb := compressedBlockPool.Get().(*compressedBlock)
    1008           0 :                 cb.physical = blocks[i].physical
    1009           0 : 
    1010           0 :                 // Load any previous values for our prop collectors into oldProps.
    1011           0 :                 for i := range oldProps {
    1012           0 :                         oldProps[i] = nil
    1013           0 :                 }
    1014           0 :                 decoder := makeBlockPropertiesDecoder(n, l.Data[i].Props)
    1015           0 :                 for !decoder.Done() {
    1016           0 :                         id, val, err := decoder.Next()
    1017           0 :                         if err != nil {
    1018           0 :                                 return err
    1019           0 :                         }
    1020           0 :                         if oldShortIDs[id].IsValid() {
    1021           0 :                                 oldProps[oldShortIDs[id]] = val
    1022           0 :                         }
    1023             :                 }
    1024           0 :                 for i, p := range w.blockPropCollectors {
    1025           0 :                         if err := p.AddCollectedWithSuffixReplacement(oldProps[i], from, to); err != nil {
    1026           0 :                                 return err
    1027           0 :                         }
    1028             :                 }
    1029           0 :                 var separator []byte
    1030           0 :                 if i+1 < len(blocks) {
    1031           0 :                         w.separatorBuf = w.comparer.Separator(w.separatorBuf[:0], blocks[i].end.UserKey, blocks[i+1].start.UserKey)
    1032           0 :                         separator = w.separatorBuf
    1033           0 :                 } else {
    1034           0 :                         w.separatorBuf = w.comparer.Successor(w.separatorBuf[:0], blocks[i].end.UserKey)
    1035           0 :                         separator = w.separatorBuf
    1036           0 :                 }
    1037           0 :                 w.enqueuePhysicalBlock(cb, separator)
    1038             :         }
    1039             : 
    1040           0 :         if len(blocks) > 0 {
    1041           0 :                 w.meta.updateSeqNum(blocks[0].start.SeqNum())
    1042           0 :                 w.props.NumEntries = r.Properties.NumEntries
    1043           0 :                 w.props.RawKeySize = r.Properties.RawKeySize
    1044           0 :                 w.props.RawValueSize = r.Properties.RawValueSize
    1045           0 :                 w.meta.SetSmallestPointKey(blocks[0].start)
    1046           0 :                 w.meta.SetLargestPointKey(blocks[len(blocks)-1].end)
    1047           0 :         }
    1048             : 
    1049             :         // Copy range key block, replacing suffixes if it exists.
    1050           0 :         if err := rewriteRangeKeyBlockToWriter(r, w, from, to); err != nil {
    1051           0 :                 return errors.Wrap(err, "rewriting range key blocks")
    1052           0 :         }
    1053             :         // Copy over the filter block if it exists.
    1054           0 :         if w.filterBlock != nil {
    1055           0 :                 if filterBlockBH, ok := l.FilterByName(w.filterBlock.metaName()); ok {
    1056           0 :                         filterBlock, _, err := readBlockBuf(r, filterBlockBH, nil)
    1057           0 :                         if err != nil {
    1058           0 :                                 return errors.Wrap(err, "reading filter")
    1059           0 :                         }
    1060           0 :                         w.filterBlock = copyFilterWriter{
    1061           0 :                                 origPolicyName: w.filterBlock.policyName(),
    1062           0 :                                 origMetaName:   w.filterBlock.metaName(),
    1063           0 :                                 data:           filterBlock,
    1064           0 :                         }
    1065             :                 }
    1066             :         }
    1067           0 :         return nil
    1068             : }
    1069             : 
    1070             : func shouldFlushWithoutLatestKV(
    1071             :         sizeWithKV int, sizeWithoutKV int, entryCountWithoutKV int, flushGovernor *block.FlushGovernor,
    1072           1 : ) bool {
    1073           1 :         if entryCountWithoutKV == 0 {
    1074           1 :                 return false
    1075           1 :         }
    1076           1 :         if sizeWithoutKV < flushGovernor.LowWatermark() {
    1077           1 :                 // Fast path when the block is too small to flush.
    1078           1 :                 return false
    1079           1 :         }
    1080           1 :         return flushGovernor.ShouldFlush(sizeWithoutKV, sizeWithKV)
    1081             : }
    1082             : 
    1083             : // copyDataBlocks adds a range of blocks to the table as-is. These blocks could be
    1084             : // compressed. It's specifically used by the sstable copier that can copy parts
    1085             : // of an sstable to a new sstable, using CopySpan().
    1086             : func (w *RawColumnWriter) copyDataBlocks(
    1087             :         ctx context.Context, blocks []indexEntry, rh objstorage.ReadHandle,
    1088           1 : ) error {
    1089           1 :         buf := make([]byte, 0, 256<<10)
    1090           1 :         readAndFlushBlocks := func(firstBlockIdx, lastBlockIdx int) error {
    1091           1 :                 if firstBlockIdx > lastBlockIdx {
    1092           0 :                         panic("pebble: readAndFlushBlocks called with invalid block range")
    1093             :                 }
    1094             :                 // We need to flush blocks[firstBlockIdx:lastBlockIdx+1] into the write queue.
    1095             :                 // We do this by issuing one big read from the read handle into the buffer, and
    1096             :                 // then enqueueing the writing of those blocks one-by-one.
    1097             :                 //
    1098             :                 // TODO(bilal): Consider refactoring the write queue to support writing multiple
    1099             :                 // blocks in one request.
    1100           1 :                 lastBH := blocks[lastBlockIdx].bh
    1101           1 :                 blocksToReadLen := lastBH.Offset + lastBH.Length + block.TrailerLen - blocks[firstBlockIdx].bh.Offset
    1102           1 :                 if blocksToReadLen > uint64(cap(buf)) {
    1103           0 :                         buf = make([]byte, 0, blocksToReadLen)
    1104           0 :                 }
    1105           1 :                 if err := rh.ReadAt(ctx, buf[:blocksToReadLen], int64(blocks[firstBlockIdx].bh.Offset)); err != nil {
    1106           0 :                         return err
    1107           0 :                 }
    1108           1 :                 for i := firstBlockIdx; i <= lastBlockIdx; i++ {
    1109           1 :                         offsetDiff := blocks[i].bh.Offset - blocks[firstBlockIdx].bh.Offset
    1110           1 :                         blockBuf := buf[offsetDiff : offsetDiff+blocks[i].bh.Length+block.TrailerLen]
    1111           1 :                         cb := compressedBlockPool.Get().(*compressedBlock)
    1112           1 :                         cb.physical = block.NewPhysicalBlock(blockBuf)
    1113           1 :                         if err := w.enqueuePhysicalBlock(cb, blocks[i].sep); err != nil {
    1114           0 :                                 return err
    1115           0 :                         }
    1116             :                 }
    1117           1 :                 return nil
    1118             :         }
    1119             :         // Iterate through blocks until we have enough to fill cap(buf). When we have more than
    1120             :         // one block in blocksToRead and adding the next block would exceed the buffer capacity,
    1121             :         // we read and flush existing blocks in blocksToRead. This allows us to read as many
    1122             :         // blocks in one IO request as possible, while still utilizing the write queue in this
    1123             :         // writer.
    1124           1 :         lastBlockOffset := uint64(0)
    1125           1 :         for i := 0; i < len(blocks); {
    1126           1 :                 if blocks[i].bh.Offset < lastBlockOffset {
    1127           0 :                         panic("pebble: copyDataBlocks called with blocks out of order")
    1128             :                 }
    1129           1 :                 start := i
    1130           1 :                 // Note the i++ in the initializing condition; this means we will always flush at least
    1131           1 :                 // one block.
    1132           1 :                 for i++; i < len(blocks) && (blocks[i].bh.Length+blocks[i].bh.Offset+block.TrailerLen-blocks[start].bh.Offset) <= uint64(cap(buf)); i++ {
    1133           1 :                 }
    1134             :                 // i points to one index past the last block we want to read.
    1135           1 :                 if err := readAndFlushBlocks(start, i-1); err != nil {
    1136           0 :                         return err
    1137           0 :                 }
    1138             :         }
    1139           1 :         return nil
    1140             : }
    1141             : 
    1142             : // addDataBlock adds a raw uncompressed data block to the table as-is. It's specifically used
    1143             : // by the sstable copier that can copy parts of an sstable to a new sstable,
    1144             : // using CopySpan().
    1145           1 : func (w *RawColumnWriter) addDataBlock(b, sep []byte, bhp block.HandleWithProperties) error {
    1146           1 :         // Serialize the data block, compress it and send it to the write queue.
    1147           1 :         cb := compressedBlockPool.Get().(*compressedBlock)
    1148           1 :         cb.blockBuf.checksummer.Type = w.opts.Checksum
    1149           1 :         cb.physical = block.CompressAndChecksum(
    1150           1 :                 &cb.blockBuf.compressedBuf,
    1151           1 :                 b,
    1152           1 :                 w.opts.Compression,
    1153           1 :                 &cb.blockBuf.checksummer,
    1154           1 :         )
    1155           1 :         if !cb.physical.IsCompressed() {
    1156           1 :                 // If the block isn't compressed, cb.physical's underlying data points
    1157           1 :                 // directly into a buffer owned by w.dataBlock. Clone it before passing
    1158           1 :                 // it to the write queue to be asynchronously written to disk.
    1159           1 :                 // TODO(jackson): Should we try to avoid this clone by tracking the
    1160           1 :                 // lifetime of the DataBlockWriters?
    1161           1 :                 cb.physical = cb.physical.Clone()
    1162           1 :         }
    1163           1 :         if err := w.enqueuePhysicalBlock(cb, sep); err != nil {
    1164           0 :                 return err
    1165           0 :         }
    1166           1 :         return nil
    1167             : }
    1168             : 
    1169             : // copyFilter copies the specified filter to the table. It's specifically used
    1170             : // by the sstable copier that can copy parts of an sstable to a new sstable,
    1171             : // using CopySpan().
    1172           1 : func (w *RawColumnWriter) copyFilter(filter []byte, filterName string) error {
    1173           1 :         if w.filterBlock != nil && filterName != w.filterBlock.policyName() {
    1174           0 :                 return errors.New("mismatched filters")
    1175           0 :         }
    1176           1 :         w.filterBlock = copyFilterWriter{
    1177           1 :                 origPolicyName: w.filterBlock.policyName(), origMetaName: w.filterBlock.metaName(), data: filter,
    1178           1 :         }
    1179           1 :         return nil
    1180             : }
    1181             : 
    1182             : // copyProperties copies properties from the specified props, and resets others
    1183             : // to prepare for copying data blocks from another sstable, using the copy/addDataBlock(s)
    1184             : // methods above. It's specifically used by the sstable copier that can copy parts of an
    1185             : // sstable to a new sstable, using CopySpan().
    1186           1 : func (w *RawColumnWriter) copyProperties(props Properties) {
    1187           1 :         w.props = props
    1188           1 :         // Remove all user properties to disable block properties, which we do not
    1189           1 :         // calculate for CopySpan.
    1190           1 :         w.props.UserProperties = nil
    1191           1 :         // Reset props that we'll re-derive as we build our own index.
    1192           1 :         w.props.IndexPartitions = 0
    1193           1 :         w.props.TopLevelIndexSize = 0
    1194           1 :         w.props.IndexSize = 0
    1195           1 :         w.props.IndexType = 0
    1196           1 : }

Generated by: LCOV version 1.14