LCOV - code coverage report
Current view: top level - pebble/sstable - writer.go (source / functions) Hit Total Coverage
Test: 2024-07-16 08:16Z b6c49f44 - tests only.lcov Lines: 1161 1302 89.2 %
Date: 2024-07-16 08:17:23 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2011 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package sstable
       6             : 
       7             : import (
       8             :         "bytes"
       9             :         "encoding/binary"
      10             :         "fmt"
      11             :         "math"
      12             :         "runtime"
      13             :         "slices"
      14             :         "sort"
      15             :         "sync"
      16             : 
      17             :         "github.com/cockroachdb/errors"
      18             :         "github.com/cockroachdb/pebble/internal/base"
      19             :         "github.com/cockroachdb/pebble/internal/bytealloc"
      20             :         "github.com/cockroachdb/pebble/internal/cache"
      21             :         "github.com/cockroachdb/pebble/internal/invariants"
      22             :         "github.com/cockroachdb/pebble/internal/keyspan"
      23             :         "github.com/cockroachdb/pebble/internal/rangedel"
      24             :         "github.com/cockroachdb/pebble/internal/rangekey"
      25             :         "github.com/cockroachdb/pebble/objstorage"
      26             :         "github.com/cockroachdb/pebble/sstable/block"
      27             :         "github.com/cockroachdb/pebble/sstable/rowblk"
      28             : )
      29             : 
      30             : // encodedBHPEstimatedSize estimates the size of the encoded BlockHandleWithProperties.
      31             : // It would also be nice to account for the length of the data block properties here,
      32             : // but isn't necessary since this is an estimate.
      33             : const encodedBHPEstimatedSize = binary.MaxVarintLen64 * 2
      34             : 
      35             : var errWriterClosed = errors.New("pebble: writer is closed")
      36             : 
      37             : // WriterMetadata holds info about a finished sstable.
      38             : type WriterMetadata struct {
      39             :         Size          uint64
      40             :         SmallestPoint InternalKey
      41             :         // LargestPoint, LargestRangeKey, LargestRangeDel should not be accessed
      42             :         // before Writer.Close is called, because they may only be set on
      43             :         // Writer.Close.
      44             :         LargestPoint     InternalKey
      45             :         SmallestRangeDel InternalKey
      46             :         LargestRangeDel  InternalKey
      47             :         SmallestRangeKey InternalKey
      48             :         LargestRangeKey  InternalKey
      49             :         HasPointKeys     bool
      50             :         HasRangeDelKeys  bool
      51             :         HasRangeKeys     bool
      52             :         SmallestSeqNum   base.SeqNum
      53             :         LargestSeqNum    base.SeqNum
      54             :         Properties       Properties
      55             : }
      56             : 
      57             : // SetSmallestPointKey sets the smallest point key to the given key.
      58             : // NB: this method sets the "absolute" smallest point key. Any existing key is
      59             : // overridden.
      60           1 : func (m *WriterMetadata) SetSmallestPointKey(k InternalKey) {
      61           1 :         m.SmallestPoint = k
      62           1 :         m.HasPointKeys = true
      63           1 : }
      64             : 
      65             : // SetSmallestRangeDelKey sets the smallest rangedel key to the given key.
      66             : // NB: this method sets the "absolute" smallest rangedel key. Any existing key is
      67             : // overridden.
      68           1 : func (m *WriterMetadata) SetSmallestRangeDelKey(k InternalKey) {
      69           1 :         m.SmallestRangeDel = k
      70           1 :         m.HasRangeDelKeys = true
      71           1 : }
      72             : 
      73             : // SetSmallestRangeKey sets the smallest range key to the given key.
      74             : // NB: this method sets the "absolute" smallest range key. Any existing key is
      75             : // overridden.
      76           1 : func (m *WriterMetadata) SetSmallestRangeKey(k InternalKey) {
      77           1 :         m.SmallestRangeKey = k
      78           1 :         m.HasRangeKeys = true
      79           1 : }
      80             : 
      81             : // SetLargestPointKey sets the largest point key to the given key.
      82             : // NB: this method sets the "absolute" largest point key. Any existing key is
      83             : // overridden.
      84           1 : func (m *WriterMetadata) SetLargestPointKey(k InternalKey) {
      85           1 :         m.LargestPoint = k
      86           1 :         m.HasPointKeys = true
      87           1 : }
      88             : 
      89             : // SetLargestRangeDelKey sets the largest rangedel key to the given key.
      90             : // NB: this method sets the "absolute" largest rangedel key. Any existing key is
      91             : // overridden.
      92           1 : func (m *WriterMetadata) SetLargestRangeDelKey(k InternalKey) {
      93           1 :         m.LargestRangeDel = k
      94           1 :         m.HasRangeDelKeys = true
      95           1 : }
      96             : 
      97             : // SetLargestRangeKey sets the largest range key to the given key.
      98             : // NB: this method sets the "absolute" largest range key. Any existing key is
      99             : // overridden.
     100           1 : func (m *WriterMetadata) SetLargestRangeKey(k InternalKey) {
     101           1 :         m.LargestRangeKey = k
     102           1 :         m.HasRangeKeys = true
     103           1 : }
     104             : 
     105           1 : func (m *WriterMetadata) updateSeqNum(seqNum base.SeqNum) {
     106           1 :         if m.SmallestSeqNum > seqNum {
     107           1 :                 m.SmallestSeqNum = seqNum
     108           1 :         }
     109           1 :         if m.LargestSeqNum < seqNum {
     110           1 :                 m.LargestSeqNum = seqNum
     111           1 :         }
     112             : }
     113             : 
     114             : // flushDecisionOptions holds parameters to inform the sstable block flushing
     115             : // heuristics.
     116             : type flushDecisionOptions struct {
     117             :         blockSize          int
     118             :         blockSizeThreshold int
     119             :         // sizeClassAwareThreshold takes precedence over blockSizeThreshold when the
     120             :         // Writer is aware of the allocator's size classes.
     121             :         sizeClassAwareThreshold int
     122             : }
     123             : 
     124             : // Writer is a table writer.
     125             : type Writer struct {
     126             :         layout layoutWriter
     127             :         meta   WriterMetadata
     128             :         err    error
     129             :         // dataBlockOptions and indexBlockOptions are used to configure the sstable
     130             :         // block flush heuristics.
     131             :         dataBlockOptions  flushDecisionOptions
     132             :         indexBlockOptions flushDecisionOptions
     133             :         // The following fields are copied from Options.
     134             :         compare              Compare
     135             :         split                Split
     136             :         formatKey            base.FormatKey
     137             :         compression          Compression
     138             :         separator            Separator
     139             :         successor            Successor
     140             :         tableFormat          TableFormat
     141             :         isStrictObsolete     bool
     142             :         writingToLowestLevel bool
     143             :         restartInterval      int
     144             :         checksumType         block.ChecksumType
     145             :         // disableKeyOrderChecks disables the checks that keys are added to an
     146             :         // sstable in order. It is intended for internal use only in the construction
     147             :         // of invalid sstables for testing. See tool/make_test_sstables.go.
     148             :         disableKeyOrderChecks bool
     149             :         // With two level indexes, the index/filter of a SST file is partitioned into
     150             :         // smaller blocks with an additional top-level index on them. When reading an
     151             :         // index/filter, only the top-level index is loaded into memory. The two level
     152             :         // index/filter then uses the top-level index to load on demand into the block
     153             :         // cache the partitions that are required to perform the index/filter query.
     154             :         //
     155             :         // Two level indexes are enabled automatically when there is more than one
     156             :         // index block.
     157             :         //
     158             :         // This is useful when there are very large index blocks, which generally occurs
     159             :         // with the usage of large keys. With large index blocks, the index blocks fight
     160             :         // the data blocks for block cache space and the index blocks are likely to be
     161             :         // re-read many times from the disk. The top level index, which has a much
     162             :         // smaller memory footprint, can be used to prevent the entire index block from
     163             :         // being loaded into the block cache.
     164             :         twoLevelIndex       bool
     165             :         indexBlock          *indexBlockBuf
     166             :         rangeDelBlock       rowblk.Writer
     167             :         rangeKeyBlock       rowblk.Writer
     168             :         topLevelIndexBlock  rowblk.Writer
     169             :         props               Properties
     170             :         blockPropCollectors []BlockPropertyCollector
     171             :         obsoleteCollector   obsoleteKeyBlockPropertyCollector
     172             :         blockPropsEncoder   blockPropertiesEncoder
     173             :         // filter accumulates the filter block. If populated, the filter ingests
     174             :         // either the output of w.split (i.e. a prefix extractor) if w.split is not
     175             :         // nil, or the full keys otherwise.
     176             :         filter          filterWriter
     177             :         indexPartitions []indexBlockAndBlockProperties
     178             : 
     179             :         // indexBlockAlloc is used to bulk-allocate byte slices used to store index
     180             :         // blocks in indexPartitions. These live until the index finishes.
     181             :         indexBlockAlloc []byte
     182             :         // indexSepAlloc is used to bulk-allocate index block separator slices stored
     183             :         // in indexPartitions. These live until the index finishes.
     184             :         indexSepAlloc bytealloc.A
     185             : 
     186             :         // To allow potentially overlapping (i.e. un-fragmented) range key spans to
     187             :         // be added to the Writer, a keyspan.Fragmenter is used to retain the keys
     188             :         // and values, emitting fragmented, coalesced spans as appropriate. Range
     189             :         // keys must be added in order of their start user-key.
     190             :         fragmenter        keyspan.Fragmenter
     191             :         rangeKeyEncoder   rangekey.Encoder
     192             :         rangeKeysBySuffix keyspan.KeysBySuffix
     193             :         rangeKeySpan      keyspan.Span
     194             :         rkBuf             []byte
     195             :         // dataBlockBuf consists of the state which is currently owned by and used by
     196             :         // the Writer client goroutine. This state can be handed off to other goroutines.
     197             :         dataBlockBuf *dataBlockBuf
     198             :         // blockBuf consists of the state which is owned by and used by the Writer client
     199             :         // goroutine.
     200             :         blockBuf blockBuf
     201             : 
     202             :         coordination coordinationState
     203             : 
     204             :         // Information (other than the byte slice) about the last point key, to
     205             :         // avoid extracting it again.
     206             :         lastPointKeyInfo pointKeyInfo
     207             : 
     208             :         // For value blocks.
     209             :         shortAttributeExtractor   base.ShortAttributeExtractor
     210             :         requiredInPlaceValueBound UserKeyPrefixBound
     211             :         // When w.tableFormat >= TableFormatPebblev3, valueBlockWriter is nil iff
     212             :         // WriterOptions.DisableValueBlocks was true.
     213             :         valueBlockWriter *valueBlockWriter
     214             : 
     215             :         allocatorSizeClasses []int
     216             : }
     217             : 
     218             : type pointKeyInfo struct {
     219             :         trailer base.InternalKeyTrailer
     220             :         // Only computed when w.valueBlockWriter is not nil.
     221             :         userKeyLen int
     222             :         // prefixLen uses w.split, if not nil. Only computed when w.valueBlockWriter
     223             :         // is not nil.
     224             :         prefixLen int
     225             :         // True iff the point was marked obsolete.
     226             :         isObsolete bool
     227             : }
     228             : 
     229             : type coordinationState struct {
     230             :         parallelismEnabled bool
     231             : 
     232             :         // writeQueue is used to write data blocks to disk. The writeQueue is primarily
     233             :         // used to maintain the order in which data blocks must be written to disk. For
     234             :         // this reason, every single data block write must be done through the writeQueue.
     235             :         writeQueue *writeQueue
     236             : 
     237             :         sizeEstimate dataBlockEstimates
     238             : }
     239             : 
     240           1 : func (c *coordinationState) init(parallelismEnabled bool, writer *Writer) {
     241           1 :         c.parallelismEnabled = parallelismEnabled
     242           1 :         // useMutex is false regardless of parallelismEnabled, because we do not do
     243           1 :         // parallel compression yet.
     244           1 :         c.sizeEstimate.useMutex = false
     245           1 : 
     246           1 :         // writeQueueSize determines the size of the write queue, or the number
     247           1 :         // of items which can be added to the queue without blocking. By default, we
     248           1 :         // use a writeQueue size of 0, since we won't be doing any block writes in
     249           1 :         // parallel.
     250           1 :         writeQueueSize := 0
     251           1 :         if parallelismEnabled {
     252           1 :                 writeQueueSize = runtime.GOMAXPROCS(0)
     253           1 :         }
     254           1 :         c.writeQueue = newWriteQueue(writeQueueSize, writer)
     255             : }
     256             : 
     257             : // sizeEstimate is a general purpose helper for estimating two kinds of sizes:
     258             : //
     259             : //  A. The compressed sstable size, which is useful for deciding when to start
     260             : //     a new sstable during flushes or compactions. In practice, we use this in
     261             : //     estimating the data size (excluding the index).
     262             : //
     263             : //  B. The size of index blocks to decide when to start a new index block.
     264             : //
     265             : // There are some terminology peculiarities which are due to the origin of
     266             : // sizeEstimate for use case A with parallel compression enabled (for which
     267             : // the code has not been merged). Specifically this relates to the terms
     268             : // "written" and "compressed".
     269             : //   - The notion of "written" for case A is sufficiently defined by saying that
     270             : //     the data block is compressed. Waiting for the actual data block write to
     271             : //     happen can result in unnecessary estimation, when we already know how big
     272             : //     it will be in compressed form. Additionally, with the forthcoming value
     273             : //     blocks containing older MVCC values, these compressed block will be held
     274             : //     in-memory until late in the sstable writing, and we do want to accurately
     275             : //     account for them without waiting for the actual write.
     276             : //     For case B, "written" means that the index entry has been fully
     277             : //     generated, and has been added to the uncompressed block buffer for that
     278             : //     index block. It does not include actually writing a potentially
     279             : //     compressed index block.
     280             : //   - The notion of "compressed" is to differentiate between an "inflight" size
     281             : //     and the actual size, and is handled via computing a compression ratio
     282             : //     observed so far (defaults to 1).
     283             : //     For case A, this is actual data block compression, so the "inflight" size
     284             : //     is uncompressed blocks (that are no longer being written to) and the
     285             : //     "compressed" size is after they have been compressed.
     286             : //     For case B the inflight size is for a key-value pair in the index for
     287             : //     which the value size (the encoded size of the BlockHandleWithProperties)
     288             : //     is not accurately known, while the compressed size is the size of that
     289             : //     entry when it has been added to the (in-progress) index block.
     290             : //
     291             : // Usage: To update state, one can optionally provide an inflight write value
     292             : // using addInflight (used for case B). When something is "written" the state
     293             : // can be updated using either writtenWithDelta or writtenWithTotal, which
     294             : // provide the actual delta size or the total size (latter must be
     295             : // monotonically non-decreasing). If there were no calls to addInflight, there
     296             : // isn't any real estimation happening here. So case A does not do any real
     297             : // estimation. However, when we introduce parallel compression, there will be
     298             : // estimation in that the client goroutine will call addInflight and the
     299             : // compression goroutines will call writtenWithDelta.
     300             : type sizeEstimate struct {
     301             :         // emptySize is the size when there is no inflight data, and numEntries is 0.
     302             :         // emptySize is constant once set.
     303             :         emptySize uint64
     304             : 
     305             :         // inflightSize is the estimated size of some inflight data which hasn't
     306             :         // been written yet.
     307             :         inflightSize uint64
     308             : 
     309             :         // totalSize is the total size of the data which has already been written.
     310             :         totalSize uint64
     311             : 
     312             :         // numWrittenEntries is the total number of entries which have already been
     313             :         // written.
     314             :         numWrittenEntries uint64
     315             :         // numInflightEntries is the total number of entries which are inflight, and
     316             :         // haven't been written.
     317             :         numInflightEntries uint64
     318             : 
     319             :         // maxEstimatedSize stores the maximum result returned from sizeEstimate.size.
     320             :         // It ensures that values returned from subsequent calls to Writer.EstimatedSize
     321             :         // never decrease.
     322             :         maxEstimatedSize uint64
     323             : 
     324             :         // We assume that the entries added to the sizeEstimate can be compressed.
     325             :         // For this reason, we keep track of a compressedSize and an uncompressedSize
     326             :         // to compute a compression ratio for the inflight entries. If the entries
     327             :         // aren't being compressed, then compressedSize and uncompressedSize must be
     328             :         // equal.
     329             :         compressedSize   uint64
     330             :         uncompressedSize uint64
     331             : }
     332             : 
     333           1 : func (s *sizeEstimate) init(emptySize uint64) {
     334           1 :         s.emptySize = emptySize
     335           1 : }
     336             : 
     337           1 : func (s *sizeEstimate) size() uint64 {
     338           1 :         ratio := float64(1)
     339           1 :         if s.uncompressedSize > 0 {
     340           1 :                 ratio = float64(s.compressedSize) / float64(s.uncompressedSize)
     341           1 :         }
     342           1 :         estimatedInflightSize := uint64(float64(s.inflightSize) * ratio)
     343           1 :         total := s.totalSize + estimatedInflightSize
     344           1 :         if total > s.maxEstimatedSize {
     345           1 :                 s.maxEstimatedSize = total
     346           1 :         } else {
     347           1 :                 total = s.maxEstimatedSize
     348           1 :         }
     349             : 
     350           1 :         if total == 0 {
     351           1 :                 return s.emptySize
     352           1 :         }
     353             : 
     354           1 :         return total
     355             : }
     356             : 
     357           1 : func (s *sizeEstimate) numTotalEntries() uint64 {
     358           1 :         return s.numWrittenEntries + s.numInflightEntries
     359           1 : }
     360             : 
     361           1 : func (s *sizeEstimate) addInflight(size int) {
     362           1 :         s.numInflightEntries++
     363           1 :         s.inflightSize += uint64(size)
     364           1 : }
     365             : 
     366           1 : func (s *sizeEstimate) writtenWithTotal(newTotalSize uint64, inflightSize int) {
     367           1 :         finalEntrySize := int(newTotalSize - s.totalSize)
     368           1 :         s.writtenWithDelta(finalEntrySize, inflightSize)
     369           1 : }
     370             : 
     371           1 : func (s *sizeEstimate) writtenWithDelta(finalEntrySize int, inflightSize int) {
     372           1 :         if inflightSize > 0 {
     373           1 :                 // This entry was previously inflight, so we should decrement inflight
     374           1 :                 // entries and update the "compression" stats for future estimation.
     375           1 :                 s.numInflightEntries--
     376           1 :                 s.inflightSize -= uint64(inflightSize)
     377           1 :                 s.uncompressedSize += uint64(inflightSize)
     378           1 :                 s.compressedSize += uint64(finalEntrySize)
     379           1 :         }
     380           1 :         s.numWrittenEntries++
     381           1 :         s.totalSize += uint64(finalEntrySize)
     382             : }
     383             : 
     384           1 : func (s *sizeEstimate) clear() {
     385           1 :         *s = sizeEstimate{emptySize: s.emptySize}
     386           1 : }
     387             : 
     388             : type indexBlockBuf struct {
     389             :         // block will only be accessed from the writeQueue.
     390             :         block rowblk.Writer
     391             : 
     392             :         size struct {
     393             :                 useMutex bool
     394             :                 mu       sync.Mutex
     395             :                 estimate sizeEstimate
     396             :         }
     397             : 
     398             :         // restartInterval matches indexBlockBuf.block.restartInterval. We store it twice, because the `block`
     399             :         // must only be accessed from the writeQueue goroutine.
     400             :         restartInterval int
     401             : }
     402             : 
     403           1 : func (i *indexBlockBuf) clear() {
     404           1 :         i.block.Reset()
     405           1 :         if i.size.useMutex {
     406           1 :                 i.size.mu.Lock()
     407           1 :                 defer i.size.mu.Unlock()
     408           1 :         }
     409           1 :         i.size.estimate.clear()
     410           1 :         i.restartInterval = 0
     411             : }
     412             : 
     413             : var indexBlockBufPool = sync.Pool{
     414           1 :         New: func() interface{} {
     415           1 :                 return &indexBlockBuf{}
     416           1 :         },
     417             : }
     418             : 
     419             : const indexBlockRestartInterval = 1
     420             : 
     421           1 : func newIndexBlockBuf(useMutex bool) *indexBlockBuf {
     422           1 :         i := indexBlockBufPool.Get().(*indexBlockBuf)
     423           1 :         i.size.useMutex = useMutex
     424           1 :         i.restartInterval = indexBlockRestartInterval
     425           1 :         i.block.RestartInterval = indexBlockRestartInterval
     426           1 :         i.size.estimate.init(rowblk.EmptySize)
     427           1 :         return i
     428           1 : }
     429             : 
     430             : func (i *indexBlockBuf) shouldFlush(
     431             :         sep InternalKey, valueLen int, flushOptions flushDecisionOptions, sizeClassHints []int,
     432           1 : ) bool {
     433           1 :         if i.size.useMutex {
     434           1 :                 i.size.mu.Lock()
     435           1 :                 defer i.size.mu.Unlock()
     436           1 :         }
     437             : 
     438           1 :         nEntries := i.size.estimate.numTotalEntries()
     439           1 :         return shouldFlushWithHints(
     440           1 :                 sep.Size(), valueLen, i.restartInterval, int(i.size.estimate.size()),
     441           1 :                 int(nEntries), flushOptions, sizeClassHints)
     442             : }
     443             : 
     444           1 : func (i *indexBlockBuf) add(key InternalKey, value []byte, inflightSize int) {
     445           1 :         i.block.Add(key, value)
     446           1 :         size := i.block.EstimatedSize()
     447           1 :         if i.size.useMutex {
     448           1 :                 i.size.mu.Lock()
     449           1 :                 defer i.size.mu.Unlock()
     450           1 :         }
     451           1 :         i.size.estimate.writtenWithTotal(uint64(size), inflightSize)
     452             : }
     453             : 
     454           1 : func (i *indexBlockBuf) finish() []byte {
     455           1 :         b := i.block.Finish()
     456           1 :         return b
     457           1 : }
     458             : 
     459           1 : func (i *indexBlockBuf) addInflight(inflightSize int) {
     460           1 :         if i.size.useMutex {
     461           1 :                 i.size.mu.Lock()
     462           1 :                 defer i.size.mu.Unlock()
     463           1 :         }
     464           1 :         i.size.estimate.addInflight(inflightSize)
     465             : }
     466             : 
     467           1 : func (i *indexBlockBuf) estimatedSize() uint64 {
     468           1 :         if i.size.useMutex {
     469           1 :                 i.size.mu.Lock()
     470           1 :                 defer i.size.mu.Unlock()
     471           1 :         }
     472             : 
     473             :         // Make sure that the size estimation works as expected when parallelism
     474             :         // is disabled.
     475           1 :         if invariants.Enabled && !i.size.useMutex {
     476           1 :                 if i.size.estimate.inflightSize != 0 {
     477           0 :                         panic("unexpected inflight entry in index block size estimation")
     478             :                 }
     479             : 
     480             :                 // NB: The i.block should only be accessed from the writeQueue goroutine,
     481             :                 // when parallelism is enabled. We break that invariant here, but that's
     482             :                 // okay since parallelism is disabled.
     483           1 :                 if i.size.estimate.size() != uint64(i.block.EstimatedSize()) {
     484           0 :                         panic("index block size estimation sans parallelism is incorrect")
     485             :                 }
     486             :         }
     487           1 :         return i.size.estimate.size()
     488             : }
     489             : 
     490             : // dataBlockEstimates is used for sstable size estimation. dataBlockEstimates
     491             : // can be accessed by the Writer client and compressionQueue goroutines. Fields
     492             : // should only be read/updated through the functions defined on the
     493             : // *dataBlockEstimates type.
     494             : type dataBlockEstimates struct {
     495             :         // If we don't do block compression in parallel, then we don't need to take
     496             :         // the performance hit of synchronizing using this mutex.
     497             :         useMutex bool
     498             :         mu       sync.Mutex
     499             : 
     500             :         estimate sizeEstimate
     501             : }
     502             : 
     503             : // dataBlockCompressed folds a compressed data block into the estimate.
     504             : // compressedSize is the compressed size of the block; block.TrailerLen is
     505             : // added to it here. inflightSize is the uncompressed block size estimate
     506             : // previously provided to addInflightDataBlock(), or 0 if it was not called.
     507           1 : func (d *dataBlockEstimates) dataBlockCompressed(compressedSize int, inflightSize int) {
     508           1 :         if d.useMutex {
     509           0 :                 d.mu.Lock()
     510           0 :                 defer d.mu.Unlock()
     511           0 :         }
     512           1 :         d.estimate.writtenWithDelta(compressedSize+block.TrailerLen, inflightSize)
     513             : }
     514             : 
     515             : // size returns the estimated size of the data block data which has been written to disk.
     516           1 : func (d *dataBlockEstimates) size() uint64 {
     517           1 :         if d.useMutex {
     518           0 :                 d.mu.Lock()
     519           0 :                 defer d.mu.Unlock()
     520           0 :         }
     521             :         // If there is no parallel compression, there should not be any inflight bytes.
     522           1 :         if invariants.Enabled && !d.useMutex {
     523           1 :                 if d.estimate.inflightSize != 0 {
     524           0 :                         panic("unexpected inflight entry in data block size estimation")
     525             :                 }
     526             :         }
     527           1 :         return d.estimate.size()
     528             : }
     529             : 
     530             : // Avoid linter unused error.
     531             : var _ = (&dataBlockEstimates{}).addInflightDataBlock
     532             : 
     533             : // NB: unused since no parallel compression.
     534           0 : func (d *dataBlockEstimates) addInflightDataBlock(size int) {
     535           0 :         if d.useMutex {
     536           0 :                 d.mu.Lock()
     537           0 :                 defer d.mu.Unlock()
     538           0 :         }
     539             : 
     540           0 :         d.estimate.addInflight(size)
     541             : }
     542             : 
     543             : var writeTaskPool = sync.Pool{
     544           1 :         New: func() interface{} {
     545           1 :                 t := &writeTask{}
     546           1 :                 t.compressionDone = make(chan bool, 1)
     547           1 :                 return t
     548           1 :         },
     549             : }
     550             : 
     551             : type blockBuf struct {
     552             :         // tmp is a scratch buffer, large enough to hold any of footerLen bytes,
     553             :         // blockTrailerLen bytes, or (5 * binary.MaxVarintLen64) bytes, and most
     554             :         // likely large enough for a block handle with properties.
     555             :         tmp [blockHandleLikelyMaxLen]byte
     556             :         // compressedBuf is the destination buffer for compression. It is re-used over the
     557             :         // lifetime of the blockBuf, avoiding the allocation of a temporary buffer for each block.
     558             :         compressedBuf []byte
     559             :         checksummer   block.Checksummer
     560             : }
     561             : 
     562           1 : func (b *blockBuf) clear() {
     563           1 :         // We can't assign b.compressedBuf[:0] to compressedBuf because snappy relies
     564           1 :         // on the length of the buffer, and not the capacity to determine if it needs
     565           1 :         // to make an allocation.
     566           1 :         *b = blockBuf{
     567           1 :                 compressedBuf: b.compressedBuf, checksummer: b.checksummer,
     568           1 :         }
     569           1 : }
     570             : 
     571             : // A dataBlockBuf holds all the state required to compress and write a data block to disk.
     572             : // A dataBlockBuf begins its lifecycle owned by the Writer client goroutine. The Writer
     573             : // client goroutine adds keys to the sstable, writing directly into a dataBlockBuf's blockWriter
     574             : // until the block is full. Once a dataBlockBuf's block is full, the dataBlockBuf may be passed
     575             : // to other goroutines for compression and file I/O.
     576             : type dataBlockBuf struct {
     577             :         blockBuf
     578             :         dataBlock rowblk.Writer
     579             : 
     580             :         // uncompressed is a reference to a byte slice which is owned by the dataBlockBuf. It is the
     581             :         // next byte slice to be compressed. The uncompressed byte slice will be backed by the
     582             :         // dataBlock.buf.
     583             :         uncompressed []byte
     584             :         // compressed is a reference to a byte slice which is owned by the dataBlockBuf. It is the
     585             :         // compressed byte slice which must be written to disk. The compressed byte slice may be
     586             :         // backed by the dataBlock.buf, or the dataBlockBuf.compressedBuf, depending on whether
     587             :         // we use the result of the compression.
     588             :         compressed []byte
     589             :         // trailer is the block trailer encoding the compression type and checksum.
     590             :         trailer block.Trailer
     591             : 
     592             :         // We're making calls to BlockPropertyCollectors from the Writer client goroutine. We need to
     593             :         // pass the encoded block properties over to the write queue. To prevent copies and allocations,
     594             :         // we give each dataBlockBuf its own blockPropertiesEncoder.
     595             :         blockPropsEncoder blockPropertiesEncoder
     596             :         // dataBlockProps is set when Writer.finishDataBlockProps is called. The dataBlockProps slice is
     597             :         // a shallow copy of the internal buffer of the dataBlockBuf.blockPropsEncoder.
     598             :         dataBlockProps []byte
     599             : 
     600             :         // sepScratch is reusable scratch space for computing separator keys.
     601             :         sepScratch []byte
     602             : }
     603             : 
     604           1 : func (d *dataBlockBuf) clear() {
     605           1 :         d.blockBuf.clear()
     606           1 :         d.dataBlock.Reset()
     607           1 : 
     608           1 :         d.uncompressed = nil
     609           1 :         d.compressed = nil
     610           1 :         d.dataBlockProps = nil
     611           1 :         d.sepScratch = d.sepScratch[:0]
     612           1 : }
     613             : 
     614             : var dataBlockBufPool = sync.Pool{
     615           1 :         New: func() interface{} {
     616           1 :                 return &dataBlockBuf{}
     617           1 :         },
     618             : }
     619             : 
     620           1 : func newDataBlockBuf(restartInterval int, checksumType block.ChecksumType) *dataBlockBuf {
     621           1 :         d := dataBlockBufPool.Get().(*dataBlockBuf)
     622           1 :         d.dataBlock.RestartInterval = restartInterval
     623           1 :         d.checksummer.Type = checksumType
     624           1 :         return d
     625           1 : }
     626             : 
     627           1 : func (d *dataBlockBuf) finish() {
     628           1 :         d.uncompressed = d.dataBlock.Finish()
     629           1 : }
     630             : 
     631           1 : func (d *dataBlockBuf) compressAndChecksum(c Compression) {
     632           1 :         d.compressed, d.trailer = compressAndChecksum(d.uncompressed, c, &d.blockBuf)
     633           1 : }
     634             : 
     635             : func (d *dataBlockBuf) shouldFlush(
     636             :         key InternalKey, valueLen int, flushOptions flushDecisionOptions, sizeClassHints []int,
     637           1 : ) bool {
     638           1 :         return shouldFlushWithHints(
     639           1 :                 key.Size(), valueLen, d.dataBlock.RestartInterval, d.dataBlock.EstimatedSize(),
     640           1 :                 d.dataBlock.EntryCount(), flushOptions, sizeClassHints)
     641           1 : }
     642             : 
     643             : type indexBlockAndBlockProperties struct {
     644             :         nEntries int
     645             :         // sep is the last key added to this block, for computing a separator later.
     646             :         sep        InternalKey
     647             :         properties []byte
     648             :         // block is the encoded block produced by blockWriter.finish.
     649             :         block []byte
     650             : }
     651             : 
     652             : // Set sets the value for the given key. The sequence number is set to 0.
     653             : // Intended for use to externally construct an sstable before ingestion into a
     654             : // DB. For a given Writer, the keys passed to Set must be in strictly increasing
     655             : // order.
     656             : //
     657             : // TODO(peter): untested
     658           1 : func (w *Writer) Set(key, value []byte) error {
     659           1 :         if w.err != nil {
     660           0 :                 return w.err
     661           0 :         }
     662           1 :         if w.isStrictObsolete {
     663           0 :                 return errors.Errorf("use AddWithForceObsolete")
     664           0 :         }
     665             :         // forceObsolete is false based on the assumption that no RANGEDELs in the
     666             :         // sstable delete the added points.
     667           1 :         return w.addPoint(base.MakeInternalKey(key, 0, InternalKeyKindSet), value, false)
     668             : }
     669             : 
     670             : // Delete deletes the value for the given key. The sequence number is set to
     671             : // 0. Intended for use to externally construct an sstable before ingestion into
     672             : // a DB.
     673             : //
     674             : // TODO(peter): untested
     675           1 : func (w *Writer) Delete(key []byte) error {
     676           1 :         if w.err != nil {
     677           0 :                 return w.err
     678           0 :         }
     679           1 :         if w.isStrictObsolete {
     680           0 :                 return errors.Errorf("use AddWithForceObsolete")
     681           0 :         }
     682             :         // forceObsolete is false based on the assumption that no RANGEDELs in the
     683             :         // sstable delete the added points.
     684           1 :         return w.addPoint(base.MakeInternalKey(key, 0, InternalKeyKindDelete), nil, false)
     685             : }
     686             : 
     687             : // DeleteRange deletes all of the keys (and values) in the range [start,end)
     688             : // (inclusive on start, exclusive on end). The sequence number is set to
     689             : // 0. Intended for use to externally construct an sstable before ingestion into
     690             : // a DB.
     691             : //
     692             : // TODO(peter): untested
     693           1 : func (w *Writer) DeleteRange(start, end []byte) error {
     694           1 :         if w.err != nil {
     695           0 :                 return w.err
     696           0 :         }
     697           1 :         return w.addTombstone(base.MakeInternalKey(start, 0, InternalKeyKindRangeDelete), end)
     698             : }
     699             : 
     700             : // Merge adds an action to the DB that merges the value at key with the new
     701             : // value. The details of the merge are dependent upon the configured merge
     702             : // operator. The sequence number is set to 0. Intended for use to externally
     703             : // construct an sstable before ingestion into a DB.
     704             : //
     705             : // TODO(peter): untested
     706           0 : func (w *Writer) Merge(key, value []byte) error {
     707           0 :         if w.err != nil {
     708           0 :                 return w.err
     709           0 :         }
     710           0 :         if w.isStrictObsolete {
     711           0 :                 return errors.Errorf("use AddWithForceObsolete")
     712           0 :         }
     713             :         // forceObsolete is false based on the assumption that no RANGEDELs in the
     714             :         // sstable delete the added points. Strict-obsolete writers were rejected
     715             :         // above; addPoint would additionally reject a MERGE for such a writer.
     716           0 :         return w.addPoint(base.MakeInternalKey(key, 0, InternalKeyKindMerge), value, false)
     717             : }
     718             : 
     719             : // Add adds a key/value pair to the table being written. For a given Writer,
     720             : // the keys passed to Add must be in increasing order. The exception to this
     721             : // rule is range deletion tombstones. Range deletion tombstones need to be
     722             : // added ordered by their start key, but they can be added out of order from
     723             : // point entries. Additionally, range deletion tombstones must be fragmented
     724             : // (i.e. by keyspan.Fragmenter).
     725           1 : func (w *Writer) Add(key InternalKey, value []byte) error {
     726           1 :         if w.isStrictObsolete {
     727           0 :                 return errors.Errorf("use AddWithForceObsolete")
     728           0 :         }
     729           1 :         return w.AddWithForceObsolete(key, value, false)
     730             : }
     731             : 
     732             : // AddWithForceObsolete must be used when writing a strict-obsolete sstable.
     733             : //
     734             : // forceObsolete indicates whether the caller has determined that this key is
     735             : // obsolete even though it may be the latest point key for this userkey. This
     736             : // should be set to true for keys obsoleted by RANGEDELs, and is required for
     737             : // strict-obsolete sstables.
     738             : //
     739             : // Note that there are two properties, S1 and S2 (see comment in format.go)
     740             : // that strict-obsolete ssts must satisfy. S2, due to RANGEDELs, is solely the
     741             : // responsibility of the caller. S1 is solely the responsibility of the
     742             : // callee.
     743           1 : func (w *Writer) AddWithForceObsolete(key InternalKey, value []byte, forceObsolete bool) error {
     744           1 :         if w.err != nil {
     745           0 :                 return w.err
     746           0 :         }
     747             : 
     748           1 :         switch key.Kind() {
     749           1 :         case InternalKeyKindRangeDelete:
     750           1 :                 return w.addTombstone(key, value)
     751             :         case base.InternalKeyKindRangeKeyDelete,
     752             :                 base.InternalKeyKindRangeKeySet,
     753           0 :                 base.InternalKeyKindRangeKeyUnset:
     754           0 :                 w.err = errors.Errorf(
     755           0 :                         "pebble: range keys must be added via one of the RangeKey* functions")
     756           0 :                 return w.err
     757             :         }
     758           1 :         return w.addPoint(key, value, forceObsolete)
     759             : }
     760             : 
     761           1 : func (w *Writer) makeAddPointDecisionV2(key InternalKey) error {
     762           1 :         prevTrailer := w.lastPointKeyInfo.trailer
     763           1 :         w.lastPointKeyInfo.trailer = key.Trailer
     764           1 :         if w.dataBlockBuf.dataBlock.EntryCount() == 0 {
     765           1 :                 return nil
     766           1 :         }
     767           1 :         if !w.disableKeyOrderChecks {
     768           1 :                 prevPointUserKey := w.dataBlockBuf.dataBlock.CurUserKey()
     769           1 :                 cmpUser := w.compare(prevPointUserKey, key.UserKey)
     770           1 :                 if cmpUser > 0 || (cmpUser == 0 && prevTrailer <= key.Trailer) {
     771           1 :                         return errors.Errorf(
     772           1 :                                 "pebble: keys must be added in strictly increasing order: %s, %s",
     773           1 :                                 InternalKey{UserKey: prevPointUserKey, Trailer: prevTrailer}.Pretty(w.formatKey),
     774           1 :                                 key.Pretty(w.formatKey))
     775           1 :                 }
     776             :         }
     777           1 :         return nil
     778             : }
     779             : 
     780             : // REQUIRES: at least one point has been written to the Writer.
     781           1 : func (w *Writer) getLastPointUserKey() []byte {
     782           1 :         if w.dataBlockBuf.dataBlock.EntryCount() == 0 {
     783           0 :                 panic(errors.AssertionFailedf("no point keys added to writer"))
     784             :         }
     785           1 :         return w.dataBlockBuf.dataBlock.CurUserKey()
     786             : }
     787             : 
     788             : // REQUIRES: w.tableFormat >= TableFormatPebblev3
     789             : func (w *Writer) makeAddPointDecisionV3(
     790             :         key InternalKey, valueLen int,
     791           1 : ) (setHasSamePrefix bool, writeToValueBlock bool, isObsolete bool, err error) {
     792           1 :         prevPointKeyInfo := w.lastPointKeyInfo
     793           1 :         w.lastPointKeyInfo.userKeyLen = len(key.UserKey)
     794           1 :         w.lastPointKeyInfo.prefixLen = w.split(key.UserKey)
     795           1 :         w.lastPointKeyInfo.trailer = key.Trailer
     796           1 :         w.lastPointKeyInfo.isObsolete = false
     797           1 :         if !w.meta.HasPointKeys {
     798           1 :                 return false, false, false, nil
     799           1 :         }
     800           1 :         keyKind := key.Trailer.Kind()
     801           1 :         prevPointUserKey := w.getLastPointUserKey()
     802           1 :         prevPointKey := InternalKey{UserKey: prevPointUserKey, Trailer: prevPointKeyInfo.trailer}
     803           1 :         prevKeyKind := prevPointKeyInfo.trailer.Kind()
     804           1 :         considerWriteToValueBlock := prevKeyKind == InternalKeyKindSet &&
     805           1 :                 keyKind == InternalKeyKindSet
     806           1 :         if considerWriteToValueBlock && !w.requiredInPlaceValueBound.IsEmpty() {
     807           1 :                 keyPrefix := key.UserKey[:w.lastPointKeyInfo.prefixLen]
     808           1 :                 cmpUpper := w.compare(
     809           1 :                         w.requiredInPlaceValueBound.Upper, keyPrefix)
     810           1 :                 if cmpUpper <= 0 {
     811           1 :                         // Common case for CockroachDB. Make it empty since all future keys in
     812           1 :                         // this sstable will also have cmpUpper <= 0.
     813           1 :                         w.requiredInPlaceValueBound = UserKeyPrefixBound{}
     814           1 :                 } else if w.compare(keyPrefix, w.requiredInPlaceValueBound.Lower) >= 0 {
     815           1 :                         considerWriteToValueBlock = false
     816           1 :                 }
     817             :         }
     818             :         // cmpPrefix is initialized iff considerWriteToValueBlock.
     819           1 :         var cmpPrefix int
     820           1 :         var cmpUser int
     821           1 :         if considerWriteToValueBlock {
     822           1 :                 // Compare the prefixes.
     823           1 :                 cmpPrefix = w.compare(prevPointUserKey[:prevPointKeyInfo.prefixLen],
     824           1 :                         key.UserKey[:w.lastPointKeyInfo.prefixLen])
     825           1 :                 cmpUser = cmpPrefix
     826           1 :                 if cmpPrefix == 0 {
     827           1 :                         // Need to compare suffixes to compute cmpUser.
     828           1 :                         cmpUser = w.compare(prevPointUserKey[prevPointKeyInfo.prefixLen:],
     829           1 :                                 key.UserKey[w.lastPointKeyInfo.prefixLen:])
     830           1 :                 }
     831           1 :         } else {
     832           1 :                 cmpUser = w.compare(prevPointUserKey, key.UserKey)
     833           1 :         }
     834             :         // Ensure that no one adds a point key kind without considering the obsolete
     835             :         // handling for that kind.
     836           1 :         switch keyKind {
     837             :         case InternalKeyKindSet, InternalKeyKindSetWithDelete, InternalKeyKindMerge,
     838           1 :                 InternalKeyKindDelete, InternalKeyKindSingleDelete, InternalKeyKindDeleteSized:
     839           0 :         default:
     840           0 :                 panic(errors.AssertionFailedf("unexpected key kind %s", keyKind.String()))
     841             :         }
     842             :         // If same user key, then the current key is obsolete if any of the
     843             :         // following is true:
     844             :         // C1 The prev key was obsolete.
     845             :         // C2 The prev key was not a MERGE. When the previous key is a MERGE we must
     846             :         //    preserve SET* and MERGE since their values will be merged into the
     847             :         //    previous key. We also must preserve DEL* since there may be an older
     848             :         //    SET*/MERGE in a lower level that must not be merged with the MERGE --
     849             :         //    if we omit the DEL* that lower SET*/MERGE will become visible.
     850             :         //
     851             :         // Regardless of whether it is the same user key or not
     852             :         // C3 The current key is some kind of point delete, and we are writing to
     853             :         //    the lowest level, then it is also obsolete. The correctness of this
     854             :         //    relies on the same user key not spanning multiple sstables in a level.
     855             :         //
     856             :         // C1 ensures that for a user key there is at most one transition from
     857             :         // !obsolete to obsolete. Consider a user key k, for which the first n keys
     858             :         // are not obsolete. We consider the various values of n:
     859             :         //
     860             :         // n = 0: This happens due to forceObsolete being set by the caller, or due
     861             :         // to C3. forceObsolete must only be set due to a RANGEDEL, and that RANGEDEL
     862             :         // must also delete all the lower seqnums for the same user key. C3 triggers
     863             :         // due to a point delete and that deletes all the lower seqnums for the same
     864             :         // user key.
     865             :         //
     866             :         // n = 1: This is the common case. It happens when the first key is not a
     867             :         // MERGE, or the current key is some kind of point delete.
     868             :         //
     869             :         // n > 1: This is due to a sequence of MERGE keys, potentially followed by a
     870             :         // single non-MERGE key.
     871           1 :         isObsoleteC1AndC2 := cmpUser == 0 &&
     872           1 :                 (prevPointKeyInfo.isObsolete || prevKeyKind != InternalKeyKindMerge)
     873           1 :         isObsoleteC3 := w.writingToLowestLevel &&
     874           1 :                 (keyKind == InternalKeyKindDelete || keyKind == InternalKeyKindSingleDelete ||
     875           1 :                         keyKind == InternalKeyKindDeleteSized)
     876           1 :         isObsolete = isObsoleteC1AndC2 || isObsoleteC3
     877           1 :         // TODO(sumeer): storing isObsolete SET and SETWITHDEL in value blocks is
     878           1 :         // possible, but requires some care in documenting and checking invariants.
     879           1 :         // There is code that assumes nothing in value blocks because of single MVCC
     880           1 :         // version (those should be ok). We have to ensure setHasSamePrefix is
     881           1 :         // correctly initialized here etc.
     882           1 : 
     883           1 :         if !w.disableKeyOrderChecks &&
     884           1 :                 (cmpUser > 0 || (cmpUser == 0 && prevPointKeyInfo.trailer <= key.Trailer)) {
     885           1 :                 return false, false, false, errors.Errorf(
     886           1 :                         "pebble: keys must be added in strictly increasing order: %s, %s",
     887           1 :                         prevPointKey.Pretty(w.formatKey), key.Pretty(w.formatKey))
     888           1 :         }
     889           1 :         if !considerWriteToValueBlock {
     890           1 :                 return false, false, isObsolete, nil
     891           1 :         }
     892             :         // NB: it is possible that cmpUser == 0, i.e., these two SETs have identical
     893             :         // user keys (because of an open snapshot). This should be the rare case.
     894           1 :         setHasSamePrefix = cmpPrefix == 0
     895           1 :         // Use of 0 here is somewhat arbitrary. Given the minimum 3 byte encoding of
     896           1 :         // valueHandle, this should be > 3. But tiny values are common in test and
     897           1 :         // unlikely in production, so we use 0 here for better test coverage.
     898           1 :         const tinyValueThreshold = 0
     899           1 :         // NB: setting WriterOptions.DisableValueBlocks does not disable the
     900           1 :         // setHasSamePrefix optimization.
     901           1 :         considerWriteToValueBlock = setHasSamePrefix && valueLen > tinyValueThreshold && w.valueBlockWriter != nil
     902           1 :         return setHasSamePrefix, considerWriteToValueBlock, isObsolete, nil
     903             : }
     904             : 
     905           1 : func (w *Writer) addPoint(key InternalKey, value []byte, forceObsolete bool) error {
     906           1 :         if w.isStrictObsolete && key.Kind() == InternalKeyKindMerge {
     907           1 :                 return errors.Errorf("MERGE not supported in a strict-obsolete sstable")
     908           1 :         }
     909           1 :         var err error
     910           1 :         var setHasSameKeyPrefix, writeToValueBlock, addPrefixToValueStoredWithKey bool
     911           1 :         var isObsolete bool
     912           1 :         maxSharedKeyLen := len(key.UserKey)
     913           1 :         if w.tableFormat >= TableFormatPebblev3 {
     914           1 :                 // maxSharedKeyLen is limited to the prefix of the preceding key. If the
     915           1 :                 // preceding key was in a different block, then the blockWriter will
     916           1 :                 // ignore this maxSharedKeyLen.
     917           1 :                 maxSharedKeyLen = w.lastPointKeyInfo.prefixLen
     918           1 :                 setHasSameKeyPrefix, writeToValueBlock, isObsolete, err =
     919           1 :                         w.makeAddPointDecisionV3(key, len(value))
     920           1 :                 addPrefixToValueStoredWithKey = key.Kind() == InternalKeyKindSet
     921           1 :         } else {
     922           1 :                 err = w.makeAddPointDecisionV2(key)
     923           1 :         }
     924           1 :         if err != nil {
     925           1 :                 return err
     926           1 :         }
     927           1 :         isObsolete = w.tableFormat >= TableFormatPebblev4 && (isObsolete || forceObsolete)
     928           1 :         w.lastPointKeyInfo.isObsolete = isObsolete
     929           1 :         var valueStoredWithKey []byte
     930           1 :         var prefix block.ValuePrefix
     931           1 :         var valueStoredWithKeyLen int
     932           1 :         if writeToValueBlock {
     933           1 :                 vh, err := w.valueBlockWriter.addValue(value)
     934           1 :                 if err != nil {
     935           0 :                         return err
     936           0 :                 }
     937           1 :                 n := encodeValueHandle(w.blockBuf.tmp[:], vh)
     938           1 :                 valueStoredWithKey = w.blockBuf.tmp[:n]
     939           1 :                 valueStoredWithKeyLen = len(valueStoredWithKey) + 1
     940           1 :                 var attribute base.ShortAttribute
     941           1 :                 if w.shortAttributeExtractor != nil {
     942           1 :                         // TODO(sumeer): for compactions, it is possible that the input sstable
     943           1 :                         // already has this value in the value section and so we have already
     944           1 :                         // extracted the ShortAttribute. Avoid extracting it again. This will
     945           1 :                         // require changing the Writer.Add interface.
     946           1 :                         if attribute, err = w.shortAttributeExtractor(
     947           1 :                                 key.UserKey, w.lastPointKeyInfo.prefixLen, value); err != nil {
     948           0 :                                 return err
     949           0 :                         }
     950             :                 }
     951           1 :                 prefix = block.ValueHandlePrefix(setHasSameKeyPrefix, attribute)
     952           1 :         } else {
     953           1 :                 valueStoredWithKey = value
     954           1 :                 valueStoredWithKeyLen = len(value)
     955           1 :                 if addPrefixToValueStoredWithKey {
     956           1 :                         valueStoredWithKeyLen++
     957           1 :                 }
     958           1 :                 prefix = block.InPlaceValuePrefix(setHasSameKeyPrefix)
     959             :         }
     960             : 
     961           1 :         if err := w.maybeFlush(key, valueStoredWithKeyLen); err != nil {
     962           1 :                 return err
     963           1 :         }
     964             : 
     965           1 :         for i := range w.blockPropCollectors {
     966           1 :                 v := value
     967           1 :                 if addPrefixToValueStoredWithKey {
     968           1 :                         // Values for SET are not required to be in-place, and in the future may
     969           1 :                         // not even be read by the compaction, so pass nil values. Block
     970           1 :                         // property collectors in such Pebble DB's must not look at the value.
     971           1 :                         v = nil
     972           1 :                 }
     973           1 :                 if err := w.blockPropCollectors[i].Add(key, v); err != nil {
     974           1 :                         w.err = err
     975           1 :                         return err
     976           1 :                 }
     977             :         }
     978           1 :         if w.tableFormat >= TableFormatPebblev4 {
     979           1 :                 w.obsoleteCollector.AddPoint(isObsolete)
     980           1 :         }
     981             : 
     982           1 :         w.maybeAddToFilter(key.UserKey)
     983           1 :         w.dataBlockBuf.dataBlock.AddWithOptionalValuePrefix(
     984           1 :                 key, isObsolete, valueStoredWithKey, maxSharedKeyLen, addPrefixToValueStoredWithKey, prefix,
     985           1 :                 setHasSameKeyPrefix)
     986           1 : 
     987           1 :         w.meta.updateSeqNum(key.SeqNum())
     988           1 : 
     989           1 :         if !w.meta.HasPointKeys {
     990           1 :                 k := w.dataBlockBuf.dataBlock.CurKey()
     991           1 :                 // NB: We need to ensure that SmallestPoint.UserKey is set, so we create
     992           1 :                 // an InternalKey which is semantically identical to the key, but won't
     993           1 :                 // have a nil UserKey. We do this, because key.UserKey could be nil, and
     994           1 :                 // we don't want SmallestPoint.UserKey to be nil.
     995           1 :                 //
     996           1 :                 // todo(bananabrick): Determine if it's okay to have a nil SmallestPoint
     997           1 :                 // .UserKey now that we don't rely on a nil UserKey to determine if the
     998           1 :                 // key has been set or not.
     999           1 :                 w.meta.SetSmallestPointKey(k.Clone())
    1000           1 :         }
    1001             : 
    1002           1 :         w.props.NumEntries++
    1003           1 :         switch key.Kind() {
    1004           1 :         case InternalKeyKindDelete, InternalKeyKindSingleDelete:
    1005           1 :                 w.props.NumDeletions++
    1006           1 :                 w.props.RawPointTombstoneKeySize += uint64(len(key.UserKey))
    1007           1 :         case InternalKeyKindDeleteSized:
    1008           1 :                 var size uint64
    1009           1 :                 if len(value) > 0 {
    1010           1 :                         var n int
    1011           1 :                         size, n = binary.Uvarint(value)
    1012           1 :                         if n <= 0 {
    1013           0 :                                 w.err = errors.Newf("%s key's value (%x) does not parse as uvarint",
    1014           0 :                                         errors.Safe(key.Kind().String()), value)
    1015           0 :                                 return w.err
    1016           0 :                         }
    1017             :                 }
    1018           1 :                 w.props.NumDeletions++
    1019           1 :                 w.props.NumSizedDeletions++
    1020           1 :                 w.props.RawPointTombstoneKeySize += uint64(len(key.UserKey))
    1021           1 :                 w.props.RawPointTombstoneValueSize += size
    1022           1 :         case InternalKeyKindMerge:
    1023           1 :                 w.props.NumMergeOperands++
    1024             :         }
    1025           1 :         w.props.RawKeySize += uint64(key.Size())
    1026           1 :         w.props.RawValueSize += uint64(len(value))
    1027           1 :         return nil
    1028             : }
    1029             : 
    1030           1 : func (w *Writer) prettyTombstone(k InternalKey, value []byte) fmt.Formatter {
    1031           1 :         return keyspan.Span{
    1032           1 :                 Start: k.UserKey,
    1033           1 :                 End:   value,
    1034           1 :                 Keys:  []keyspan.Key{{Trailer: k.Trailer}},
    1035           1 :         }.Pretty(w.formatKey)
    1036           1 : }
    1037             : 
    1038           1 : func (w *Writer) addTombstone(key InternalKey, value []byte) error {
    1039           1 :         if !w.disableKeyOrderChecks && w.rangeDelBlock.EntryCount() > 0 {
    1040           1 :                 // Check that tombstones are being added in fragmented order. If the two
    1041           1 :                 // tombstones overlap, their start and end keys must be identical.
    1042           1 :                 prevKey := w.rangeDelBlock.CurKey()
    1043           1 :                 switch c := w.compare(prevKey.UserKey, key.UserKey); {
    1044           0 :                 case c > 0:
    1045           0 :                         w.err = errors.Errorf("pebble: keys must be added in order: %s, %s",
    1046           0 :                                 prevKey.Pretty(w.formatKey), key.Pretty(w.formatKey))
    1047           0 :                         return w.err
    1048           1 :                 case c == 0:
    1049           1 :                         prevValue := w.rangeDelBlock.CurValue()
    1050           1 :                         if w.compare(prevValue, value) != 0 {
    1051           1 :                                 w.err = errors.Errorf("pebble: overlapping tombstones must be fragmented: %s vs %s",
    1052           1 :                                         w.prettyTombstone(prevKey, prevValue),
    1053           1 :                                         w.prettyTombstone(key, value))
    1054           1 :                                 return w.err
    1055           1 :                         }
    1056           1 :                         if prevKey.SeqNum() <= key.SeqNum() {
    1057           1 :                                 w.err = errors.Errorf("pebble: keys must be added in strictly increasing order: %s, %s",
    1058           1 :                                         prevKey.Pretty(w.formatKey), key.Pretty(w.formatKey))
    1059           1 :                                 return w.err
    1060           1 :                         }
    1061           1 :                 default:
    1062           1 :                         prevValue := w.rangeDelBlock.CurValue()
    1063           1 :                         if w.compare(prevValue, key.UserKey) > 0 {
    1064           1 :                                 w.err = errors.Errorf("pebble: overlapping tombstones must be fragmented: %s vs %s",
    1065           1 :                                         w.prettyTombstone(prevKey, prevValue),
    1066           1 :                                         w.prettyTombstone(key, value))
    1067           1 :                                 return w.err
    1068           1 :                         }
    1069             :                 }
    1070             :         }
    1071             : 
    1072           1 :         if key.Trailer == base.InternalKeyRangeDeleteSentinel {
    1073           0 :                 w.err = errors.Errorf("pebble: cannot add range delete sentinel: %s", key.Pretty(w.formatKey))
    1074           0 :                 return w.err
    1075           0 :         }
    1076             : 
    1077           1 :         w.meta.updateSeqNum(key.SeqNum())
    1078           1 : 
    1079           1 :         // Range tombstones are fragmented in the v2 range deletion block format,
    1080           1 :         // so the start key of the first range tombstone added will be the smallest
    1081           1 :         // range tombstone key. The largest range tombstone key will be determined
    1082           1 :         // in Writer.Close() as the end key of the last range tombstone added.
    1083           1 :         if w.props.NumRangeDeletions == 0 {
    1084           1 :                 w.meta.SetSmallestRangeDelKey(key.Clone())
    1085           1 :         }
    1086             : 
    1087           1 :         w.props.NumEntries++
    1088           1 :         w.props.NumDeletions++
    1089           1 :         w.props.NumRangeDeletions++
    1090           1 :         w.props.RawKeySize += uint64(key.Size())
    1091           1 :         w.props.RawValueSize += uint64(len(value))
    1092           1 :         w.rangeDelBlock.Add(key, value)
    1093           1 :         return nil
    1094             : }
    1095             : 
    1096             : // RangeKeySet sets a range between start (inclusive) and end (exclusive) with
    1097             : // the given suffix to the given value. The resulting range key is given the
    1098             : // sequence number zero, with the expectation that the resulting sstable will be
    1099             : // ingested.
    1100             : //
    1101             : // Keys must be added to the table in increasing order of start key. Spans are
    1102             : // not required to be fragmented. The same suffix may not be set or unset twice
    1103             : // over the same keyspan, because it would result in inconsistent state. Both
    1104             : // the Set and Unset would share the zero sequence number, and a key cannot be
    1105             : // both simultaneously set and unset.
    1106           1 : func (w *Writer) RangeKeySet(start, end, suffix, value []byte) error {
    1107           1 :         return w.addRangeKeySpan(keyspan.Span{
    1108           1 :                 Start: w.tempRangeKeyCopy(start),
    1109           1 :                 End:   w.tempRangeKeyCopy(end),
    1110           1 :                 Keys: []keyspan.Key{
    1111           1 :                         {
    1112           1 :                                 Trailer: base.MakeTrailer(0, base.InternalKeyKindRangeKeySet),
    1113           1 :                                 Suffix:  w.tempRangeKeyCopy(suffix),
    1114           1 :                                 Value:   w.tempRangeKeyCopy(value),
    1115           1 :                         },
    1116           1 :                 },
    1117           1 :         })
    1118           1 : }
    1119             : 
    1120             : // RangeKeyUnset un-sets a range between start (inclusive) and end (exclusive)
    1121             : // with the given suffix. The resulting range key is given the
    1122             : // sequence number zero, with the expectation that the resulting sstable will be
    1123             : // ingested.
    1124             : //
    1125             : // Keys must be added to the table in increasing order of start key. Spans are
    1126             : // not required to be fragmented. The same suffix may not be set or unset twice
    1127             : // over the same keyspan, because it would result in inconsistent state. Both
    1128             : // the Set and Unset would share the zero sequence number, and a key cannot be
    1129             : // both simultaneously set and unset.
    1130           1 : func (w *Writer) RangeKeyUnset(start, end, suffix []byte) error {
    1131           1 :         return w.addRangeKeySpan(keyspan.Span{
    1132           1 :                 Start: w.tempRangeKeyCopy(start),
    1133           1 :                 End:   w.tempRangeKeyCopy(end),
    1134           1 :                 Keys: []keyspan.Key{
    1135           1 :                         {
    1136           1 :                                 Trailer: base.MakeTrailer(0, base.InternalKeyKindRangeKeyUnset),
    1137           1 :                                 Suffix:  w.tempRangeKeyCopy(suffix),
    1138           1 :                         },
    1139           1 :                 },
    1140           1 :         })
    1141           1 : }
    1142             : 
    1143             : // RangeKeyDelete deletes a range between start (inclusive) and end (exclusive).
    1144             : //
    1145             : // Keys must be added to the table in increasing order of start key. Spans are
    1146             : // not required to be fragmented.
    1147           1 : func (w *Writer) RangeKeyDelete(start, end []byte) error {
    1148           1 :         return w.addRangeKeySpan(keyspan.Span{
    1149           1 :                 Start: w.tempRangeKeyCopy(start),
    1150           1 :                 End:   w.tempRangeKeyCopy(end),
    1151           1 :                 Keys: []keyspan.Key{
    1152           1 :                         {Trailer: base.MakeTrailer(0, base.InternalKeyKindRangeKeyDelete)},
    1153           1 :                 },
    1154           1 :         })
    1155           1 : }
    1156             : 
    1157             : // AddRangeKey adds a range key set, unset, or delete key/value pair to the
    1158             : // table being written.
    1159             : //
    1160             : // Range keys must be supplied in strictly ascending order of start key (i.e.
    1161             : // user key ascending, sequence number descending, and key type descending).
    1162             : // Ranges added must also be supplied in fragmented span order - i.e. other than
    1163             : // spans that are perfectly aligned (same start and end keys), spans may not
    1164             : // overlap. Range keys may be added out of order relative to point keys and
    1165             : // range deletions.
    1166           1 : func (w *Writer) AddRangeKey(key InternalKey, value []byte) error {
    1167           1 :         if w.err != nil {
    1168           0 :                 return w.err
    1169           0 :         }
    1170           1 :         return w.addRangeKey(key, value)
    1171             : }
    1172             : 
    1173           1 : func (w *Writer) addRangeKeySpan(span keyspan.Span) error {
    1174           1 :         if w.compare(span.Start, span.End) >= 0 {
    1175           0 :                 return errors.Errorf(
    1176           0 :                         "pebble: start key must be strictly less than end key",
    1177           0 :                 )
    1178           0 :         }
    1179           1 :         if w.fragmenter.Start() != nil && w.compare(w.fragmenter.Start(), span.Start) > 0 {
    1180           1 :                 return errors.Errorf("pebble: spans must be added in order: %s > %s",
    1181           1 :                         w.formatKey(w.fragmenter.Start()), w.formatKey(span.Start))
    1182           1 :         }
    1183             :         // Add this span to the fragmenter.
    1184           1 :         w.fragmenter.Add(span)
    1185           1 :         return w.err
    1186             : }
    1187             : 
    1188           1 : func (w *Writer) encodeRangeKeySpan(span keyspan.Span) {
    1189           1 :         // This method is the emit function of the Fragmenter.
    1190           1 :         //
    1191           1 :         // NB: The span should only contain range keys and be internally consistent
    1192           1 :         // (eg, no duplicate suffixes, no additional keys after a RANGEKEYDEL).
    1193           1 :         //
    1194           1 :         // We use w.rangeKeysBySuffix and w.rangeKeySpan to avoid allocations.
    1195           1 : 
    1196           1 :         // Sort the keys by suffix. Iteration doesn't *currently* depend on it, but
    1197           1 :         // we may want to in the future.
    1198           1 :         w.rangeKeysBySuffix.Cmp = w.compare
    1199           1 :         w.rangeKeysBySuffix.Keys = span.Keys
    1200           1 :         sort.Sort(&w.rangeKeysBySuffix)
    1201           1 : 
    1202           1 :         w.rangeKeySpan = span
    1203           1 :         w.rangeKeySpan.Keys = w.rangeKeysBySuffix.Keys
    1204           1 :         w.err = firstError(w.err, w.rangeKeyEncoder.Encode(&w.rangeKeySpan))
    1205           1 : }
    1206             : 
    1207           1 : func (w *Writer) addRangeKey(key InternalKey, value []byte) error {
    1208           1 :         if !w.disableKeyOrderChecks && w.rangeKeyBlock.EntryCount() > 0 {
    1209           1 :                 prevStartKey := w.rangeKeyBlock.CurKey()
    1210           1 :                 prevEndKey, _, err := rangekey.DecodeEndKey(prevStartKey.Kind(), w.rangeKeyBlock.CurValue())
    1211           1 :                 if err != nil {
    1212           0 :                         // We panic here as we should have previously decoded and validated this
    1213           0 :                         // key and value when it was first added to the range key block.
    1214           0 :                         panic(err)
    1215             :                 }
    1216             : 
    1217           1 :                 curStartKey := key
    1218           1 :                 curEndKey, _, err := rangekey.DecodeEndKey(curStartKey.Kind(), value)
    1219           1 :                 if err != nil {
    1220           0 :                         w.err = err
    1221           0 :                         return w.err
    1222           0 :                 }
    1223             : 
    1224             :                 // Start keys must be strictly increasing.
    1225           1 :                 if base.InternalCompare(w.compare, prevStartKey, curStartKey) >= 0 {
    1226           1 :                         w.err = errors.Errorf(
    1227           1 :                                 "pebble: range keys starts must be added in increasing order: %s, %s",
    1228           1 :                                 prevStartKey.Pretty(w.formatKey), key.Pretty(w.formatKey))
    1229           1 :                         return w.err
    1230           1 :                 }
    1231             : 
    1232             :                 // Start keys are increasing. If the start user keys are equal, the
    1233             :                 // end keys must be equal (i.e. aligned spans).
    1234           1 :                 if w.compare(prevStartKey.UserKey, curStartKey.UserKey) == 0 {
    1235           1 :                         if w.compare(prevEndKey, curEndKey) != 0 {
    1236           1 :                                 w.err = errors.Errorf("pebble: overlapping range keys must be fragmented: %s, %s",
    1237           1 :                                         prevStartKey.Pretty(w.formatKey),
    1238           1 :                                         curStartKey.Pretty(w.formatKey))
    1239           1 :                                 return w.err
    1240           1 :                         }
    1241           1 :                 } else if w.compare(prevEndKey, curStartKey.UserKey) > 0 {
    1242           1 :                         // If the start user keys are NOT equal, the spans must be disjoint (i.e.
    1243           1 :                         // no overlap).
    1244           1 :                         // NOTE: the inequality excludes zero, as we allow the end key of the
    1245           1 :                         // lower span be the same as the start key of the upper span, because
    1246           1 :                         // the range end key is considered an exclusive bound.
    1247           1 :                         w.err = errors.Errorf("pebble: overlapping range keys must be fragmented: %s, %s",
    1248           1 :                                 prevStartKey.Pretty(w.formatKey),
    1249           1 :                                 curStartKey.Pretty(w.formatKey))
    1250           1 :                         return w.err
    1251           1 :                 }
    1252             :         }
    1253             : 
    1254             :         // TODO(travers): Add an invariant-gated check to ensure that suffix-values
    1255             :         // are sorted within coalesced spans.
    1256             : 
    1257             :         // Range-keys and point-keys are intended to live in "parallel" keyspaces.
    1258             :         // However, we track a single seqnum in the table metadata that spans both of
    1259             :         // these keyspaces.
    1260             :         // TODO(travers): Consider tracking range key seqnums separately.
    1261           1 :         w.meta.updateSeqNum(key.SeqNum())
    1262           1 : 
    1263           1 :         // Range tombstones are fragmented, so the start key of the first range key
    1264           1 :         // added will be the smallest. The largest range key is determined in
    1265           1 :         // Writer.Close() as the end key of the last range key added to the block.
    1266           1 :         if w.props.NumRangeKeys() == 0 {
    1267           1 :                 w.meta.SetSmallestRangeKey(key.Clone())
    1268           1 :         }
    1269             : 
    1270             :         // Update block properties.
    1271           1 :         w.props.RawRangeKeyKeySize += uint64(key.Size())
    1272           1 :         w.props.RawRangeKeyValueSize += uint64(len(value))
    1273           1 :         switch key.Kind() {
    1274           1 :         case base.InternalKeyKindRangeKeyDelete:
    1275           1 :                 w.props.NumRangeKeyDels++
    1276           1 :         case base.InternalKeyKindRangeKeySet:
    1277           1 :                 w.props.NumRangeKeySets++
    1278           1 :         case base.InternalKeyKindRangeKeyUnset:
    1279           1 :                 w.props.NumRangeKeyUnsets++
    1280           0 :         default:
    1281           0 :                 panic(errors.Errorf("pebble: invalid range key type: %s", key.Kind()))
    1282             :         }
    1283             : 
    1284           1 :         for i := range w.blockPropCollectors {
    1285           1 :                 if err := w.blockPropCollectors[i].Add(key, value); err != nil {
    1286           0 :                         return err
    1287           0 :                 }
    1288             :         }
    1289             : 
    1290             :         // Add the key to the block.
    1291           1 :         w.rangeKeyBlock.Add(key, value)
    1292           1 :         return nil
    1293             : }
    1294             : 
    1295             : // tempRangeKeyBuf returns a slice of length n from the Writer's rkBuf byte
    1296             : // slice. Any byte written to the returned slice is retained for the lifetime of
    1297             : // the Writer.
    1298           1 : func (w *Writer) tempRangeKeyBuf(n int) []byte {
    1299           1 :         if cap(w.rkBuf)-len(w.rkBuf) < n {
    1300           1 :                 size := len(w.rkBuf) + 2*n
    1301           1 :                 if size < 2*cap(w.rkBuf) {
    1302           1 :                         size = 2 * cap(w.rkBuf)
    1303           1 :                 }
    1304           1 :                 buf := make([]byte, len(w.rkBuf), size)
    1305           1 :                 copy(buf, w.rkBuf)
    1306           1 :                 w.rkBuf = buf
    1307             :         }
    1308           1 :         b := w.rkBuf[len(w.rkBuf) : len(w.rkBuf)+n]
    1309           1 :         w.rkBuf = w.rkBuf[:len(w.rkBuf)+n]
    1310           1 :         return b
    1311             : }
    1312             : 
    1313             : // tempRangeKeyCopy returns a copy of the provided slice, stored in the Writer's
    1314             : // range key buffer.
    1315           1 : func (w *Writer) tempRangeKeyCopy(k []byte) []byte {
    1316           1 :         if len(k) == 0 {
    1317           1 :                 return nil
    1318           1 :         }
    1319           1 :         buf := w.tempRangeKeyBuf(len(k))
    1320           1 :         copy(buf, k)
    1321           1 :         return buf
    1322             : }
    1323             : 
    1324           1 : func (w *Writer) maybeAddToFilter(key []byte) {
    1325           1 :         if w.filter != nil {
    1326           1 :                 prefix := key[:w.split(key)]
    1327           1 :                 w.filter.addKey(prefix)
    1328           1 :         }
    1329             : }
    1330             : 
    1331           1 : func (w *Writer) flush(key InternalKey) error {
    1332           1 :         // We're finishing a data block.
    1333           1 :         err := w.finishDataBlockProps(w.dataBlockBuf)
    1334           1 :         if err != nil {
    1335           1 :                 return err
    1336           1 :         }
    1337           1 :         w.dataBlockBuf.finish()
    1338           1 :         w.dataBlockBuf.compressAndChecksum(w.compression)
    1339           1 :         // Since dataBlockEstimates.addInflightDataBlock was never called, the
    1340           1 :         // inflightSize is set to 0.
    1341           1 :         w.coordination.sizeEstimate.dataBlockCompressed(len(w.dataBlockBuf.compressed), 0)
    1342           1 : 
    1343           1 :         // Determine if the index block should be flushed. Since we're accessing the
    1344           1 :         // dataBlockBuf.dataBlock.curKey here, we have to make sure that once we start
    1345           1 :         // to pool the dataBlockBufs, the curKey isn't used by the Writer once the
    1346           1 :         // dataBlockBuf is added back to a sync.Pool. In this particular case, the
    1347           1 :         // byte slice which supports "sep" will eventually be copied when "sep" is
    1348           1 :         // added to the index block.
    1349           1 :         prevKey := w.dataBlockBuf.dataBlock.CurKey()
    1350           1 :         sep := w.indexEntrySep(prevKey, key, w.dataBlockBuf)
    1351           1 :         // We determine that we should flush an index block from the Writer client
    1352           1 :         // goroutine, but we actually finish the index block from the writeQueue.
    1353           1 :         // When we determine that an index block should be flushed, we need to call
    1354           1 :         // BlockPropertyCollector.FinishIndexBlock. But block property collector
    1355           1 :         // calls must happen sequentially from the Writer client. Therefore, we need
    1356           1 :         // to determine that we are going to flush the index block from the Writer
    1357           1 :         // client.
    1358           1 :         shouldFlushIndexBlock := supportsTwoLevelIndex(w.tableFormat) && w.indexBlock.shouldFlush(
    1359           1 :                 sep, encodedBHPEstimatedSize, w.indexBlockOptions, w.allocatorSizeClasses,
    1360           1 :         )
    1361           1 : 
    1362           1 :         var indexProps []byte
    1363           1 :         var flushableIndexBlock *indexBlockBuf
    1364           1 :         if shouldFlushIndexBlock {
    1365           1 :                 flushableIndexBlock = w.indexBlock
    1366           1 :                 w.indexBlock = newIndexBlockBuf(w.coordination.parallelismEnabled)
    1367           1 :                 // Call BlockPropertyCollector.FinishIndexBlock, since we've decided to
    1368           1 :                 // flush the index block.
    1369           1 :                 indexProps, err = w.finishIndexBlockProps()
    1370           1 :                 if err != nil {
    1371           1 :                         return err
    1372           1 :                 }
    1373             :         }
    1374             : 
    1375             :         // We've called BlockPropertyCollector.FinishDataBlock, and, if necessary,
    1376             :         // BlockPropertyCollector.FinishIndexBlock. Since we've decided to finish
    1377             :         // the data block, we can call
    1378             :         // BlockPropertyCollector.AddPrevDataBlockToIndexBlock.
    1379           1 :         w.addPrevDataBlockToIndexBlockProps()
    1380           1 : 
    1381           1 :         // Schedule a write.
    1382           1 :         writeTask := writeTaskPool.Get().(*writeTask)
    1383           1 :         // We're setting compressionDone to indicate that compression of this block
    1384           1 :         // has already been completed.
    1385           1 :         writeTask.compressionDone <- true
    1386           1 :         writeTask.buf = w.dataBlockBuf
    1387           1 :         writeTask.indexEntrySep = sep
    1388           1 :         writeTask.currIndexBlock = w.indexBlock
    1389           1 :         writeTask.indexInflightSize = sep.Size() + encodedBHPEstimatedSize
    1390           1 :         writeTask.finishedIndexProps = indexProps
    1391           1 :         writeTask.flushableIndexBlock = flushableIndexBlock
    1392           1 : 
    1393           1 :         // The writeTask corresponds to an unwritten index entry.
    1394           1 :         w.indexBlock.addInflight(writeTask.indexInflightSize)
    1395           1 : 
    1396           1 :         w.dataBlockBuf = nil
    1397           1 :         if w.coordination.parallelismEnabled {
    1398           1 :                 w.coordination.writeQueue.add(writeTask)
    1399           1 :         } else {
    1400           1 :                 err = w.coordination.writeQueue.addSync(writeTask)
    1401           1 :         }
    1402           1 :         w.dataBlockBuf = newDataBlockBuf(w.restartInterval, w.checksumType)
    1403           1 : 
    1404           1 :         return err
    1405             : }
    1406             : 
    1407           1 : func (w *Writer) maybeFlush(key InternalKey, valueLen int) error {
    1408           1 :         if !w.dataBlockBuf.shouldFlush(key, valueLen, w.dataBlockOptions, w.allocatorSizeClasses) {
    1409           1 :                 return nil
    1410           1 :         }
    1411             : 
    1412           1 :         err := w.flush(key)
    1413           1 : 
    1414           1 :         if err != nil {
    1415           1 :                 w.err = err
    1416           1 :                 return err
    1417           1 :         }
    1418             : 
    1419           1 :         return nil
    1420             : }
    1421             : 
    1422             : // dataBlockBuf.dataBlockProps set by this method must be encoded before any future use of the
    1423             : // dataBlockBuf.blockPropsEncoder, since the properties slice will get reused by the
    1424             : // blockPropsEncoder.
    1425           1 : func (w *Writer) finishDataBlockProps(buf *dataBlockBuf) error {
    1426           1 :         if len(w.blockPropCollectors) == 0 {
    1427           1 :                 return nil
    1428           1 :         }
    1429           1 :         var err error
    1430           1 :         buf.blockPropsEncoder.resetProps()
    1431           1 :         for i := range w.blockPropCollectors {
    1432           1 :                 scratch := buf.blockPropsEncoder.getScratchForProp()
    1433           1 :                 if scratch, err = w.blockPropCollectors[i].FinishDataBlock(scratch); err != nil {
    1434           1 :                         return err
    1435           1 :                 }
    1436           1 :                 buf.blockPropsEncoder.addProp(shortID(i), scratch)
    1437             :         }
    1438             : 
    1439           1 :         buf.dataBlockProps = buf.blockPropsEncoder.unsafeProps()
    1440           1 :         return nil
    1441             : }
    1442             : 
    1443             : // The BlockHandleWithProperties returned by this method must be encoded before any future use of
    1444             : // the Writer.blockPropsEncoder, since the properties slice will get reused by the blockPropsEncoder.
    1445             : // maybeAddBlockPropertiesToBlockHandle should only be called if block is being written synchronously
    1446             : // with the Writer client.
    1447             : func (w *Writer) maybeAddBlockPropertiesToBlockHandle(
    1448             :         bh block.Handle,
    1449           1 : ) (BlockHandleWithProperties, error) {
    1450           1 :         err := w.finishDataBlockProps(w.dataBlockBuf)
    1451           1 :         if err != nil {
    1452           0 :                 return BlockHandleWithProperties{}, err
    1453           0 :         }
    1454           1 :         return BlockHandleWithProperties{Handle: bh, Props: w.dataBlockBuf.dataBlockProps}, nil
    1455             : }
    1456             : 
    1457           1 : func (w *Writer) indexEntrySep(prevKey, key InternalKey, dataBlockBuf *dataBlockBuf) InternalKey {
    1458           1 :         // Make a rough guess that we want key-sized scratch to compute the separator.
    1459           1 :         if cap(dataBlockBuf.sepScratch) < key.Size() {
    1460           1 :                 dataBlockBuf.sepScratch = make([]byte, 0, key.Size()*2)
    1461           1 :         }
    1462             : 
    1463           1 :         var sep InternalKey
    1464           1 :         if key.UserKey == nil && key.Trailer == 0 {
    1465           1 :                 sep = prevKey.Successor(w.compare, w.successor, dataBlockBuf.sepScratch[:0])
    1466           1 :         } else {
    1467           1 :                 sep = prevKey.Separator(w.compare, w.separator, dataBlockBuf.sepScratch[:0], key)
    1468           1 :         }
    1469           1 :         return sep
    1470             : }
    1471             : 
    1472             : // addIndexEntry adds an index entry for the specified key and block handle.
    1473             : // addIndexEntry can be called from both the Writer client goroutine, and the
    1474             : // writeQueue goroutine. If the flushIndexBuf != nil, then the indexProps, as
    1475             : // they're used when the index block is finished.
    1476             : //
    1477             : // Invariant:
    1478             : //  1. addIndexEntry must not store references to the sep InternalKey, the tmp
    1479             : //     byte slice, bhp.Props. That is, these must be either deep copied or
    1480             : //     encoded.
    1481             : //  2. addIndexEntry must not hold references to the flushIndexBuf, and the writeTo
    1482             : //     indexBlockBufs.
    1483             : func (w *Writer) addIndexEntry(
    1484             :         sep InternalKey,
    1485             :         bhp BlockHandleWithProperties,
    1486             :         tmp []byte,
    1487             :         flushIndexBuf *indexBlockBuf,
    1488             :         writeTo *indexBlockBuf,
    1489             :         inflightSize int,
    1490             :         indexProps []byte,
    1491           1 : ) error {
    1492           1 :         if bhp.Length == 0 {
    1493           0 :                 // A valid blockHandle must be non-zero.
    1494           0 :                 // In particular, it must have a non-zero length.
    1495           0 :                 return nil
    1496           0 :         }
    1497             : 
    1498           1 :         encoded := encodeBlockHandleWithProperties(tmp, bhp)
    1499           1 : 
    1500           1 :         if flushIndexBuf != nil {
    1501           1 :                 if cap(w.indexPartitions) == 0 {
    1502           1 :                         w.indexPartitions = make([]indexBlockAndBlockProperties, 0, 32)
    1503           1 :                 }
    1504             :                 // Enable two level indexes if there is more than one index block.
    1505           1 :                 w.twoLevelIndex = true
    1506           1 :                 if err := w.finishIndexBlock(flushIndexBuf, indexProps); err != nil {
    1507           0 :                         return err
    1508           0 :                 }
    1509             :         }
    1510             : 
    1511           1 :         writeTo.add(sep, encoded, inflightSize)
    1512           1 :         return nil
    1513             : }
    1514             : 
    1515           1 : func (w *Writer) addPrevDataBlockToIndexBlockProps() {
    1516           1 :         for i := range w.blockPropCollectors {
    1517           1 :                 w.blockPropCollectors[i].AddPrevDataBlockToIndexBlock()
    1518           1 :         }
    1519             : }
    1520             : 
    1521             : // addIndexEntrySync adds an index entry for the specified key and block handle.
    1522             : // Writer.addIndexEntry is only called synchronously once Writer.Close is called.
    1523             : // addIndexEntrySync should only be called if we're sure that index entries
    1524             : // aren't being written asynchronously.
    1525             : //
    1526             : // Invariant:
    1527             : //  1. addIndexEntrySync must not store references to the prevKey, key InternalKey's,
    1528             : //     the tmp byte slice. That is, these must be either deep copied or encoded.
    1529             : //
    1530             : // TODO: Improve coverage of this method. e.g. tests passed without the line
    1531             : // `w.twoLevelIndex = true` previously.
    1532             : func (w *Writer) addIndexEntrySync(
    1533             :         prevKey, key InternalKey, bhp BlockHandleWithProperties, tmp []byte,
    1534           1 : ) error {
    1535           1 :         return w.addIndexEntrySep(w.indexEntrySep(prevKey, key, w.dataBlockBuf), bhp, tmp)
    1536           1 : }
    1537             : 
    1538             : func (w *Writer) addIndexEntrySep(
    1539             :         sep InternalKey, bhp BlockHandleWithProperties, tmp []byte,
    1540           1 : ) error {
    1541           1 :         shouldFlush := supportsTwoLevelIndex(
    1542           1 :                 w.tableFormat) && w.indexBlock.shouldFlush(
    1543           1 :                 sep, encodedBHPEstimatedSize, w.indexBlockOptions, w.allocatorSizeClasses,
    1544           1 :         )
    1545           1 :         var flushableIndexBlock *indexBlockBuf
    1546           1 :         var props []byte
    1547           1 :         var err error
    1548           1 :         if shouldFlush {
    1549           1 :                 flushableIndexBlock = w.indexBlock
    1550           1 :                 w.indexBlock = newIndexBlockBuf(w.coordination.parallelismEnabled)
    1551           1 :                 w.twoLevelIndex = true
    1552           1 :                 // Call BlockPropertyCollector.FinishIndexBlock, since we've decided to
    1553           1 :                 // flush the index block.
    1554           1 :                 props, err = w.finishIndexBlockProps()
    1555           1 :                 if err != nil {
    1556           0 :                         return err
    1557           0 :                 }
    1558             :         }
    1559             : 
    1560           1 :         err = w.addIndexEntry(sep, bhp, tmp, flushableIndexBlock, w.indexBlock, 0, props)
    1561           1 :         if flushableIndexBlock != nil {
    1562           1 :                 flushableIndexBlock.clear()
    1563           1 :                 indexBlockBufPool.Put(flushableIndexBlock)
    1564           1 :         }
    1565           1 :         w.addPrevDataBlockToIndexBlockProps()
    1566           1 :         return err
    1567             : }
    1568             : 
    1569             : func shouldFlushWithHints(
    1570             :         keyLen, valueLen int,
    1571             :         restartInterval, estimatedBlockSize, numEntries int,
    1572             :         flushOptions flushDecisionOptions,
    1573             :         sizeClassHints []int,
    1574           1 : ) bool {
    1575           1 :         if numEntries == 0 {
    1576           1 :                 return false
    1577           1 :         }
    1578             : 
    1579             :         // If we are not informed about the memory allocator's size classes we fall
    1580             :         // back to a simple set of flush heuristics that are unaware of internal
    1581             :         // fragmentation in block cache allocations.
    1582           1 :         if len(sizeClassHints) == 0 {
    1583           1 :                 return shouldFlushWithoutHints(
    1584           1 :                         keyLen, valueLen, restartInterval, estimatedBlockSize, numEntries, flushOptions)
    1585           1 :         }
    1586             : 
    1587             :         // For size-class aware flushing we need to account for the metadata that is
    1588             :         // allocated when this block is loaded into the block cache. For instance, if
    1589             :         // a block has size 1020B it may fit within a 1024B class. However, when
    1590             :         // loaded into the block cache we also allocate space for the cache entry
    1591             :         // metadata. The new allocation of size ~1052B may now only fit within a
    1592             :         // 2048B class, which increases internal fragmentation.
    1593           1 :         blockSizeWithMetadata := estimatedBlockSize + cache.ValueMetadataSize
    1594           1 : 
    1595           1 :         // For the fast path we can avoid computing the exact varint encoded
    1596           1 :         // key-value pair size. Instead, we combine the key-value pair size with an
    1597           1 :         // upper-bound estimate of the associated metadata (4B restart point, 4B
    1598           1 :         // shared prefix length, 5B varint unshared key size, 5B varint value size).
    1599           1 :         newEstimatedSize := blockSizeWithMetadata + keyLen + valueLen + 18
    1600           1 :         // Our new block size estimate disregards key prefix compression. This puts
    1601           1 :         // us at risk of overestimating the size and flushing small blocks. We
    1602           1 :         // mitigate this by imposing a minimum size restriction.
    1603           1 :         if blockSizeWithMetadata <= flushOptions.sizeClassAwareThreshold || newEstimatedSize <= flushOptions.blockSize {
    1604           1 :                 return false
    1605           1 :         }
    1606             : 
    1607           1 :         sizeClass, ok := blockSizeClass(blockSizeWithMetadata, sizeClassHints)
    1608           1 :         // If the block size could not be mapped to a size class we fall back to
    1609           1 :         // using a simpler set of flush heuristics.
    1610           1 :         if !ok {
    1611           1 :                 return shouldFlushWithoutHints(
    1612           1 :                         keyLen, valueLen, restartInterval, estimatedBlockSize, numEntries, flushOptions)
    1613           1 :         }
    1614             : 
    1615             :         // Tighter upper-bound estimate of the metadata stored with the next
    1616             :         // key-value pair.
    1617           1 :         newSize := blockSizeWithMetadata + keyLen + valueLen
    1618           1 :         if numEntries%restartInterval == 0 {
    1619           0 :                 newSize += 4
    1620           0 :         }
    1621           1 :         newSize += 4                            // varint for shared prefix length
    1622           1 :         newSize += uvarintLen(uint32(keyLen))   // varint for unshared key bytes
    1623           1 :         newSize += uvarintLen(uint32(valueLen)) // varint for value size
    1624           1 : 
    1625           1 :         if blockSizeWithMetadata < flushOptions.blockSize {
    1626           1 :                 newSizeClass, ok := blockSizeClass(newSize, sizeClassHints)
    1627           1 :                 if ok && newSizeClass-newSize >= sizeClass-blockSizeWithMetadata {
    1628           1 :                         // Although the block hasn't reached the target size, waiting to insert the
    1629           1 :                         // next entry would exceed the target and increase memory fragmentation.
    1630           1 :                         return true
    1631           1 :                 }
    1632           1 :                 return false
    1633             :         }
    1634             : 
    1635             :         // Flush if inserting the next entry bumps the block size to the memory
    1636             :         // allocator's next size class.
    1637           1 :         return newSize > sizeClass
    1638             : }
    1639             : 
    1640             : func shouldFlushWithoutHints(
    1641             :         keyLen, valueLen int,
    1642             :         restartInterval, estimatedBlockSize, numEntries int,
    1643             :         flushOptions flushDecisionOptions,
    1644           1 : ) bool {
    1645           1 :         if estimatedBlockSize >= flushOptions.blockSize {
    1646           1 :                 return true
    1647           1 :         }
    1648             : 
    1649             :         // The block is currently smaller than the target size.
    1650           1 :         if estimatedBlockSize <= flushOptions.blockSizeThreshold {
    1651           1 :                 // The block is smaller than the threshold size at which we'll consider
    1652           1 :                 // flushing it.
    1653           1 :                 return false
    1654           1 :         }
    1655             : 
    1656           1 :         newSize := estimatedBlockSize + keyLen + valueLen
    1657           1 :         if numEntries%restartInterval == 0 {
    1658           1 :                 newSize += 4
    1659           1 :         }
    1660           1 :         newSize += 4                            // varint for shared prefix length
    1661           1 :         newSize += uvarintLen(uint32(keyLen))   // varint for unshared key bytes
    1662           1 :         newSize += uvarintLen(uint32(valueLen)) // varint for value size
    1663           1 :         // Flush if the block plus the new entry is larger than the target size.
    1664           1 :         return newSize > flushOptions.blockSize
    1665             : }
    1666             : 
    1667             : // blockSizeClass returns the smallest memory allocator size class that could
    1668             : // hold a block of a given size and returns a boolean indicating whether an
    1669             : // appropriate size class was found. It is useful for computing the potential
    1670             : // space wasted by an allocation.
    1671           1 : func blockSizeClass(blockSize int, sizeClassHints []int) (int, bool) {
    1672           1 :         sizeClassIdx, _ := slices.BinarySearch(sizeClassHints, blockSize)
    1673           1 :         if sizeClassIdx == len(sizeClassHints) {
    1674           1 :                 return -1, false
    1675           1 :         }
    1676           1 :         return sizeClassHints[sizeClassIdx], true
    1677             : }
    1678             : 
    1679           1 : func cloneKeyWithBuf(k InternalKey, a bytealloc.A) (bytealloc.A, InternalKey) {
    1680           1 :         if len(k.UserKey) == 0 {
    1681           0 :                 return a, k
    1682           0 :         }
    1683           1 :         a, keyCopy := a.Copy(k.UserKey)
    1684           1 :         return a, InternalKey{UserKey: keyCopy, Trailer: k.Trailer}
    1685             : }
    1686             : 
    1687             : // Invariants: The byte slice returned by finishIndexBlockProps is heap-allocated
    1688             : //
    1689             : //      and has its own lifetime, independent of the Writer and the blockPropsEncoder,
    1690             : //
    1691             : // and it is safe to:
    1692             : //  1. Reuse w.blockPropsEncoder without first encoding the byte slice returned.
    1693             : //  2. Store the byte slice in the Writer since it is a copy and not supported by
    1694             : //     an underlying buffer.
    1695           1 : func (w *Writer) finishIndexBlockProps() ([]byte, error) {
    1696           1 :         w.blockPropsEncoder.resetProps()
    1697           1 :         for i := range w.blockPropCollectors {
    1698           1 :                 scratch := w.blockPropsEncoder.getScratchForProp()
    1699           1 :                 var err error
    1700           1 :                 if scratch, err = w.blockPropCollectors[i].FinishIndexBlock(scratch); err != nil {
    1701           1 :                         return nil, err
    1702           1 :                 }
    1703           1 :                 w.blockPropsEncoder.addProp(shortID(i), scratch)
    1704             :         }
    1705           1 :         return w.blockPropsEncoder.props(), nil
    1706             : }
    1707             : 
    1708             : // finishIndexBlock finishes the current index block and adds it to the top
    1709             : // level index block. This is only used when two level indexes are enabled.
    1710             : //
    1711             : // Invariants:
    1712             : //  1. The props slice passed into finishedIndexBlock must not be a
    1713             : //     owned by any other struct, since it will be stored in the Writer.indexPartitions
    1714             : //     slice.
    1715             : //  2. None of the buffers owned by indexBuf will be shallow copied and stored elsewhere.
    1716             : //     That is, it must be safe to reuse indexBuf after finishIndexBlock has been called.
    1717           1 : func (w *Writer) finishIndexBlock(indexBuf *indexBlockBuf, props []byte) error {
    1718           1 :         part := indexBlockAndBlockProperties{
    1719           1 :                 nEntries: indexBuf.block.EntryCount(), properties: props,
    1720           1 :         }
    1721           1 :         w.indexSepAlloc, part.sep = cloneKeyWithBuf(
    1722           1 :                 indexBuf.block.CurKey(), w.indexSepAlloc,
    1723           1 :         )
    1724           1 :         bk := indexBuf.finish()
    1725           1 :         if len(w.indexBlockAlloc) < len(bk) {
    1726           1 :                 // Allocate enough bytes for approximately 16 index blocks.
    1727           1 :                 w.indexBlockAlloc = make([]byte, len(bk)*16)
    1728           1 :         }
    1729           1 :         n := copy(w.indexBlockAlloc, bk)
    1730           1 :         part.block = w.indexBlockAlloc[:n:n]
    1731           1 :         w.indexBlockAlloc = w.indexBlockAlloc[n:]
    1732           1 :         w.indexPartitions = append(w.indexPartitions, part)
    1733           1 :         return nil
    1734             : }
    1735             : 
    1736           1 : func (w *Writer) writeTwoLevelIndex() (block.Handle, error) {
    1737           1 :         props, err := w.finishIndexBlockProps()
    1738           1 :         if err != nil {
    1739           0 :                 return block.Handle{}, err
    1740           0 :         }
    1741             :         // Add the final unfinished index.
    1742           1 :         if err = w.finishIndexBlock(w.indexBlock, props); err != nil {
    1743           0 :                 return block.Handle{}, err
    1744           0 :         }
    1745             : 
    1746           1 :         for i := range w.indexPartitions {
    1747           1 :                 b := &w.indexPartitions[i]
    1748           1 :                 w.props.NumDataBlocks += uint64(b.nEntries)
    1749           1 : 
    1750           1 :                 data := b.block
    1751           1 :                 w.props.IndexSize += uint64(len(data))
    1752           1 :                 bh, err := w.layout.WriteIndexBlock(data)
    1753           1 :                 if err != nil {
    1754           0 :                         return block.Handle{}, err
    1755           0 :                 }
    1756           1 :                 bhp := BlockHandleWithProperties{
    1757           1 :                         Handle: bh,
    1758           1 :                         Props:  b.properties,
    1759           1 :                 }
    1760           1 :                 encoded := encodeBlockHandleWithProperties(w.blockBuf.tmp[:], bhp)
    1761           1 :                 w.topLevelIndexBlock.Add(b.sep, encoded)
    1762             :         }
    1763             : 
    1764             :         // NB: RocksDB includes the block trailer length in the index size
    1765             :         // property, though it doesn't include the trailer in the top level
    1766             :         // index size property.
    1767           1 :         w.props.IndexPartitions = uint64(len(w.indexPartitions))
    1768           1 :         w.props.TopLevelIndexSize = uint64(w.topLevelIndexBlock.EstimatedSize())
    1769           1 :         w.props.IndexSize += w.props.TopLevelIndexSize + block.TrailerLen
    1770           1 :         return w.layout.WriteIndexBlock(w.topLevelIndexBlock.Finish())
    1771             : }
    1772             : 
    1773             : func compressAndChecksum(
    1774             :         b []byte, compression Compression, blockBuf *blockBuf,
    1775           1 : ) (compressed []byte, trailer block.Trailer) {
    1776           1 :         // Compress the buffer, discarding the result if the improvement isn't at
    1777           1 :         // least 12.5%.
    1778           1 :         blockType, compressed := compressBlock(compression, b, blockBuf.compressedBuf)
    1779           1 :         if blockType != noCompressionBlockType && cap(compressed) > cap(blockBuf.compressedBuf) {
    1780           1 :                 blockBuf.compressedBuf = compressed[:cap(compressed)]
    1781           1 :         }
    1782           1 :         if len(compressed) < len(b)-len(b)/8 {
    1783           1 :                 b = compressed
    1784           1 :         } else {
    1785           1 :                 blockType = noCompressionBlockType
    1786           1 :         }
    1787             : 
    1788             :         // Calculate the checksum.
    1789           1 :         trailer[0] = byte(blockType)
    1790           1 :         checksum := blockBuf.checksummer.Checksum(b, trailer[:1])
    1791           1 :         return b, block.MakeTrailer(byte(blockType), checksum)
    1792             : }
    1793             : 
    1794             : // assertFormatCompatibility ensures that the features present on the table are
    1795             : // compatible with the table format version.
    1796           1 : func (w *Writer) assertFormatCompatibility() error {
    1797           1 :         // PebbleDBv1: block properties.
    1798           1 :         if len(w.blockPropCollectors) > 0 && w.tableFormat < TableFormatPebblev1 {
    1799           1 :                 return errors.Newf(
    1800           1 :                         "table format version %s is less than the minimum required version %s for block properties",
    1801           1 :                         w.tableFormat, TableFormatPebblev1,
    1802           1 :                 )
    1803           1 :         }
    1804             : 
    1805             :         // PebbleDBv2: range keys.
    1806           1 :         if w.props.NumRangeKeys() > 0 && w.tableFormat < TableFormatPebblev2 {
    1807           1 :                 return errors.Newf(
    1808           1 :                         "table format version %s is less than the minimum required version %s for range keys",
    1809           1 :                         w.tableFormat, TableFormatPebblev2,
    1810           1 :                 )
    1811           1 :         }
    1812             : 
    1813             :         // PebbleDBv3: value blocks.
    1814           1 :         if (w.props.NumValueBlocks > 0 || w.props.NumValuesInValueBlocks > 0 ||
    1815           1 :                 w.props.ValueBlocksSize > 0) && w.tableFormat < TableFormatPebblev3 {
    1816           0 :                 return errors.Newf(
    1817           0 :                         "table format version %s is less than the minimum required version %s for value blocks",
    1818           0 :                         w.tableFormat, TableFormatPebblev3)
    1819           0 :         }
    1820             : 
    1821             :         // PebbleDBv4: DELSIZED tombstones.
    1822           1 :         if w.props.NumSizedDeletions > 0 && w.tableFormat < TableFormatPebblev4 {
    1823           0 :                 return errors.Newf(
    1824           0 :                         "table format version %s is less than the minimum required version %s for sized deletion tombstones",
    1825           0 :                         w.tableFormat, TableFormatPebblev4)
    1826           0 :         }
    1827           1 :         return nil
    1828             : }
    1829             : 
    1830             : // UnsafeLastPointUserKey returns the last point key written to the writer to
    1831             : // which this option was passed during creation. The returned key points
    1832             : // directly into a buffer belonging to the Writer. The value's lifetime ends the
    1833             : // next time a point key is added to the Writer.
    1834             : //
    1835             : // Must not be called after Writer is closed.
    1836           1 : func (w *Writer) UnsafeLastPointUserKey() []byte {
    1837           1 :         if w != nil && w.dataBlockBuf.dataBlock.EntryCount() >= 1 {
    1838           1 :                 // w.dataBlockBuf.dataBlock.curKey is guaranteed to point to the last point key
    1839           1 :                 // which was added to the Writer.
    1840           1 :                 return w.dataBlockBuf.dataBlock.CurUserKey()
    1841           1 :         }
    1842           1 :         return nil
    1843             : }
    1844             : 
    1845             : // EncodeSpan encodes the keys in the given span. The span can contain either
    1846             : // only RANGEDEL keys or only range keys.
    1847           1 : func (w *Writer) EncodeSpan(span *keyspan.Span) error {
    1848           1 :         if span.Empty() {
    1849           1 :                 return nil
    1850           1 :         }
    1851           1 :         if span.Keys[0].Kind() == base.InternalKeyKindRangeDelete {
    1852           1 :                 return rangedel.Encode(span, w.Add)
    1853           1 :         }
    1854           1 :         return rangekey.Encode(span, w.AddRangeKey)
    1855             : }
    1856             : 
    1857             : // Close finishes writing the table and closes the underlying file that the
    1858             : // table was written to.
    1859           1 : func (w *Writer) Close() (err error) {
    1860           1 :         defer func() {
    1861           1 :                 if w.valueBlockWriter != nil {
    1862           1 :                         releaseValueBlockWriter(w.valueBlockWriter)
    1863           1 :                         // Defensive code in case Close gets called again. We don't want to put
    1864           1 :                         // the same object to a sync.Pool.
    1865           1 :                         w.valueBlockWriter = nil
    1866           1 :                 }
    1867           1 :                 w.layout.Abort()
    1868           1 :                 // Record any error in the writer (so we can exit early if Close is called
    1869           1 :                 // again).
    1870           1 :                 if err != nil {
    1871           1 :                         w.err = err
    1872           1 :                 }
    1873             :         }()
    1874             : 
    1875             :         // finish must be called before we check for an error, because finish will
    1876             :         // block until every single task added to the writeQueue has been processed,
    1877             :         // and an error could be encountered while any of those tasks are processed.
    1878           1 :         if err := w.coordination.writeQueue.finish(); err != nil {
    1879           1 :                 return err
    1880           1 :         }
    1881             : 
    1882           1 :         if w.err != nil {
    1883           1 :                 return w.err
    1884           1 :         }
    1885             : 
    1886             :         // The w.meta.LargestPointKey is only used once the Writer is closed, so it is safe to set it
    1887             :         // when the Writer is closed.
    1888             :         //
    1889             :         // The following invariants ensure that setting the largest key at this point of a Writer close
    1890             :         // is correct:
    1891             :         // 1. Keys must only be added to the Writer in an increasing order.
    1892             :         // 2. The current w.dataBlockBuf is guaranteed to have the latest key added to the Writer. This
    1893             :         //    must be true, because a w.dataBlockBuf is only switched out when a dataBlock is flushed,
    1894             :         //    however, if a dataBlock is flushed, then we add a key to the new w.dataBlockBuf in the
    1895             :         //    addPoint function after the flush occurs.
    1896           1 :         if w.dataBlockBuf.dataBlock.EntryCount() >= 1 {
    1897           1 :                 w.meta.SetLargestPointKey(w.dataBlockBuf.dataBlock.CurKey().Clone())
    1898           1 :         }
    1899             : 
    1900             :         // Finish the last data block, or force an empty data block if there
    1901             :         // aren't any data blocks at all.
    1902           1 :         if w.dataBlockBuf.dataBlock.EntryCount() > 0 || w.indexBlock.block.EntryCount() == 0 {
    1903           1 :                 bh, err := w.layout.WriteDataBlock(w.dataBlockBuf.dataBlock.Finish(), &w.dataBlockBuf.blockBuf)
    1904           1 :                 if err != nil {
    1905           0 :                         return err
    1906           0 :                 }
    1907           1 :                 bhp, err := w.maybeAddBlockPropertiesToBlockHandle(bh)
    1908           1 :                 if err != nil {
    1909           0 :                         return err
    1910           0 :                 }
    1911           1 :                 prevKey := w.dataBlockBuf.dataBlock.CurKey()
    1912           1 :                 if err := w.addIndexEntrySync(prevKey, InternalKey{}, bhp, w.dataBlockBuf.tmp[:]); err != nil {
    1913           0 :                         return err
    1914           0 :                 }
    1915             :         }
    1916           1 :         w.props.DataSize = w.layout.offset
    1917           1 : 
    1918           1 :         // Write the filter block.
    1919           1 :         if w.filter != nil {
    1920           1 :                 bh, err := w.layout.WriteFilterBlock(w.filter)
    1921           1 :                 if err != nil {
    1922           0 :                         return err
    1923           0 :                 }
    1924           1 :                 w.props.FilterPolicyName = w.filter.policyName()
    1925           1 :                 w.props.FilterSize = bh.Length
    1926             :         }
    1927             : 
    1928           1 :         if w.twoLevelIndex {
    1929           1 :                 w.props.IndexType = twoLevelIndex
    1930           1 :                 // Write the two level index block.
    1931           1 :                 if _, err = w.writeTwoLevelIndex(); err != nil {
    1932           0 :                         return err
    1933           0 :                 }
    1934           1 :         } else {
    1935           1 :                 w.props.IndexType = binarySearchIndex
    1936           1 :                 // NB: RocksDB includes the block trailer length in the index size
    1937           1 :                 // property, though it doesn't include the trailer in the filter size
    1938           1 :                 // property.
    1939           1 :                 w.props.IndexSize = uint64(w.indexBlock.estimatedSize()) + block.TrailerLen
    1940           1 :                 w.props.NumDataBlocks = uint64(w.indexBlock.block.EntryCount())
    1941           1 :                 // Write the single level index block.
    1942           1 :                 if _, err = w.layout.WriteIndexBlock(w.indexBlock.finish()); err != nil {
    1943           0 :                         return err
    1944           0 :                 }
    1945             :         }
    1946             : 
    1947             :         // Write the range-del block.
    1948           1 :         if w.props.NumRangeDeletions > 0 {
    1949           1 :                 // Because the range tombstones are fragmented, the end key of the last
    1950           1 :                 // added range tombstone will be the largest range tombstone key. Note
    1951           1 :                 // that we need to make this into a range deletion sentinel because
    1952           1 :                 // sstable boundaries are inclusive while the end key of a range
    1953           1 :                 // deletion tombstone is exclusive. A Clone() is necessary as
    1954           1 :                 // rangeDelBlock.curValue is the same slice that will get passed into
    1955           1 :                 // w.writer, and some implementations of vfs.File mutate the slice
    1956           1 :                 // passed into Write(). Also, w.meta will often outlive the blockWriter,
    1957           1 :                 // and so cloning curValue allows the rangeDelBlock's internal buffer to
    1958           1 :                 // get gc'd.
    1959           1 :                 k := base.MakeRangeDeleteSentinelKey(w.rangeDelBlock.CurValue()).Clone()
    1960           1 :                 w.meta.SetLargestRangeDelKey(k)
    1961           1 :                 if _, err := w.layout.WriteRangeDeletionBlock(w.rangeDelBlock.Finish()); err != nil {
    1962           0 :                         return err
    1963           0 :                 }
    1964             :         }
    1965             : 
    1966             :         // Write the range-key block, flushing any remaining spans from the
    1967             :         // fragmenter first.
    1968           1 :         w.fragmenter.Finish()
    1969           1 : 
    1970           1 :         if w.props.NumRangeKeys() > 0 {
    1971           1 :                 key := w.rangeKeyBlock.CurKey()
    1972           1 :                 kind := key.Kind()
    1973           1 :                 endKey, _, err := rangekey.DecodeEndKey(kind, w.rangeKeyBlock.CurValue())
    1974           1 :                 if err != nil {
    1975           0 :                         return err
    1976           0 :                 }
    1977           1 :                 k := base.MakeExclusiveSentinelKey(kind, endKey).Clone()
    1978           1 :                 w.meta.SetLargestRangeKey(k)
    1979           1 :                 if _, err := w.layout.WriteRangeKeyBlock(w.rangeKeyBlock.Finish()); err != nil {
    1980           0 :                         return err
    1981           0 :                 }
    1982             :         }
    1983             : 
    1984           1 :         if w.valueBlockWriter != nil {
    1985           1 :                 _, vbStats, err := w.valueBlockWriter.finish(&w.layout, w.layout.offset)
    1986           1 :                 if err != nil {
    1987           0 :                         return err
    1988           0 :                 }
    1989           1 :                 w.props.NumValueBlocks = vbStats.numValueBlocks
    1990           1 :                 w.props.NumValuesInValueBlocks = vbStats.numValuesInValueBlocks
    1991           1 :                 w.props.ValueBlocksSize = vbStats.valueBlocksAndIndexSize
    1992             :         }
    1993             : 
    1994           1 :         {
    1995           1 :                 // Finish and record the prop collectors if props are not yet recorded.
    1996           1 :                 // Pre-computed props might have been copied by specialized sst creators
    1997           1 :                 // like suffix replacer.
    1998           1 :                 if len(w.props.UserProperties) == 0 {
    1999           1 :                         userProps := make(map[string]string)
    2000           1 :                         for i := range w.blockPropCollectors {
    2001           1 :                                 scratch := w.blockPropsEncoder.getScratchForProp()
    2002           1 :                                 // Place the shortID in the first byte.
    2003           1 :                                 scratch = append(scratch, byte(i))
    2004           1 :                                 buf, err := w.blockPropCollectors[i].FinishTable(scratch)
    2005           1 :                                 if err != nil {
    2006           1 :                                         return err
    2007           1 :                                 }
    2008           1 :                                 var prop string
    2009           1 :                                 if len(buf) > 0 {
    2010           1 :                                         prop = string(buf)
    2011           1 :                                 }
    2012             :                                 // NB: The property is populated in the map even if it is the
    2013             :                                 // empty string, since the presence in the map is what indicates
    2014             :                                 // that the block property collector was used when writing.
    2015           1 :                                 userProps[w.blockPropCollectors[i].Name()] = prop
    2016             :                         }
    2017           1 :                         if len(userProps) > 0 {
    2018           1 :                                 w.props.UserProperties = userProps
    2019           1 :                         }
    2020             :                 }
    2021             : 
    2022             :                 // Write the properties block.
    2023           1 :                 var raw rowblk.Writer
    2024           1 :                 // The restart interval is set to infinity because the properties block
    2025           1 :                 // is always read sequentially and cached in a heap located object. This
    2026           1 :                 // reduces table size without a significant impact on performance.
    2027           1 :                 raw.RestartInterval = propertiesBlockRestartInterval
    2028           1 :                 w.props.CompressionOptions = rocksDBCompressionOptions
    2029           1 :                 w.props.save(w.tableFormat, &raw)
    2030           1 :                 w.layout.WritePropertiesBlock(raw.Finish())
    2031             :         }
    2032             : 
    2033             :         // Write the table footer.
    2034           1 :         w.meta.Size, err = w.layout.Finish()
    2035           1 :         if err != nil {
    2036           1 :                 return err
    2037           1 :         }
    2038           1 :         w.meta.Properties = w.props
    2039           1 : 
    2040           1 :         // Check that the features present in the table are compatible with the format
    2041           1 :         // configured for the table.
    2042           1 :         if err = w.assertFormatCompatibility(); err != nil {
    2043           1 :                 return err
    2044           1 :         }
    2045             : 
    2046           1 :         w.dataBlockBuf.clear()
    2047           1 :         dataBlockBufPool.Put(w.dataBlockBuf)
    2048           1 :         w.dataBlockBuf = nil
    2049           1 :         w.indexBlock.clear()
    2050           1 :         indexBlockBufPool.Put(w.indexBlock)
    2051           1 :         w.indexBlock = nil
    2052           1 : 
    2053           1 :         // Make any future calls to Set or Close return an error.
    2054           1 :         w.err = errWriterClosed
    2055           1 :         return nil
    2056             : }
    2057             : 
    2058             : // EstimatedSize returns the estimated size of the sstable being written if it were finished
    2059             : // now: the coordination size estimate plus the current data block and index block estimates.
    2060           1 : func (w *Writer) EstimatedSize() uint64 {
    2061           1 :         if w == nil {
    2062           0 :                 return 0 // a nil Writer has written nothing
    2063           0 :         }
    2064           1 :         return w.coordination.sizeEstimate.size() +
    2065           1 :                 uint64(w.dataBlockBuf.dataBlock.EstimatedSize()) +
    2066           1 :                 w.indexBlock.estimatedSize()
    2067             : }
    2068             : 
    2069             : // Metadata returns a pointer to the Writer's metadata for the finished sstable. It is only
    2070             : // valid once the layout has been finished; before that it returns a nil pointer and an error.
    2071           1 : func (w *Writer) Metadata() (*WriterMetadata, error) {
    2072           1 :         if !w.layout.IsFinished() {
    2073           0 :                 return nil, errors.New("pebble: writer is not closed")
    2074           0 :         }
    2075           1 :         return &w.meta, nil
    2076             : }
    2077             : 
    2078             : // WriterOption provides an interface for doing work on a Writer while it is being
    2079             : // opened.
    2080             : type WriterOption interface {
    2081             :         // writerApply is called on the writer by NewWriter during opening in order
    2082             :         // to set internal parameters.
    2083             :         writerApply(*Writer)
    2084             : }
    2085             : 
    2086             : // NewWriter returns a new table writer for the file. Closing the writer will close the file.
    2087             : // A nil writable or too many block property collectors is reported via the returned Writer's err.
    2088           1 : func NewWriter(writable objstorage.Writable, o WriterOptions, extraOpts ...WriterOption) *Writer {
    2089           1 :         o = o.ensureDefaults()
    2090           1 :         w := &Writer{
    2091           1 :                 layout: makeLayoutWriter(writable, o),
    2092           1 :                 meta: WriterMetadata{
    2093           1 :                         SmallestSeqNum: math.MaxUint64,
    2094           1 :                 },
    2095           1 :                 dataBlockOptions: flushDecisionOptions{
    2096           1 :                         blockSize:               o.BlockSize,
    2097           1 :                         blockSizeThreshold:      (o.BlockSize*o.BlockSizeThreshold + 99) / 100, // BlockSize*BlockSizeThreshold/100, rounded up
    2098           1 :                         sizeClassAwareThreshold: (o.BlockSize*o.SizeClassAwareThreshold + 99) / 100,
    2099           1 :                 },
    2100           1 :                 indexBlockOptions: flushDecisionOptions{
    2101           1 :                         blockSize:               o.IndexBlockSize,
    2102           1 :                         blockSizeThreshold:      (o.IndexBlockSize*o.BlockSizeThreshold + 99) / 100,
    2103           1 :                         sizeClassAwareThreshold: (o.IndexBlockSize*o.SizeClassAwareThreshold + 99) / 100,
    2104           1 :                 },
    2105           1 :                 compare:               o.Comparer.Compare,
    2106           1 :                 split:                 o.Comparer.Split,
    2107           1 :                 formatKey:             o.Comparer.FormatKey,
    2108           1 :                 compression:           o.Compression,
    2109           1 :                 separator:             o.Comparer.Separator,
    2110           1 :                 successor:             o.Comparer.Successor,
    2111           1 :                 tableFormat:           o.TableFormat,
    2112           1 :                 isStrictObsolete:      o.IsStrictObsolete,
    2113           1 :                 writingToLowestLevel:  o.WritingToLowestLevel,
    2114           1 :                 restartInterval:       o.BlockRestartInterval,
    2115           1 :                 checksumType:          o.Checksum,
    2116           1 :                 disableKeyOrderChecks: o.internal.DisableKeyOrderChecks,
    2117           1 :                 indexBlock:            newIndexBlockBuf(o.Parallelism),
    2118           1 :                 rangeDelBlock:         rowblk.Writer{RestartInterval: 1},
    2119           1 :                 rangeKeyBlock:         rowblk.Writer{RestartInterval: 1},
    2120           1 :                 topLevelIndexBlock:    rowblk.Writer{RestartInterval: 1},
    2121           1 :                 fragmenter: keyspan.Fragmenter{
    2122           1 :                         Cmp:    o.Comparer.Compare,
    2123           1 :                         Format: o.Comparer.FormatKey,
    2124           1 :                 },
    2125           1 :                 allocatorSizeClasses: o.AllocatorSizeClasses,
    2126           1 :         }
    2127           1 :         if w.tableFormat >= TableFormatPebblev3 {
    2128           1 :                 w.shortAttributeExtractor = o.ShortAttributeExtractor
    2129           1 :                 w.requiredInPlaceValueBound = o.RequiredInPlaceValueBound
    2130           1 :                 if !o.DisableValueBlocks {
    2131           1 :                         w.valueBlockWriter = newValueBlockWriter(
    2132           1 :                                 w.dataBlockOptions.blockSize, w.dataBlockOptions.blockSizeThreshold, w.compression, w.checksumType, func(compressedSize int) {
    2133           1 :                                         w.coordination.sizeEstimate.dataBlockCompressed(compressedSize, 0)
    2134           1 :                                 })
    2135             :                 }
    2136             :         }
    2137             : 
    2138           1 :         w.dataBlockBuf = newDataBlockBuf(w.restartInterval, w.checksumType)
    2139           1 : 
    2140           1 :         w.blockBuf = blockBuf{
    2141           1 :                 checksummer: block.Checksummer{Type: o.Checksum},
    2142           1 :         }
    2143           1 : 
    2144           1 :         w.coordination.init(o.Parallelism, w)
    2145           1 : 
    2146           1 :         if writable == nil {
    2147           0 :                 w.err = errors.New("pebble: nil writable")
    2148           0 :                 return w
    2149           0 :         }
    2150             : 
    2151             :         // Note that the extra WriterOption values are applied in two places; the ones
    2152             :         // that also have a preApply() method are applied here. The rest are applied down
    2153             :         // below, after default properties and block property collectors are set.
    2154           1 :         type preApply interface{ preApply() }
    2155           1 :         for _, opt := range extraOpts {
    2156           0 :                 if _, ok := opt.(preApply); ok {
    2157           0 :                         opt.writerApply(w)
    2158           0 :                 }
    2159             :         }
    2160             : 
    2161           1 :         if o.FilterPolicy != nil {
    2162           1 :                 switch o.FilterType {
    2163           1 :                 case TableFilter:
    2164           1 :                         w.filter = newTableFilterWriter(o.FilterPolicy)
    2165           0 :                 default:
    2166           0 :                         panic(fmt.Sprintf("unknown filter type: %v", o.FilterType))
    2167             :                 }
    2168             :         }
    2169             : 
    2170           1 :         w.props.ComparerName = o.Comparer.Name
    2171           1 :         w.props.CompressionName = o.Compression.String()
    2172           1 :         w.props.MergerName = o.MergerName
    2173           1 :         w.props.PropertyCollectorNames = "[]"
    2174           1 : 
    2175           1 :         numBlockPropertyCollectors := len(o.BlockPropertyCollectors)
    2176           1 :         if w.tableFormat >= TableFormatPebblev4 {
    2177           1 :                 numBlockPropertyCollectors++ // room for w.obsoleteCollector, appended below
    2178           1 :         }
    2179             : 
    2180           1 :         if numBlockPropertyCollectors > 0 {
    2181           1 :                 if numBlockPropertyCollectors > maxPropertyCollectors {
    2182           0 :                         w.err = errors.New("pebble: too many block property collectors")
    2183           0 :                         return w
    2184           0 :                 }
    2185           1 :                 w.blockPropCollectors = make([]BlockPropertyCollector, 0, numBlockPropertyCollectors)
    2186           1 :                 for _, constructFn := range o.BlockPropertyCollectors {
    2187           1 :                         w.blockPropCollectors = append(w.blockPropCollectors, constructFn())
    2188           1 :                 }
    2189           1 :                 if w.tableFormat >= TableFormatPebblev4 {
    2190           1 :                         w.blockPropCollectors = append(w.blockPropCollectors, &w.obsoleteCollector)
    2191           1 :                 }
    2192             : 
    2193           1 :                 var buf bytes.Buffer
    2194           1 :                 buf.WriteString("[")
    2195           1 :                 for i := range w.blockPropCollectors {
    2196           1 :                         if i > 0 {
    2197           1 :                                 buf.WriteString(",")
    2198           1 :                         }
    2199           1 :                         buf.WriteString(w.blockPropCollectors[i].Name())
    2200             :                 }
    2201           1 :                 buf.WriteString("]")
    2202           1 :                 w.props.PropertyCollectorNames = buf.String()
    2203             :         }
    2204             : 
    2205             :         // Apply the remaining WriterOption values, i.e. those without a preApply() method.
    2206           1 :         for _, opt := range extraOpts {
    2207           0 :                 if _, ok := opt.(preApply); ok {
    2208           0 :                         continue
    2209             :                 }
    2210           0 :                 opt.writerApply(w)
    2211             :         }
    2212             : 
    2213             :         // Wire the range key fragmenter and encoder to their emit callbacks.
    2214           1 :         w.fragmenter.Emit = w.encodeRangeKeySpan
    2215           1 :         w.rangeKeyEncoder.Emit = w.addRangeKey
    2216           1 :         return w
    2217             : }
    2218             : 
    2219             : // SetSnapshotPinnedProperties stores the given pinned key count, key size and value size in
    2220             : // the SnapshotPinned* fields of the table properties. Should only be used internally by Pebble.
    2221             : func (w *Writer) SetSnapshotPinnedProperties(
    2222             :         pinnedKeyCount, pinnedKeySize, pinnedValueSize uint64,
    2223           1 : ) {
    2224           1 :         w.props.SnapshotPinnedKeys = pinnedKeyCount
    2225           1 :         w.props.SnapshotPinnedKeySize = pinnedKeySize
    2226           1 :         w.props.SnapshotPinnedValueSize = pinnedValueSize
    2227           1 : }

Generated by: LCOV version 1.14