LCOV - code coverage report
Current view: top level - pebble/sstable - writer.go (source / functions) Hit Total Coverage
Test: 2023-12-12 08:16Z 9848bcdb - tests + meta.lcov Lines: 1339 1491 89.8 %
Date: 2023-12-12 08:17:04 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2011 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package sstable
       6             : 
       7             : import (
       8             :         "bytes"
       9             :         "encoding/binary"
      10             :         "fmt"
      11             :         "math"
      12             :         "runtime"
      13             :         "sort"
      14             :         "sync"
      15             : 
      16             :         "github.com/cespare/xxhash/v2"
      17             :         "github.com/cockroachdb/errors"
      18             :         "github.com/cockroachdb/pebble/internal/base"
      19             :         "github.com/cockroachdb/pebble/internal/bytealloc"
      20             :         "github.com/cockroachdb/pebble/internal/cache"
      21             :         "github.com/cockroachdb/pebble/internal/crc"
      22             :         "github.com/cockroachdb/pebble/internal/invariants"
      23             :         "github.com/cockroachdb/pebble/internal/keyspan"
      24             :         "github.com/cockroachdb/pebble/internal/private"
      25             :         "github.com/cockroachdb/pebble/internal/rangekey"
      26             :         "github.com/cockroachdb/pebble/objstorage"
      27             : )
      28             : 
      29             : // encodedBHPEstimatedSize is the estimated size of an encoded BlockHandleWithProperties:
      30             : // room for two maximum-length 64-bit varints. The length of the data block properties
      31             : // is not accounted for, which is acceptable since this is only an estimate.
      32             : const encodedBHPEstimatedSize = binary.MaxVarintLen64 * 2
      33             : 
      34             : var errWriterClosed = errors.New("pebble: writer is closed")
      35             : 
      36             : // WriterMetadata holds info about a finished sstable: size, per-kind key bounds, sequence number range and properties.
      37             : type WriterMetadata struct {
      38             :         Size          uint64
      39             :         SmallestPoint InternalKey
      40             :         // LargestPoint, LargestRangeKey, LargestRangeDel should not be accessed
      41             :         // before Writer.Close is called, because they may only be set on
      42             :         // Writer.Close.
      43             :         LargestPoint     InternalKey
      44             :         SmallestRangeDel InternalKey
      45             :         LargestRangeDel  InternalKey
      46             :         SmallestRangeKey InternalKey
      47             :         LargestRangeKey  InternalKey
      48             :         HasPointKeys     bool // set to true by SetSmallestPointKey and SetLargestPointKey
      49             :         HasRangeDelKeys  bool // set to true by SetSmallestRangeDelKey and SetLargestRangeDelKey
      50             :         HasRangeKeys     bool // set to true by SetSmallestRangeKey and SetLargestRangeKey
      51             :         SmallestSeqNum   uint64 // lowered by updateSeqNum
      52             :         LargestSeqNum    uint64 // raised by updateSeqNum
      53             :         Properties       Properties
      54             : }
      55             : 
      56             : // SetSmallestPointKey sets the smallest point key to the given key and
      57             : // records that the table has point keys. NB: this method sets the "absolute"
      58             : // smallest point key. Any existing key is overridden.
      59           2 : func (m *WriterMetadata) SetSmallestPointKey(k InternalKey) {
      60           2 :         m.SmallestPoint = k
      61           2 :         m.HasPointKeys = true
      62           2 : }
      63             : 
      64             : // SetSmallestRangeDelKey sets the smallest rangedel key to the given key and
      65             : // records that the table has rangedel keys. NB: this method sets the
      66             : // "absolute" smallest rangedel key. Any existing key is overridden.
      67           2 : func (m *WriterMetadata) SetSmallestRangeDelKey(k InternalKey) {
      68           2 :         m.SmallestRangeDel = k
      69           2 :         m.HasRangeDelKeys = true
      70           2 : }
      71             : 
      72             : // SetSmallestRangeKey sets the smallest range key to the given key and
      73             : // records that the table has range keys. NB: this method sets the "absolute"
      74             : // smallest range key. Any existing key is overridden.
      75           2 : func (m *WriterMetadata) SetSmallestRangeKey(k InternalKey) {
      76           2 :         m.SmallestRangeKey = k
      77           2 :         m.HasRangeKeys = true
      78           2 : }
      79             : 
      80             : // SetLargestPointKey sets the largest point key to the given key and
      81             : // records that the table has point keys. NB: this method sets the "absolute"
      82             : // largest point key. Any existing key is overridden.
      83           2 : func (m *WriterMetadata) SetLargestPointKey(k InternalKey) {
      84           2 :         m.LargestPoint = k
      85           2 :         m.HasPointKeys = true
      86           2 : }
      87             : 
      88             : // SetLargestRangeDelKey sets the largest rangedel key to the given key and
      89             : // records that the table has rangedel keys. NB: this method sets the
      90             : // "absolute" largest rangedel key. Any existing key is overridden.
      91           2 : func (m *WriterMetadata) SetLargestRangeDelKey(k InternalKey) {
      92           2 :         m.LargestRangeDel = k
      93           2 :         m.HasRangeDelKeys = true
      94           2 : }
      95             : 
      96             : // SetLargestRangeKey sets the largest range key to the given key and
      97             : // records that the table has range keys. NB: this method sets the "absolute"
      98             : // largest range key. Any existing key is overridden.
      99           2 : func (m *WriterMetadata) SetLargestRangeKey(k InternalKey) {
     100           2 :         m.LargestRangeKey = k
     101           2 :         m.HasRangeKeys = true
     102           2 : }
     103             : 
     104           2 : func (m *WriterMetadata) updateSeqNum(seqNum uint64) { // widens [SmallestSeqNum, LargestSeqNum] so that it includes seqNum
     105           2 :         if m.SmallestSeqNum > seqNum { // NOTE(review): never true while SmallestSeqNum is zero; presumably it is initialized to a large value elsewhere - confirm
     106           2 :                 m.SmallestSeqNum = seqNum
     107           2 :         }
     108           2 :         if m.LargestSeqNum < seqNum {
     109           2 :                 m.LargestSeqNum = seqNum
     110           2 :         }
     111             : }
     112             : 
     113             : // Writer is a table writer; it carries the options, in-progress blocks and metadata (meta) for the sstable being built.
     114             : type Writer struct {
     115             :         writable objstorage.Writable
     116             :         meta     WriterMetadata
     117             :         err      error
     118             :         // cacheID and fileNum are used to remove blocks written to the sstable from
     119             :         // the cache, providing a defense in depth against bugs which cause cache
     120             :         // collisions.
     121             :         cacheID uint64
     122             :         fileNum base.DiskFileNum
     123             :         // The following fields are copied from Options.
     124             :         blockSize               int
     125             :         blockSizeThreshold      int
     126             :         indexBlockSize          int
     127             :         indexBlockSizeThreshold int
     128             :         compare                 Compare
     129             :         split                   Split
     130             :         formatKey               base.FormatKey
     131             :         compression             Compression
     132             :         separator               Separator
     133             :         successor               Successor
     134             :         tableFormat             TableFormat
     135             :         isStrictObsolete        bool
     136             :         writingToLowestLevel    bool
     137             :         cache                   *cache.Cache
     138             :         restartInterval         int
     139             :         checksumType            ChecksumType
     140             :         // disableKeyOrderChecks disables the checks that keys are added to an
     141             :         // sstable in order. It is intended for internal use only in the construction
     142             :         // of invalid sstables for testing. See tool/make_test_sstables.go.
     143             :         disableKeyOrderChecks bool
     144             :         // With two level indexes, the index/filter of a SST file is partitioned into
     145             :         // smaller blocks with an additional top-level index on them. When reading an
     146             :         // index/filter, only the top-level index is loaded into memory. The two level
     147             :         // index/filter then uses the top-level index to load on demand into the block
     148             :         // cache the partitions that are required to perform the index/filter query.
     149             :         //
     150             :         // Two level indexes are enabled automatically when there is more than one
     151             :         // index block.
     152             :         //
     153             :         // This is useful when there are very large index blocks, which generally occurs
     154             :         // with the usage of large keys. With large index blocks, the index blocks fight
     155             :         // the data blocks for block cache space and the index blocks are likely to be
     156             :         // re-read many times from the disk. The top level index, which has a much
     157             :         // smaller memory footprint, can be used to prevent the entire index block from
     158             :         // being loaded into the block cache.
     159             :         twoLevelIndex bool
     160             :         // Internal flag to allow creation of range-del-v1 format blocks. Only used
     161             :         // for testing. Note that v2 format blocks are backwards compatible with v1
     162             :         // format blocks.
     163             :         rangeDelV1Format    bool
     164             :         indexBlock          *indexBlockBuf
     165             :         rangeDelBlock       blockWriter
     166             :         rangeKeyBlock       blockWriter
     167             :         topLevelIndexBlock  blockWriter
     168             :         props               Properties
     169             :         propCollectors      []TablePropertyCollector
     170             :         blockPropCollectors []BlockPropertyCollector
     171             :         obsoleteCollector   obsoleteKeyBlockPropertyCollector
     172             :         blockPropsEncoder   blockPropertiesEncoder
     173             :         // filter accumulates the filter block. If populated, the filter ingests
     174             :         // either the output of w.split (i.e. a prefix extractor) if w.split is not
     175             :         // nil, or the full keys otherwise.
     176             :         filter          filterWriter
     177             :         indexPartitions []indexBlockAndBlockProperties
     178             : 
     179             :         // indexBlockAlloc is used to bulk-allocate byte slices used to store index
     180             :         // blocks in indexPartitions. These live until the index finishes.
     181             :         indexBlockAlloc []byte
     182             :         // indexSepAlloc is used to bulk-allocate index block separator slices stored
     183             :         // in indexPartitions. These live until the index finishes.
     184             :         indexSepAlloc bytealloc.A
     185             : 
     186             :         // To allow potentially overlapping (i.e. un-fragmented) range key spans to
     187             :         // be added to the Writer, a keyspan.Fragmenter is used to retain the keys
     188             :         // and values, emitting fragmented, coalesced spans as appropriate. Range
     189             :         // keys must be added in order of their start user-key.
     190             :         fragmenter        keyspan.Fragmenter
     191             :         rangeKeyEncoder   rangekey.Encoder
     192             :         rangeKeysBySuffix keyspan.KeysBySuffix
     193             :         rangeKeySpan      keyspan.Span
     194             :         rkBuf             []byte
     195             :         // dataBlockBuf consists of the state which is currently owned by and used by
     196             :         // the Writer client goroutine. This state can be handed off to other goroutines.
     197             :         dataBlockBuf *dataBlockBuf
     198             :         // blockBuf consists of the state which is owned by and used by the Writer client
     199             :         // goroutine.
     200             :         blockBuf blockBuf
     201             : 
     202             :         coordination coordinationState
     203             : 
     204             :         // Information (other than the byte slice) about the last point key, to
     205             :         // avoid extracting it again.
     206             :         lastPointKeyInfo pointKeyInfo
     207             : 
     208             :         // For value blocks.
     209             :         shortAttributeExtractor   base.ShortAttributeExtractor
     210             :         requiredInPlaceValueBound UserKeyPrefixBound
     211             :         valueBlockWriter          *valueBlockWriter
     212             : }
     213             : 
     214             : type pointKeyInfo struct { // info about a point key; Writer keeps one for the last point key (lastPointKeyInfo)
     215             :         trailer uint64 // presumably the internal key trailer - TODO confirm
     216             :         // Only computed when w.valueBlockWriter is not nil.
     217             :         userKeyLen int
     218             :         // prefixLen uses w.split, if not nil. Only computed when w.valueBlockWriter
     219             :         // is not nil.
     220             :         prefixLen int
     221             :         // True iff the point was marked obsolete.
     222             :         isObsolete bool
     223             : }
     224             : 
     225             : type coordinationState struct { // populated by init; stored in Writer.coordination
     226             :         parallelismEnabled bool
     227             : 
     228             :         // writeQueue is used to write data blocks to disk. The writeQueue is primarily
     229             :         // used to maintain the order in which data blocks must be written to disk. For
     230             :         // this reason, every single data block write must be done through the writeQueue.
     231             :         writeQueue *writeQueue
     232             : 
     233             :         sizeEstimate dataBlockEstimates // data block size estimates; init leaves its mutex disabled
     234             : }
     235             : 
     236           2 : func (c *coordinationState) init(parallelismEnabled bool, writer *Writer) { // init records parallelismEnabled and creates the write queue for writer
     237           2 :         c.parallelismEnabled = parallelismEnabled
     238           2 :         // useMutex is false regardless of parallelismEnabled, because we do not do
     239           2 :         // parallel compression yet.
     240           2 :         c.sizeEstimate.useMutex = false
     241           2 : 
     242           2 :         // writeQueueSize determines the size of the write queue, or the number
     243           2 :         // of items which can be added to the queue without blocking. By default, we
     244           2 :         // use a writeQueue size of 0, since we won't be doing any block writes in
     245           2 :         // parallel.
     246           2 :         writeQueueSize := 0
     247           2 :         if parallelismEnabled {
     248           2 :                 writeQueueSize = runtime.GOMAXPROCS(0) // GOMAXPROCS(0) only queries the current setting
     249           2 :         }
     250           2 :         c.writeQueue = newWriteQueue(writeQueueSize, writer)
     251             : }
     252             : 
     253             : // sizeEstimate is a general purpose helper for estimating two kinds of sizes:
     254             : // A. The compressed sstable size, which is useful for deciding when to start
     255             : //
     256             : //      a new sstable during flushes or compactions. In practice, we use this in
     257             : //      estimating the data size (excluding the index).
     258             : //
     259             : // B. The size of index blocks to decide when to start a new index block.
     260             : //
     261             : // There are some terminology peculiarities which are due to the origin of
     262             : // sizeEstimate for use case A with parallel compression enabled (for which
     263             : // the code has not been merged). Specifically this relates to the terms
     264             : // "written" and "compressed".
     265             : //   - The notion of "written" for case A is sufficiently defined by saying that
     266             : //     the data block is compressed. Waiting for the actual data block write to
     267             : //     happen can result in unnecessary estimation, when we already know how big
     268             : //     it will be in compressed form. Additionally, with the forthcoming value
     269             : //     blocks containing older MVCC values, these compressed blocks will be held
     270             : //     in-memory until late in the sstable writing, and we do want to accurately
     271             : //     account for them without waiting for the actual write.
     272             : //     For case B, "written" means that the index entry has been fully
     273             : //     generated, and has been added to the uncompressed block buffer for that
     274             : //     index block. It does not include actually writing a potentially
     275             : //     compressed index block.
     276             : //   - The notion of "compressed" is to differentiate between an "inflight" size
     277             : //     and the actual size, and is handled via computing a compression ratio
     278             : //     observed so far (defaults to 1).
     279             : //     For case A, this is actual data block compression, so the "inflight" size
     280             : //     is uncompressed blocks (that are no longer being written to) and the
     281             : //     "compressed" size is after they have been compressed.
     282             : //     For case B the inflight size is for a key-value pair in the index for
     283             : //     which the value size (the encoded size of the BlockHandleWithProperties)
     284             : //     is not accurately known, while the compressed size is the size of that
     285             : //     entry when it has been added to the (in-progress) index block.
     286             : //
     287             : // Usage: To update state, one can optionally provide an inflight write value
     288             : // using addInflight (used for case B). When something is "written" the state
     289             : // can be updated using either writtenWithDelta or writtenWithTotal, which
     290             : // provide the actual delta size or the total size (latter must be
     291             : // monotonically non-decreasing). If there were no calls to addInflight, there
     292             : // isn't any real estimation happening here. So case A does not do any real
     293             : // estimation. However, when we introduce parallel compression, there will be
     294             : // estimation in that the client goroutine will call addInflight and the
     295             : // compression goroutines will call writtenWithDelta.
     296             : type sizeEstimate struct {
     297             :         // emptySize is the size when there is no inflight data, and numEntries is 0.
     298             :         // emptySize is constant once set.
     299             :         emptySize uint64
     300             : 
     301             :         // inflightSize is the estimated size of some inflight data which hasn't
     302             :         // been written yet.
     303             :         inflightSize uint64
     304             : 
     305             :         // totalSize is the total size of the data which has already been written.
     306             :         totalSize uint64
     307             : 
     308             :         // numWrittenEntries is the total number of entries which have already been
     309             :         // written.
     310             :         numWrittenEntries uint64
     311             :         // numInflightEntries is the total number of entries which are inflight, and
     312             :         // haven't been written.
     313             :         numInflightEntries uint64
     314             : 
     315             :         // maxEstimatedSize stores the maximum result returned from sizeEstimate.size.
     316             :         // It ensures that values returned from subsequent calls to Writer.EstimatedSize
     317             :         // never decrease.
     318             :         maxEstimatedSize uint64
     319             : 
     320             :         // We assume that the entries added to the sizeEstimate can be compressed.
     321             :         // For this reason, we keep track of a compressedSize and an uncompressedSize
     322             :         // to compute a compression ratio for the inflight entries. If the entries
     323             :         // aren't being compressed, then compressedSize and uncompressedSize must be
     324             :         // equal.
     325             :         compressedSize   uint64
     326             :         uncompressedSize uint64
     327             : }
     328             : 
     329           2 : func (s *sizeEstimate) init(emptySize uint64) { // init stores the size that size() reports while its total is still zero
     330           2 :         s.emptySize = emptySize
     331           2 : }
     332             : 
     333           2 : func (s *sizeEstimate) size() uint64 { // size returns the current estimate; results never decrease across calls
     334           2 :         ratio := float64(1) // observed compressed/uncompressed ratio; stays 1 until uncompressedSize is non-zero
     335           2 :         if s.uncompressedSize > 0 {
     336           2 :                 ratio = float64(s.compressedSize) / float64(s.uncompressedSize)
     337           2 :         }
     338           2 :         estimatedInflightSize := uint64(float64(s.inflightSize) * ratio) // inflight bytes scaled by the observed ratio
     339           2 :         total := s.totalSize + estimatedInflightSize
     340           2 :         if total > s.maxEstimatedSize {
     341           2 :                 s.maxEstimatedSize = total // remember the high-water mark
     342           2 :         } else {
     343           2 :                 total = s.maxEstimatedSize // never report less than an earlier result
     344           2 :         }
     345             : 
     346           2 :         if total == 0 { // no non-zero size has been observed yet
     347           2 :                 return s.emptySize
     348           2 :         }
     349             : 
     350           2 :         return total
     351             : }
     352             : 
     353           2 : func (s *sizeEstimate) numTotalEntries() uint64 { // numTotalEntries returns the count of written plus inflight entries
     354           2 :         return s.numWrittenEntries + s.numInflightEntries
     355           2 : }
     356             : 
     357           2 : func (s *sizeEstimate) addInflight(size int) { // addInflight registers one inflight entry with the given estimated size
     358           2 :         s.numInflightEntries++
     359           2 :         s.inflightSize += uint64(size)
     360           2 : }
     361             : 
     362           2 : func (s *sizeEstimate) writtenWithTotal(newTotalSize uint64, inflightSize int) { // writtenWithTotal records a written entry given the new total size rather than the delta
     363           2 :         finalEntrySize := int(newTotalSize - s.totalSize) // unsigned subtraction: newTotalSize must not be below the current totalSize
     364           2 :         s.writtenWithDelta(finalEntrySize, inflightSize)
     365           2 : }
     366             : 
     367           2 : func (s *sizeEstimate) writtenWithDelta(finalEntrySize int, inflightSize int) { // writtenWithDelta records one written entry of finalEntrySize bytes; pass inflightSize 0 if it was never inflight
     368           2 :         if inflightSize > 0 {
     369           2 :                 // This entry was previously inflight, so we should decrement inflight
     370           2 :                 // entries and update the "compression" stats for future estimation.
     371           2 :                 s.numInflightEntries--
     372           2 :                 s.inflightSize -= uint64(inflightSize)
     373           2 :                 s.uncompressedSize += uint64(inflightSize)
     374           2 :                 s.compressedSize += uint64(finalEntrySize)
     375           2 :         }
     376           2 :         s.numWrittenEntries++
     377           2 :         s.totalSize += uint64(finalEntrySize)
     378             : }
     379             : 
     380           2 : func (s *sizeEstimate) clear() { // clear zeroes every field except emptySize
     381           2 :         *s = sizeEstimate{emptySize: s.emptySize}
     382           2 : }
     383             : 
     384             : type indexBlockBuf struct { // an index block under construction together with its size estimate
     385             :         // block will only be accessed from the writeQueue.
     386             :         block blockWriter
     387             : 
     388             :         size struct { // the methods lock mu around estimate only when useMutex is set
     389             :                 useMutex bool
     390             :                 mu       sync.Mutex
     391             :                 estimate sizeEstimate
     392             :         }
     393             : 
     394             :         // restartInterval matches indexBlockBuf.block.restartInterval. We store it twice, because the `block`
     395             :         // must only be accessed from the writeQueue goroutine.
     396             :         restartInterval int
     397             : }
     398             : 
     399           2 : func (i *indexBlockBuf) clear() { // clear resets the block, the size estimate and restartInterval; useMutex is left as is
     400           2 :         i.block.clear()
     401           2 :         if i.size.useMutex {
     402           2 :                 i.size.mu.Lock()
     403           2 :                 defer i.size.mu.Unlock()
     404           2 :         }
     405           2 :         i.size.estimate.clear()
     406           2 :         i.restartInterval = 0
     407             : }
     408             : 
     409             : var indexBlockBufPool = sync.Pool{ // pool of *indexBlockBuf values; newIndexBlockBuf draws from it
     410           2 :         New: func() interface{} {
     411           2 :                 return &indexBlockBuf{}
     412           2 :         },
     413             : }
     414             : 
     415             : const indexBlockRestartInterval = 1 // restart interval that newIndexBlockBuf assigns to index blocks
     416             : 
     417           2 : func newIndexBlockBuf(useMutex bool) *indexBlockBuf { // newIndexBlockBuf returns a pooled indexBlockBuf configured for index blocks
     418           2 :         i := indexBlockBufPool.Get().(*indexBlockBuf) // NOTE(review): presumably bufs are clear()ed before being put back in the pool - confirm
     419           2 :         i.size.useMutex = useMutex
     420           2 :         i.restartInterval = indexBlockRestartInterval
     421           2 :         i.block.restartInterval = indexBlockRestartInterval
     422           2 :         i.size.estimate.init(emptyBlockSize)
     423           2 :         return i
     424           2 : }
     425             : 
     426             : func (i *indexBlockBuf) shouldFlush( // delegates to the package-level shouldFlush with the current size estimate and entry count
     427             :         sep InternalKey, valueLen, targetBlockSize, sizeThreshold int,
     428           2 : ) bool {
     429           2 :         if i.size.useMutex {
     430           2 :                 i.size.mu.Lock()
     431           2 :                 defer i.size.mu.Unlock()
     432           2 :         }
     433             : 
     434           2 :         nEntries := i.size.estimate.numTotalEntries() // counts written and inflight entries
     435           2 :         return shouldFlush(
     436           2 :                 sep, valueLen, i.restartInterval, int(i.size.estimate.size()),
     437           2 :                 int(nEntries), targetBlockSize, sizeThreshold)
     438             : }
     439             : 
     440           2 : func (i *indexBlockBuf) add(key InternalKey, value []byte, inflightSize int) { // add appends the entry to the block, then records the block's new estimated size as written
     441           2 :         i.block.add(key, value)
     442           2 :         size := i.block.estimatedSize() // read before taking the lock; only the estimate update below is guarded
     443           2 :         if i.size.useMutex {
     444           2 :                 i.size.mu.Lock()
     445           2 :                 defer i.size.mu.Unlock()
     446           2 :         }
     447           2 :         i.size.estimate.writtenWithTotal(uint64(size), inflightSize)
     448             : }
     449             : 
     450           2 : func (i *indexBlockBuf) finish() []byte { // finish returns the result of finishing the underlying block
     451           2 :         b := i.block.finish()
     452           2 :         return b
     453           2 : }
     454             : 
     455           2 : func (i *indexBlockBuf) addInflight(inflightSize int) { // addInflight registers an inflight entry with the size estimate, under mu when useMutex is set
     456           2 :         if i.size.useMutex {
     457           2 :                 i.size.mu.Lock()
     458           2 :                 defer i.size.mu.Unlock()
     459           2 :         }
     460           2 :         i.size.estimate.addInflight(inflightSize)
     461             : }
     462             : 
     463           2 : func (i *indexBlockBuf) estimatedSize() uint64 { // estimatedSize returns the size estimate for the index block
     464           2 :         if i.size.useMutex {
     465           2 :                 i.size.mu.Lock()
     466           2 :                 defer i.size.mu.Unlock()
     467           2 :         }
     468             : 
     469             :         // Make sure that the size estimation works as expected when parallelism
     470             :         // is disabled. These checks only run when invariants.Enabled is true.
     471           2 :         if invariants.Enabled && !i.size.useMutex {
     472           2 :                 if i.size.estimate.inflightSize != 0 {
     473           0 :                         panic("unexpected inflight entry in index block size estimation")
     474             :                 }
     475             : 
     476             :                 // NB: The i.block should only be accessed from the writeQueue goroutine,
     477             :                 // when parallelism is enabled. We break that invariant here, but that's
     478             :                 // okay since parallelism is disabled.
     479           2 :                 if i.size.estimate.size() != uint64(i.block.estimatedSize()) {
     480           0 :                         panic("index block size estimation sans parallelism is incorrect")
     481             :                 }
     482             :         }
     483           2 :         return i.size.estimate.size()
     484             : }
     485             : 
     486             : // dataBlockEstimates is used for sstable size estimation. dataBlockEstimates can be
     487             : // accessed by the Writer client and compressionQueue goroutines. Fields
     488             : // should only be read/updated through the functions defined on the
     489             : // *dataBlockEstimates type.
     490             : type dataBlockEstimates struct {
     491             :         // If we don't do block compression in parallel, then we don't need to take
     492             :         // the performance hit of synchronizing using this mutex.
     493             :         useMutex bool
     494             :         mu       sync.Mutex
     495             : 
     496             :         estimate sizeEstimate
     497             : }
     498             : 
     499             : // dataBlockCompressed records a data block of compressedSize bytes (plus the
     500             : // block trailer) as written. inflightSize is the uncompressed size estimate
     501             : // previously passed to addInflightDataBlock(), or 0 if addInflightDataBlock()
     502             : // was not called for this block.
     503           2 : func (d *dataBlockEstimates) dataBlockCompressed(compressedSize int, inflightSize int) {
     504           2 :         if d.useMutex {
     505           0 :                 d.mu.Lock()
     506           0 :                 defer d.mu.Unlock()
     507           0 :         }
     508           2 :         d.estimate.writtenWithDelta(compressedSize+blockTrailerLen, inflightSize)
     509             : }
     510             : 
     511             : // size is an estimated size of datablock data which has been written to disk.
     512           2 : func (d *dataBlockEstimates) size() uint64 {
     513           2 :         if d.useMutex {
     514           0 :                 d.mu.Lock()
     515           0 :                 defer d.mu.Unlock()
     516           0 :         }
     517             :         // If there is no parallel compression, there should not be any inflight bytes.
     518           2 :         if invariants.Enabled && !d.useMutex {
     519           2 :                 if d.estimate.inflightSize != 0 {
     520           0 :                         panic("unexpected inflight entry in data block size estimation")
     521             :                 }
     522             :         }
     523           2 :         return d.estimate.size()
     524             : }
     525             : 
     526             : // Avoid linter unused error.
     527             : var _ = (&dataBlockEstimates{}).addInflightDataBlock
     528             : 
     529             : // NB: unused since no parallel compression.
     530           0 : func (d *dataBlockEstimates) addInflightDataBlock(size int) {
     531           0 :         if d.useMutex {
     532           0 :                 d.mu.Lock()
     533           0 :                 defer d.mu.Unlock()
     534           0 :         }
     535             : 
     536           0 :         d.estimate.addInflight(size)
     537             : }
     538             : 
     539             : var writeTaskPool = sync.Pool{
     540           2 :         New: func() interface{} {
     541           2 :                 t := &writeTask{}
     542           2 :                 t.compressionDone = make(chan bool, 1)
     543           2 :                 return t
     544           2 :         },
     545             : }
     546             : 
     547             : type checksummer struct {
     548             :         checksumType ChecksumType
     549             :         xxHasher     *xxhash.Digest
     550             : }
     551             : 
     552           2 : func (c *checksummer) checksum(block []byte, blockType []byte) (checksum uint32) {
     553           2 :         // Calculate the checksum.
     554           2 :         switch c.checksumType {
     555           2 :         case ChecksumTypeCRC32c:
     556           2 :                 checksum = crc.New(block).Update(blockType).Value()
     557           1 :         case ChecksumTypeXXHash64:
     558           1 :                 if c.xxHasher == nil {
     559           1 :                         c.xxHasher = xxhash.New()
     560           1 :                 } else {
     561           1 :                         c.xxHasher.Reset()
     562           1 :                 }
     563           1 :                 c.xxHasher.Write(block)
     564           1 :                 c.xxHasher.Write(blockType)
     565           1 :                 checksum = uint32(c.xxHasher.Sum64())
     566           0 :         default:
     567           0 :                 panic(errors.Newf("unsupported checksum type: %d", c.checksumType))
     568             :         }
     569           2 :         return checksum
     570             : }
     571             : 
     572             : type blockBuf struct {
     573             :         // tmp is a scratch buffer, large enough to hold any of footerLen bytes,
     574             :         // blockTrailerLen bytes, or (5 * binary.MaxVarintLen64) bytes, and most
     575             :         // likely large enough for a block handle with properties.
     576             :         tmp [blockHandleLikelyMaxLen]byte
     577             :         // compressedBuf is the destination buffer for compression. It is re-used over the
     578             :         // lifetime of the blockBuf, avoiding the allocation of a temporary buffer for each block.
     579             :         compressedBuf []byte
     580             :         checksummer   checksummer
     581             : }
     582             : 
     583           2 : func (b *blockBuf) clear() {
     584           2 :         // We can't assign b.compressedBuf[:0] to compressedBuf because snappy relies
     585           2 :         // on the length of the buffer, and not the capacity to determine if it needs
     586           2 :         // to make an allocation.
     587           2 :         *b = blockBuf{
     588           2 :                 compressedBuf: b.compressedBuf, checksummer: b.checksummer,
     589           2 :         }
     590           2 : }
     591             : 
     592             : // A dataBlockBuf holds all the state required to compress and write a data block to disk.
     593             : // A dataBlockBuf begins its lifecycle owned by the Writer client goroutine. The Writer
     594             : // client goroutine adds keys to the sstable, writing directly into a dataBlockBuf's blockWriter
     595             : // until the block is full. Once a dataBlockBuf's block is full, the dataBlockBuf may be passed
     596             : // to other goroutines for compression and file I/O.
     597             : type dataBlockBuf struct {
     598             :         blockBuf
     599             :         dataBlock blockWriter
     600             : 
     601             :         // uncompressed is a reference to a byte slice which is owned by the dataBlockBuf. It is the
     602             :         // next byte slice to be compressed. The uncompressed byte slice will be backed by the
     603             :         // dataBlock.buf.
     604             :         uncompressed []byte
     605             :         // compressed is a reference to a byte slice which is owned by the dataBlockBuf. It is the
     606             :         // compressed byte slice which must be written to disk. The compressed byte slice may be
     607             :         // backed by the dataBlock.buf, or the dataBlockBuf.compressedBuf, depending on whether
     608             :         // we use the result of the compression.
     609             :         compressed []byte
     610             : 
     611             :         // We're making calls to BlockPropertyCollectors from the Writer client goroutine. We need to
     612             :         // pass the encoded block properties over to the write queue. To prevent copies, and allocations,
     613             :         // we give each dataBlockBuf, a blockPropertiesEncoder.
     614             :         blockPropsEncoder blockPropertiesEncoder
     615             :         // dataBlockProps is set when Writer.finishDataBlockProps is called. The dataBlockProps slice is
     616             :         // a shallow copy of the internal buffer of the dataBlockBuf.blockPropsEncoder.
     617             :         dataBlockProps []byte
     618             : 
     619             :         // sepScratch is reusable scratch space for computing separator keys.
     620             :         sepScratch []byte
     621             : }
     622             : 
     623           2 : func (d *dataBlockBuf) clear() {
     624           2 :         d.blockBuf.clear()
     625           2 :         d.dataBlock.clear()
     626           2 : 
     627           2 :         d.uncompressed = nil
     628           2 :         d.compressed = nil
     629           2 :         d.dataBlockProps = nil
     630           2 :         d.sepScratch = d.sepScratch[:0]
     631           2 : }
     632             : 
     633             : var dataBlockBufPool = sync.Pool{
     634           2 :         New: func() interface{} {
     635           2 :                 return &dataBlockBuf{}
     636           2 :         },
     637             : }
     638             : 
     639           2 : func newDataBlockBuf(restartInterval int, checksumType ChecksumType) *dataBlockBuf {
     640           2 :         d := dataBlockBufPool.Get().(*dataBlockBuf)
     641           2 :         d.dataBlock.restartInterval = restartInterval
     642           2 :         d.checksummer.checksumType = checksumType
     643           2 :         return d
     644           2 : }
     645             : 
     646           2 : func (d *dataBlockBuf) finish() {
     647           2 :         d.uncompressed = d.dataBlock.finish()
     648           2 : }
     649             : 
     650           2 : func (d *dataBlockBuf) compressAndChecksum(c Compression) {
     651           2 :         d.compressed = compressAndChecksum(d.uncompressed, c, &d.blockBuf)
     652           2 : }
     653             : 
     654             : func (d *dataBlockBuf) shouldFlush(
     655             :         key InternalKey, valueLen, targetBlockSize, sizeThreshold int,
     656           2 : ) bool {
     657           2 :         return shouldFlush(
     658           2 :                 key, valueLen, d.dataBlock.restartInterval, d.dataBlock.estimatedSize(),
     659           2 :                 d.dataBlock.nEntries, targetBlockSize, sizeThreshold)
     660           2 : }
     661             : 
     662             : type indexBlockAndBlockProperties struct {
     663             :         nEntries int
     664             :         // sep is the last key added to this block, for computing a separator later.
     665             :         sep        InternalKey
     666             :         properties []byte
     667             :         // block is the encoded block produced by blockWriter.finish.
     668             :         block []byte
     669             : }
     670             : 
     671             : // Set sets the value for the given key. The sequence number is set to 0.
     672             : // Intended for use to externally construct an sstable before ingestion into a
     673             : // DB. For a given Writer, the keys passed to Set must be in strictly increasing
     674             : // order.
     675             : //
     676             : // TODO(peter): untested
     677           1 : func (w *Writer) Set(key, value []byte) error {
     678           1 :         if w.err != nil {
     679           0 :                 return w.err
     680           0 :         }
     681           1 :         if w.isStrictObsolete {
     682           0 :                 return errors.Errorf("use AddWithForceObsolete")
     683           0 :         }
     684             :         // forceObsolete is false based on the assumption that no RANGEDELs in the
     685             :         // sstable delete the added points.
     686           1 :         return w.addPoint(base.MakeInternalKey(key, 0, InternalKeyKindSet), value, false)
     687             : }
     688             : 
     689             : // Delete deletes the value for the given key. The sequence number is set to
     690             : // 0. Intended for use to externally construct an sstable before ingestion into
     691             : // a DB.
     692             : //
     693             : // TODO(peter): untested
     694           1 : func (w *Writer) Delete(key []byte) error {
     695           1 :         if w.err != nil {
     696           0 :                 return w.err
     697           0 :         }
     698           1 :         if w.isStrictObsolete {
     699           0 :                 return errors.Errorf("use AddWithForceObsolete")
     700           0 :         }
     701             :         // forceObsolete is false based on the assumption that no RANGEDELs in the
     702             :         // sstable delete the added points.
     703           1 :         return w.addPoint(base.MakeInternalKey(key, 0, InternalKeyKindDelete), nil, false)
     704             : }
     705             : 
     706             : // DeleteRange deletes all of the keys (and values) in the range [start,end)
     707             : // (inclusive on start, exclusive on end). The sequence number is set to
     708             : // 0. Intended for use to externally construct an sstable before ingestion into
     709             : // a DB.
     710             : //
     711             : // TODO(peter): untested
     712           2 : func (w *Writer) DeleteRange(start, end []byte) error {
     713           2 :         if w.err != nil {
     714           0 :                 return w.err
     715           0 :         }
     716           2 :         return w.addTombstone(base.MakeInternalKey(start, 0, InternalKeyKindRangeDelete), end)
     717             : }
     718             : 
     719             : // Merge adds an action to the DB that merges the value at key with the new
     720             : // value. The details of the merge are dependent upon the configured merge
     721             : // operator. The sequence number is set to 0. Intended for use to externally
     722             : // construct an sstable before ingestion into a DB.
     723             : //
     724             : // TODO(peter): untested
     725           1 : func (w *Writer) Merge(key, value []byte) error {
     726           1 :         if w.err != nil {
     727           0 :                 return w.err
     728           0 :         }
     729           1 :         if w.isStrictObsolete {
     730           0 :                 return errors.Errorf("use AddWithForceObsolete")
     731           0 :         }
     732             :         // forceObsolete is false based on the assumption that no RANGEDELs in the
     733             :         // sstable delete the added points. Strict-obsolete writers are rejected
     734             :         // above; addPoint would also reject a MERGE for such a writer.
     735           1 :         return w.addPoint(base.MakeInternalKey(key, 0, InternalKeyKindMerge), value, false)
     736             : }
     737             : 
     738             : // Add adds a key/value pair to the table being written. For a given Writer,
     739             : // the keys passed to Add must be in increasing order. The exception to this
     740             : // rule is range deletion tombstones. Range deletion tombstones need to be
     741             : // added ordered by their start key, but they can be added out of order from
     742             : // point entries. Additionally, range deletion tombstones must be fragmented
     743             : // (i.e. by keyspan.Fragmenter).
     744           2 : func (w *Writer) Add(key InternalKey, value []byte) error {
     745           2 :         if w.isStrictObsolete {
     746           0 :                 return errors.Errorf("use AddWithForceObsolete")
     747           0 :         }
     748           2 :         return w.AddWithForceObsolete(key, value, false)
     749             : }
     750             : 
     751             : // AddWithForceObsolete must be used when writing a strict-obsolete sstable.
     752             : //
     753             : // forceObsolete indicates whether the caller has determined that this key is
     754             : // obsolete even though it may be the latest point key for this userkey. This
     755             : // should be set to true for keys obsoleted by RANGEDELs, and is required for
     756             : // strict-obsolete sstables.
     757             : //
     758             : // Note that there are two properties, S1 and S2 (see comment in format.go)
     759             : // that strict-obsolete ssts must satisfy. S2, due to RANGEDELs, is solely the
     760             : // responsibility of the caller. S1 is solely the responsibility of the
     761             : // callee.
     762           2 : func (w *Writer) AddWithForceObsolete(key InternalKey, value []byte, forceObsolete bool) error {
     763           2 :         if w.err != nil {
     764           0 :                 return w.err
     765           0 :         }
     766             : 
     767           2 :         switch key.Kind() {
     768           2 :         case InternalKeyKindRangeDelete:
     769           2 :                 return w.addTombstone(key, value)
     770             :         case base.InternalKeyKindRangeKeyDelete,
     771             :                 base.InternalKeyKindRangeKeySet,
     772           0 :                 base.InternalKeyKindRangeKeyUnset:
     773           0 :                 w.err = errors.Errorf(
     774           0 :                         "pebble: range keys must be added via one of the RangeKey* functions")
     775           0 :                 return w.err
     776             :         }
     777           2 :         return w.addPoint(key, value, forceObsolete)
     778             : }
     779             : 
     780           2 : func (w *Writer) makeAddPointDecisionV2(key InternalKey) error {
     781           2 :         prevTrailer := w.lastPointKeyInfo.trailer
     782           2 :         w.lastPointKeyInfo.trailer = key.Trailer
     783           2 :         if w.dataBlockBuf.dataBlock.nEntries == 0 {
     784           2 :                 return nil
     785           2 :         }
     786           2 :         if !w.disableKeyOrderChecks {
     787           2 :                 prevPointUserKey := w.dataBlockBuf.dataBlock.getCurUserKey()
     788           2 :                 cmpUser := w.compare(prevPointUserKey, key.UserKey)
     789           2 :                 if cmpUser > 0 || (cmpUser == 0 && prevTrailer <= key.Trailer) {
     790           1 :                         return errors.Errorf(
     791           1 :                                 "pebble: keys must be added in strictly increasing order: %s, %s",
     792           1 :                                 InternalKey{UserKey: prevPointUserKey, Trailer: prevTrailer}.Pretty(w.formatKey),
     793           1 :                                 key.Pretty(w.formatKey))
     794           1 :                 }
     795             :         }
     796           2 :         return nil
     797             : }
     798             : 
     799             : // REQUIRES: at least one point has been written to the Writer.
     800           2 : func (w *Writer) getLastPointUserKey() []byte {
     801           2 :         if w.dataBlockBuf.dataBlock.nEntries == 0 {
     802           0 :                 panic(errors.AssertionFailedf("no point keys added to writer"))
     803             :         }
     804           2 :         return w.dataBlockBuf.dataBlock.getCurUserKey()
     805             : }
     806             : 
     807             : func (w *Writer) makeAddPointDecisionV3(
     808             :         key InternalKey, valueLen int,
     809           2 : ) (setHasSamePrefix bool, writeToValueBlock bool, isObsolete bool, err error) {
     810           2 :         prevPointKeyInfo := w.lastPointKeyInfo
     811           2 :         w.lastPointKeyInfo.userKeyLen = len(key.UserKey)
     812           2 :         w.lastPointKeyInfo.prefixLen = w.lastPointKeyInfo.userKeyLen
     813           2 :         if w.split != nil {
     814           2 :                 w.lastPointKeyInfo.prefixLen = w.split(key.UserKey)
     815           2 :         }
     816           2 :         w.lastPointKeyInfo.trailer = key.Trailer
     817           2 :         w.lastPointKeyInfo.isObsolete = false
     818           2 :         if !w.meta.HasPointKeys {
     819           2 :                 return false, false, false, nil
     820           2 :         }
     821           2 :         keyKind := base.TrailerKind(key.Trailer)
     822           2 :         prevPointUserKey := w.getLastPointUserKey()
     823           2 :         prevPointKey := InternalKey{UserKey: prevPointUserKey, Trailer: prevPointKeyInfo.trailer}
     824           2 :         prevKeyKind := base.TrailerKind(prevPointKeyInfo.trailer)
     825           2 :         considerWriteToValueBlock := prevKeyKind == InternalKeyKindSet &&
     826           2 :                 keyKind == InternalKeyKindSet
     827           2 :         if considerWriteToValueBlock && !w.requiredInPlaceValueBound.IsEmpty() {
     828           1 :                 keyPrefix := key.UserKey[:w.lastPointKeyInfo.prefixLen]
     829           1 :                 cmpUpper := w.compare(
     830           1 :                         w.requiredInPlaceValueBound.Upper, keyPrefix)
     831           1 :                 if cmpUpper <= 0 {
     832           1 :                         // Common case for CockroachDB. Make it empty since all future keys in
     833           1 :                         // this sstable will also have cmpUpper <= 0.
     834           1 :                         w.requiredInPlaceValueBound = UserKeyPrefixBound{}
     835           1 :                 } else if w.compare(keyPrefix, w.requiredInPlaceValueBound.Lower) >= 0 {
     836           1 :                         considerWriteToValueBlock = false
     837           1 :                 }
     838             :         }
     839             :         // cmpPrefix is initialized iff considerWriteToValueBlock.
     840           2 :         var cmpPrefix int
     841           2 :         var cmpUser int
     842           2 :         if considerWriteToValueBlock {
     843           2 :                 // Compare the prefixes.
     844           2 :                 cmpPrefix = w.compare(prevPointUserKey[:prevPointKeyInfo.prefixLen],
     845           2 :                         key.UserKey[:w.lastPointKeyInfo.prefixLen])
     846           2 :                 cmpUser = cmpPrefix
     847           2 :                 if cmpPrefix == 0 {
     848           2 :                         // Need to compare suffixes to compute cmpUser.
     849           2 :                         cmpUser = w.compare(prevPointUserKey[prevPointKeyInfo.prefixLen:],
     850           2 :                                 key.UserKey[w.lastPointKeyInfo.prefixLen:])
     851           2 :                 }
     852           2 :         } else {
     853           2 :                 cmpUser = w.compare(prevPointUserKey, key.UserKey)
     854           2 :         }
     855             :         // Ensure that no one adds a point key kind without considering the obsolete
     856             :         // handling for that kind.
     857           2 :         switch keyKind {
     858             :         case InternalKeyKindSet, InternalKeyKindSetWithDelete, InternalKeyKindMerge,
     859           2 :                 InternalKeyKindDelete, InternalKeyKindSingleDelete, InternalKeyKindDeleteSized:
     860           0 :         default:
     861           0 :                 panic(errors.AssertionFailedf("unexpected key kind %s", keyKind.String()))
     862             :         }
     863             :         // If same user key, then the current key is obsolete if any of the
     864             :         // following is true:
     865             :         // C1 The prev key was obsolete.
     866             :         // C2 The prev key was not a MERGE. When the previous key is a MERGE we must
     867             :         //    preserve SET* and MERGE since their values will be merged into the
     868             :         //    previous key. We also must preserve DEL* since there may be an older
     869             :         //    SET*/MERGE in a lower level that must not be merged with the MERGE --
     870             :         //    if we omit the DEL* that lower SET*/MERGE will become visible.
     871             :         //
     872             :         // Regardless of whether it is the same user key or not
     873             :         // C3 The current key is some kind of point delete, and we are writing to
     874             :         //    the lowest level, then it is also obsolete. The correctness of this
     875             :         //    relies on the same user key not spanning multiple sstables in a level.
     876             :         //
     877             :         // C1 ensures that for a user key there is at most one transition from
     878             :         // !obsolete to obsolete. Consider a user key k, for which the first n keys
     879             :         // are not obsolete. We consider the various values of n:
     880             :         //
     881             :         // n = 0: This happens due to forceObsolete being set by the caller, or due
     882             :         // to C3. forceObsolete must only be set due to a RANGEDEL, and that RANGEDEL
     883             :         // must also delete all the lower seqnums for the same user key. C3 triggers
     884             :         // due to a point delete and that deletes all the lower seqnums for the same
     885             :         // user key.
     886             :         //
     887             :         // n = 1: This is the common case. It happens when the first key is not a
     888             :         // MERGE, or the current key is some kind of point delete.
     889             :         //
     890             :         // n > 1: This is due to a sequence of MERGE keys, potentially followed by a
     891             :         // single non-MERGE key.
     892           2 :         isObsoleteC1AndC2 := cmpUser == 0 &&
     893           2 :                 (prevPointKeyInfo.isObsolete || prevKeyKind != InternalKeyKindMerge)
     894           2 :         isObsoleteC3 := w.writingToLowestLevel &&
     895           2 :                 (keyKind == InternalKeyKindDelete || keyKind == InternalKeyKindSingleDelete ||
     896           2 :                         keyKind == InternalKeyKindDeleteSized)
     897           2 :         isObsolete = isObsoleteC1AndC2 || isObsoleteC3
     898           2 :         // TODO(sumeer): storing isObsolete SET and SETWITHDEL in value blocks is
     899           2 :         // possible, but requires some care in documenting and checking invariants.
     900           2 :         // There is code that assumes nothing in value blocks because of single MVCC
     901           2 :         // version (those should be ok). We have to ensure setHasSamePrefix is
     902           2 :         // correctly initialized here etc.
     903           2 : 
     904           2 :         if !w.disableKeyOrderChecks &&
     905           2 :                 (cmpUser > 0 || (cmpUser == 0 && prevPointKeyInfo.trailer <= key.Trailer)) {
     906           1 :                 return false, false, false, errors.Errorf(
     907           1 :                         "pebble: keys must be added in strictly increasing order: %s, %s",
     908           1 :                         prevPointKey.Pretty(w.formatKey), key.Pretty(w.formatKey))
     909           1 :         }
     910           2 :         if !considerWriteToValueBlock {
     911           2 :                 return false, false, isObsolete, nil
     912           2 :         }
     913             :         // NB: it is possible that cmpUser == 0, i.e., these two SETs have identical
     914             :         // user keys (because of an open snapshot). This should be the rare case.
     915           2 :         setHasSamePrefix = cmpPrefix == 0
     916           2 :         considerWriteToValueBlock = setHasSamePrefix
     917           2 :         // Use of 0 here is somewhat arbitrary. Given the minimum 3 byte encoding of
     918           2 :         // valueHandle, this should be > 3. But tiny values are common in test and
     919           2 :         // unlikely in production, so we use 0 here for better test coverage.
     920           2 :         const tinyValueThreshold = 0
     921           2 :         if considerWriteToValueBlock && valueLen <= tinyValueThreshold {
     922           2 :                 considerWriteToValueBlock = false
     923           2 :         }
     924           2 :         return setHasSamePrefix, considerWriteToValueBlock, isObsolete, nil
     925             : }
     926             : 
     927           2 : func (w *Writer) addPoint(key InternalKey, value []byte, forceObsolete bool) error {
     928           2 :         if w.isStrictObsolete && key.Kind() == InternalKeyKindMerge {
     929           1 :                 return errors.Errorf("MERGE not supported in a strict-obsolete sstable")
     930           1 :         }
     931           2 :         var err error
     932           2 :         var setHasSameKeyPrefix, writeToValueBlock, addPrefixToValueStoredWithKey bool
     933           2 :         var isObsolete bool
     934           2 :         maxSharedKeyLen := len(key.UserKey)
     935           2 :         if w.valueBlockWriter != nil {
     936           2 :                 // maxSharedKeyLen is limited to the prefix of the preceding key. If the
     937           2 :                 // preceding key was in a different block, then the blockWriter will
     938           2 :                 // ignore this maxSharedKeyLen.
     939           2 :                 maxSharedKeyLen = w.lastPointKeyInfo.prefixLen
     940           2 :                 setHasSameKeyPrefix, writeToValueBlock, isObsolete, err =
     941           2 :                         w.makeAddPointDecisionV3(key, len(value))
     942           2 :                 addPrefixToValueStoredWithKey = base.TrailerKind(key.Trailer) == InternalKeyKindSet
     943           2 :         } else {
     944           2 :                 err = w.makeAddPointDecisionV2(key)
     945           2 :         }
     946           2 :         if err != nil {
     947           1 :                 return err
     948           1 :         }
     949           2 :         isObsolete = w.tableFormat >= TableFormatPebblev4 && (isObsolete || forceObsolete)
     950           2 :         w.lastPointKeyInfo.isObsolete = isObsolete
     951           2 :         var valueStoredWithKey []byte
     952           2 :         var prefix valuePrefix
     953           2 :         var valueStoredWithKeyLen int
     954           2 :         if writeToValueBlock {
     955           2 :                 vh, err := w.valueBlockWriter.addValue(value)
     956           2 :                 if err != nil {
     957           0 :                         return err
     958           0 :                 }
     959           2 :                 n := encodeValueHandle(w.blockBuf.tmp[:], vh)
     960           2 :                 valueStoredWithKey = w.blockBuf.tmp[:n]
     961           2 :                 valueStoredWithKeyLen = len(valueStoredWithKey) + 1
     962           2 :                 var attribute base.ShortAttribute
     963           2 :                 if w.shortAttributeExtractor != nil {
     964           1 :                         // TODO(sumeer): for compactions, it is possible that the input sstable
     965           1 :                         // already has this value in the value section and so we have already
     966           1 :                         // extracted the ShortAttribute. Avoid extracting it again. This will
     967           1 :                         // require changing the Writer.Add interface.
     968           1 :                         if attribute, err = w.shortAttributeExtractor(
     969           1 :                                 key.UserKey, w.lastPointKeyInfo.prefixLen, value); err != nil {
     970           0 :                                 return err
     971           0 :                         }
     972             :                 }
     973           2 :                 prefix = makePrefixForValueHandle(setHasSameKeyPrefix, attribute)
     974           2 :         } else {
     975           2 :                 valueStoredWithKey = value
     976           2 :                 valueStoredWithKeyLen = len(value)
     977           2 :                 if addPrefixToValueStoredWithKey {
     978           2 :                         valueStoredWithKeyLen++
     979           2 :                 }
     980           2 :                 prefix = makePrefixForInPlaceValue(setHasSameKeyPrefix)
     981             :         }
     982             : 
     983           2 :         if err := w.maybeFlush(key, valueStoredWithKeyLen); err != nil {
     984           1 :                 return err
     985           1 :         }
     986             : 
     987           2 :         for i := range w.propCollectors {
     988           1 :                 if err := w.propCollectors[i].Add(key, value); err != nil {
     989           1 :                         w.err = err
     990           1 :                         return err
     991           1 :                 }
     992             :         }
     993           2 :         for i := range w.blockPropCollectors {
     994           2 :                 v := value
     995           2 :                 if addPrefixToValueStoredWithKey {
     996           2 :                         // Values for SET are not required to be in-place, and in the future may
     997           2 :                         // not even be read by the compaction, so pass nil values. Block
     998           2 :                         // property collectors in such Pebble DB's must not look at the value.
     999           2 :                         v = nil
    1000           2 :                 }
    1001           2 :                 if err := w.blockPropCollectors[i].Add(key, v); err != nil {
    1002           1 :                         w.err = err
    1003           1 :                         return err
    1004           1 :                 }
    1005             :         }
    1006           2 :         if w.tableFormat >= TableFormatPebblev4 {
    1007           2 :                 w.obsoleteCollector.AddPoint(isObsolete)
    1008           2 :         }
    1009             : 
    1010           2 :         w.maybeAddToFilter(key.UserKey)
    1011           2 :         w.dataBlockBuf.dataBlock.addWithOptionalValuePrefix(
    1012           2 :                 key, isObsolete, valueStoredWithKey, maxSharedKeyLen, addPrefixToValueStoredWithKey, prefix,
    1013           2 :                 setHasSameKeyPrefix)
    1014           2 : 
    1015           2 :         w.meta.updateSeqNum(key.SeqNum())
    1016           2 : 
    1017           2 :         if !w.meta.HasPointKeys {
    1018           2 :                 k := w.dataBlockBuf.dataBlock.getCurKey()
    1019           2 :                 // NB: We need to ensure that SmallestPoint.UserKey is set, so we create
    1020           2 :                 // an InternalKey which is semantically identical to the key, but won't
    1021           2 :                 // have a nil UserKey. We do this, because key.UserKey could be nil, and
    1022           2 :                 // we don't want SmallestPoint.UserKey to be nil.
    1023           2 :                 //
    1024           2 :                 // todo(bananabrick): Determine if it's okay to have a nil SmallestPoint
    1025           2 :                 // .UserKey now that we don't rely on a nil UserKey to determine if the
    1026           2 :                 // key has been set or not.
    1027           2 :                 w.meta.SetSmallestPointKey(k.Clone())
    1028           2 :         }
    1029             : 
    1030           2 :         w.props.NumEntries++
    1031           2 :         switch key.Kind() {
    1032           2 :         case InternalKeyKindDelete, InternalKeyKindSingleDelete:
    1033           2 :                 w.props.NumDeletions++
    1034           2 :                 w.props.RawPointTombstoneKeySize += uint64(len(key.UserKey))
    1035           2 :         case InternalKeyKindDeleteSized:
    1036           2 :                 var size uint64
    1037           2 :                 if len(value) > 0 {
    1038           2 :                         var n int
    1039           2 :                         size, n = binary.Uvarint(value)
    1040           2 :                         if n <= 0 {
    1041           0 :                                 w.err = errors.Newf("%s key's value (%x) does not parse as uvarint",
    1042           0 :                                         errors.Safe(key.Kind().String()), value)
    1043           0 :                                 return w.err
    1044           0 :                         }
    1045             :                 }
    1046           2 :                 w.props.NumDeletions++
    1047           2 :                 w.props.NumSizedDeletions++
    1048           2 :                 w.props.RawPointTombstoneKeySize += uint64(len(key.UserKey))
    1049           2 :                 w.props.RawPointTombstoneValueSize += size
    1050           2 :         case InternalKeyKindMerge:
    1051           2 :                 w.props.NumMergeOperands++
    1052             :         }
    1053           2 :         w.props.RawKeySize += uint64(key.Size())
    1054           2 :         w.props.RawValueSize += uint64(len(value))
    1055           2 :         return nil
    1056             : }
    1057             : 
    1058           1 : func (w *Writer) prettyTombstone(k InternalKey, value []byte) fmt.Formatter {
    1059           1 :         return keyspan.Span{
    1060           1 :                 Start: k.UserKey,
    1061           1 :                 End:   value,
    1062           1 :                 Keys:  []keyspan.Key{{Trailer: k.Trailer}},
    1063           1 :         }.Pretty(w.formatKey)
    1064           1 : }
    1065             : 
    1066           2 : func (w *Writer) addTombstone(key InternalKey, value []byte) error {
    1067           2 :         if !w.disableKeyOrderChecks && !w.rangeDelV1Format && w.rangeDelBlock.nEntries > 0 {
    1068           2 :                 // Check that tombstones are being added in fragmented order. If the two
    1069           2 :                 // tombstones overlap, their start and end keys must be identical.
    1070           2 :                 prevKey := w.rangeDelBlock.getCurKey()
    1071           2 :                 switch c := w.compare(prevKey.UserKey, key.UserKey); {
    1072           0 :                 case c > 0:
    1073           0 :                         w.err = errors.Errorf("pebble: keys must be added in order: %s, %s",
    1074           0 :                                 prevKey.Pretty(w.formatKey), key.Pretty(w.formatKey))
    1075           0 :                         return w.err
    1076           2 :                 case c == 0:
    1077           2 :                         prevValue := w.rangeDelBlock.curValue
    1078           2 :                         if w.compare(prevValue, value) != 0 {
    1079           1 :                                 w.err = errors.Errorf("pebble: overlapping tombstones must be fragmented: %s vs %s",
    1080           1 :                                         w.prettyTombstone(prevKey, prevValue),
    1081           1 :                                         w.prettyTombstone(key, value))
    1082           1 :                                 return w.err
    1083           1 :                         }
    1084           2 :                         if prevKey.SeqNum() <= key.SeqNum() {
    1085           1 :                                 w.err = errors.Errorf("pebble: keys must be added in strictly increasing order: %s, %s",
    1086           1 :                                         prevKey.Pretty(w.formatKey), key.Pretty(w.formatKey))
    1087           1 :                                 return w.err
    1088           1 :                         }
    1089           2 :                 default:
    1090           2 :                         prevValue := w.rangeDelBlock.curValue
    1091           2 :                         if w.compare(prevValue, key.UserKey) > 0 {
    1092           1 :                                 w.err = errors.Errorf("pebble: overlapping tombstones must be fragmented: %s vs %s",
    1093           1 :                                         w.prettyTombstone(prevKey, prevValue),
    1094           1 :                                         w.prettyTombstone(key, value))
    1095           1 :                                 return w.err
    1096           1 :                         }
    1097             :                 }
    1098             :         }
    1099             : 
    1100           2 :         if key.Trailer == InternalKeyRangeDeleteSentinel {
    1101           0 :                 w.err = errors.Errorf("pebble: cannot add range delete sentinel: %s", key.Pretty(w.formatKey))
    1102           0 :                 return w.err
    1103           0 :         }
    1104             : 
    1105           2 :         for i := range w.propCollectors {
    1106           1 :                 if err := w.propCollectors[i].Add(key, value); err != nil {
    1107           1 :                         w.err = err
    1108           1 :                         return err
    1109           1 :                 }
    1110             :         }
    1111             : 
    1112           2 :         w.meta.updateSeqNum(key.SeqNum())
    1113           2 : 
    1114           2 :         switch {
    1115           1 :         case w.rangeDelV1Format:
    1116           1 :                 // Range tombstones are not fragmented in the v1 (i.e. RocksDB) range
    1117           1 :                 // deletion block format, so we need to track the largest range tombstone
    1118           1 :                 // end key as every range tombstone is added.
    1119           1 :                 //
    1120           1 :                 // Note that writing the v1 format is only supported for tests.
    1121           1 :                 if w.props.NumRangeDeletions == 0 {
    1122           1 :                         w.meta.SetSmallestRangeDelKey(key.Clone())
    1123           1 :                         w.meta.SetLargestRangeDelKey(base.MakeRangeDeleteSentinelKey(value).Clone())
    1124           1 :                 } else {
    1125           1 :                         if base.InternalCompare(w.compare, w.meta.SmallestRangeDel, key) > 0 {
    1126           1 :                                 w.meta.SetSmallestRangeDelKey(key.Clone())
    1127           1 :                         }
    1128           1 :                         end := base.MakeRangeDeleteSentinelKey(value)
    1129           1 :                         if base.InternalCompare(w.compare, w.meta.LargestRangeDel, end) < 0 {
    1130           1 :                                 w.meta.SetLargestRangeDelKey(end.Clone())
    1131           1 :                         }
    1132             :                 }
    1133             : 
    1134           2 :         default:
    1135           2 :                 // Range tombstones are fragmented in the v2 range deletion block format,
    1136           2 :                 // so the start key of the first range tombstone added will be the smallest
    1137           2 :                 // range tombstone key. The largest range tombstone key will be determined
    1138           2 :                 // in Writer.Close() as the end key of the last range tombstone added.
    1139           2 :                 if w.props.NumRangeDeletions == 0 {
    1140           2 :                         w.meta.SetSmallestRangeDelKey(key.Clone())
    1141           2 :                 }
    1142             :         }
    1143             : 
    1144           2 :         w.props.NumEntries++
    1145           2 :         w.props.NumDeletions++
    1146           2 :         w.props.NumRangeDeletions++
    1147           2 :         w.props.RawKeySize += uint64(key.Size())
    1148           2 :         w.props.RawValueSize += uint64(len(value))
    1149           2 :         w.rangeDelBlock.add(key, value)
    1150           2 :         return nil
    1151             : }
    1152             : 
    1153             : // RangeKeySet sets a range between start (inclusive) and end (exclusive) with
    1154             : // the given suffix to the given value. The resulting range key is given the
    1155             : // sequence number zero, with the expectation that the resulting sstable will be
    1156             : // ingested.
    1157             : //
    1158             : // Keys must be added to the table in increasing order of start key. Spans are
    1159             : // not required to be fragmented. The same suffix may not be set or unset twice
    1160             : // over the same keyspan, because it would result in inconsistent state. Both
    1161             : // the Set and Unset would share the zero sequence number, and a key cannot be
    1162             : // both simultaneously set and unset.
    1163           1 : func (w *Writer) RangeKeySet(start, end, suffix, value []byte) error {
    1164           1 :         return w.addRangeKeySpan(keyspan.Span{
    1165           1 :                 Start: w.tempRangeKeyCopy(start),
    1166           1 :                 End:   w.tempRangeKeyCopy(end),
    1167           1 :                 Keys: []keyspan.Key{
    1168           1 :                         {
    1169           1 :                                 Trailer: base.MakeTrailer(0, base.InternalKeyKindRangeKeySet),
    1170           1 :                                 Suffix:  w.tempRangeKeyCopy(suffix),
    1171           1 :                                 Value:   w.tempRangeKeyCopy(value),
    1172           1 :                         },
    1173           1 :                 },
    1174           1 :         })
    1175           1 : }
    1176             : 
    1177             : // RangeKeyUnset un-sets a range between start (inclusive) and end (exclusive)
    1178             : // with the given suffix. The resulting range key is given the
    1179             : // sequence number zero, with the expectation that the resulting sstable will be
    1180             : // ingested.
    1181             : //
    1182             : // Keys must be added to the table in increasing order of start key. Spans are
    1183             : // not required to be fragmented. The same suffix may not be set or unset twice
    1184             : // over the same keyspan, because it would result in inconsistent state. Both
    1185             : // the Set and Unset would share the zero sequence number, and a key cannot be
    1186             : // both simultaneously set and unset.
    1187           1 : func (w *Writer) RangeKeyUnset(start, end, suffix []byte) error {
    1188           1 :         return w.addRangeKeySpan(keyspan.Span{
    1189           1 :                 Start: w.tempRangeKeyCopy(start),
    1190           1 :                 End:   w.tempRangeKeyCopy(end),
    1191           1 :                 Keys: []keyspan.Key{
    1192           1 :                         {
    1193           1 :                                 Trailer: base.MakeTrailer(0, base.InternalKeyKindRangeKeyUnset),
    1194           1 :                                 Suffix:  w.tempRangeKeyCopy(suffix),
    1195           1 :                         },
    1196           1 :                 },
    1197           1 :         })
    1198           1 : }
    1199             : 
    1200             : // RangeKeyDelete deletes a range between start (inclusive) and end (exclusive).
    1201             : //
    1202             : // Keys must be added to the table in increasing order of start key. Spans are
    1203             : // not required to be fragmented.
    1204           2 : func (w *Writer) RangeKeyDelete(start, end []byte) error {
    1205           2 :         return w.addRangeKeySpan(keyspan.Span{
    1206           2 :                 Start: w.tempRangeKeyCopy(start),
    1207           2 :                 End:   w.tempRangeKeyCopy(end),
    1208           2 :                 Keys: []keyspan.Key{
    1209           2 :                         {Trailer: base.MakeTrailer(0, base.InternalKeyKindRangeKeyDelete)},
    1210           2 :                 },
    1211           2 :         })
    1212           2 : }
    1213             : 
    1214             : // AddRangeKey adds a range key set, unset, or delete key/value pair to the
    1215             : // table being written.
    1216             : //
    1217             : // Range keys must be supplied in strictly ascending order of start key (i.e.
    1218             : // user key ascending, sequence number descending, and key type descending).
    1219             : // Ranges added must also be supplied in fragmented span order - i.e. other than
    1220             : // spans that are perfectly aligned (same start and end keys), spans may not
    1221             : // overlap. Range keys may be added out of order relative to point keys and
    1222             : // range deletions.
    1223           2 : func (w *Writer) AddRangeKey(key InternalKey, value []byte) error {
    1224           2 :         if w.err != nil {
    1225           0 :                 return w.err
    1226           0 :         }
    1227           2 :         return w.addRangeKey(key, value)
    1228             : }
    1229             : 
    1230           2 : func (w *Writer) addRangeKeySpan(span keyspan.Span) error {
    1231           2 :         if w.compare(span.Start, span.End) >= 0 {
    1232           0 :                 return errors.Errorf(
    1233           0 :                         "pebble: start key must be strictly less than end key",
    1234           0 :                 )
    1235           0 :         }
    1236           2 :         if w.fragmenter.Start() != nil && w.compare(w.fragmenter.Start(), span.Start) > 0 {
    1237           1 :                 return errors.Errorf("pebble: spans must be added in order: %s > %s",
    1238           1 :                         w.formatKey(w.fragmenter.Start()), w.formatKey(span.Start))
    1239           1 :         }
    1240             :         // Add this span to the fragmenter.
    1241           2 :         w.fragmenter.Add(span)
    1242           2 :         return w.err
    1243             : }
    1244             : 
    1245           2 : func (w *Writer) encodeRangeKeySpan(span keyspan.Span) {
    1246           2 :         // This method is the emit function of the Fragmenter.
    1247           2 :         //
    1248           2 :         // NB: The span should only contain range keys and be internally consistent
    1249           2 :         // (eg, no duplicate suffixes, no additional keys after a RANGEKEYDEL).
    1250           2 :         //
    1251           2 :         // We use w.rangeKeysBySuffix and w.rangeKeySpan to avoid allocations.
    1252           2 : 
    1253           2 :         // Sort the keys by suffix. Iteration doesn't *currently* depend on it, but
    1254           2 :         // we may want to in the future.
    1255           2 :         w.rangeKeysBySuffix.Cmp = w.compare
    1256           2 :         w.rangeKeysBySuffix.Keys = span.Keys
    1257           2 :         sort.Sort(&w.rangeKeysBySuffix)
    1258           2 : 
    1259           2 :         w.rangeKeySpan = span
    1260           2 :         w.rangeKeySpan.Keys = w.rangeKeysBySuffix.Keys
    1261           2 :         w.err = firstError(w.err, w.rangeKeyEncoder.Encode(&w.rangeKeySpan))
    1262           2 : }
    1263             : 
    1264           2 : func (w *Writer) addRangeKey(key InternalKey, value []byte) error {
    1265           2 :         if !w.disableKeyOrderChecks && w.rangeKeyBlock.nEntries > 0 {
    1266           2 :                 prevStartKey := w.rangeKeyBlock.getCurKey()
    1267           2 :                 prevEndKey, _, ok := rangekey.DecodeEndKey(prevStartKey.Kind(), w.rangeKeyBlock.curValue)
    1268           2 :                 if !ok {
    1269           0 :                         // We panic here as we should have previously decoded and validated this
    1270           0 :                         // key and value when it was first added to the range key block.
    1271           0 :                         panic(errors.Errorf("pebble: invalid end key for span: %s",
    1272           0 :                                 prevStartKey.Pretty(w.formatKey)))
    1273             :                 }
    1274             : 
    1275           2 :                 curStartKey := key
    1276           2 :                 curEndKey, _, ok := rangekey.DecodeEndKey(curStartKey.Kind(), value)
    1277           2 :                 if !ok {
    1278           0 :                         w.err = errors.Errorf("pebble: invalid end key for span: %s",
    1279           0 :                                 curStartKey.Pretty(w.formatKey))
    1280           0 :                         return w.err
    1281           0 :                 }
    1282             : 
    1283             :                 // Start keys must be strictly increasing.
    1284           2 :                 if base.InternalCompare(w.compare, prevStartKey, curStartKey) >= 0 {
    1285           1 :                         w.err = errors.Errorf(
    1286           1 :                                 "pebble: range keys starts must be added in increasing order: %s, %s",
    1287           1 :                                 prevStartKey.Pretty(w.formatKey), key.Pretty(w.formatKey))
    1288           1 :                         return w.err
    1289           1 :                 }
    1290             : 
    1291             :                 // Start keys are increasing. If the start user keys are equal, the
    1292             :                 // end keys must be equal (i.e. aligned spans).
    1293           2 :                 if w.compare(prevStartKey.UserKey, curStartKey.UserKey) == 0 {
    1294           2 :                         if w.compare(prevEndKey, curEndKey) != 0 {
    1295           1 :                                 w.err = errors.Errorf("pebble: overlapping range keys must be fragmented: %s, %s",
    1296           1 :                                         prevStartKey.Pretty(w.formatKey),
    1297           1 :                                         curStartKey.Pretty(w.formatKey))
    1298           1 :                                 return w.err
    1299           1 :                         }
    1300           2 :                 } else if w.compare(prevEndKey, curStartKey.UserKey) > 0 {
    1301           1 :                         // If the start user keys are NOT equal, the spans must be disjoint (i.e.
    1302           1 :                         // no overlap).
    1303           1 :                         // NOTE: the inequality excludes zero, as we allow the end key of the
    1304           1 :                         // lower span be the same as the start key of the upper span, because
    1305           1 :                         // the range end key is considered an exclusive bound.
    1306           1 :                         w.err = errors.Errorf("pebble: overlapping range keys must be fragmented: %s, %s",
    1307           1 :                                 prevStartKey.Pretty(w.formatKey),
    1308           1 :                                 curStartKey.Pretty(w.formatKey))
    1309           1 :                         return w.err
    1310           1 :                 }
    1311             :         }
    1312             : 
    1313             :         // TODO(travers): Add an invariant-gated check to ensure that suffix-values
    1314             :         // are sorted within coalesced spans.
    1315             : 
    1316             :         // Range-keys and point-keys are intended to live in "parallel" keyspaces.
    1317             :         // However, we track a single seqnum in the table metadata that spans both of
    1318             :         // these keyspaces.
    1319             :         // TODO(travers): Consider tracking range key seqnums separately.
    1320           2 :         w.meta.updateSeqNum(key.SeqNum())
    1321           2 : 
    1322           2 :         // Range tombstones are fragmented, so the start key of the first range key
    1323           2 :         // added will be the smallest. The largest range key is determined in
    1324           2 :         // Writer.Close() as the end key of the last range key added to the block.
    1325           2 :         if w.props.NumRangeKeys() == 0 {
    1326           2 :                 w.meta.SetSmallestRangeKey(key.Clone())
    1327           2 :         }
    1328             : 
    1329             :         // Update block properties.
    1330           2 :         w.props.RawRangeKeyKeySize += uint64(key.Size())
    1331           2 :         w.props.RawRangeKeyValueSize += uint64(len(value))
    1332           2 :         switch key.Kind() {
    1333           2 :         case base.InternalKeyKindRangeKeyDelete:
    1334           2 :                 w.props.NumRangeKeyDels++
    1335           2 :         case base.InternalKeyKindRangeKeySet:
    1336           2 :                 w.props.NumRangeKeySets++
    1337           2 :         case base.InternalKeyKindRangeKeyUnset:
    1338           2 :                 w.props.NumRangeKeyUnsets++
    1339           0 :         default:
    1340           0 :                 panic(errors.Errorf("pebble: invalid range key type: %s", key.Kind()))
    1341             :         }
    1342             : 
    1343           2 :         for i := range w.blockPropCollectors {
    1344           2 :                 if err := w.blockPropCollectors[i].Add(key, value); err != nil {
    1345           0 :                         return err
    1346           0 :                 }
    1347             :         }
    1348             : 
    1349             :         // Add the key to the block.
    1350           2 :         w.rangeKeyBlock.add(key, value)
    1351           2 :         return nil
    1352             : }
    1353             : 
    1354             : // tempRangeKeyBuf returns a slice of length n from the Writer's rkBuf byte
    1355             : // slice. Any byte written to the returned slice is retained for the lifetime of
    1356             : // the Writer.
    1357           2 : func (w *Writer) tempRangeKeyBuf(n int) []byte {
    1358           2 :         if cap(w.rkBuf)-len(w.rkBuf) < n {
    1359           2 :                 size := len(w.rkBuf) + 2*n
    1360           2 :                 if size < 2*cap(w.rkBuf) {
    1361           2 :                         size = 2 * cap(w.rkBuf)
    1362           2 :                 }
    1363           2 :                 buf := make([]byte, len(w.rkBuf), size)
    1364           2 :                 copy(buf, w.rkBuf)
    1365           2 :                 w.rkBuf = buf
    1366             :         }
    1367           2 :         b := w.rkBuf[len(w.rkBuf) : len(w.rkBuf)+n]
    1368           2 :         w.rkBuf = w.rkBuf[:len(w.rkBuf)+n]
    1369           2 :         return b
    1370             : }
    1371             : 
    1372             : // tempRangeKeyCopy returns a copy of the provided slice, stored in the Writer's
    1373             : // range key buffer.
    1374           2 : func (w *Writer) tempRangeKeyCopy(k []byte) []byte {
    1375           2 :         if len(k) == 0 {
    1376           1 :                 return nil
    1377           1 :         }
    1378           2 :         buf := w.tempRangeKeyBuf(len(k))
    1379           2 :         copy(buf, k)
    1380           2 :         return buf
    1381             : }
    1382             : 
    1383           2 : func (w *Writer) maybeAddToFilter(key []byte) {
    1384           2 :         if w.filter != nil {
    1385           2 :                 if w.split != nil {
    1386           2 :                         prefix := key[:w.split(key)]
    1387           2 :                         w.filter.addKey(prefix)
    1388           2 :                 } else {
    1389           1 :                         w.filter.addKey(key)
    1390           1 :                 }
    1391             :         }
    1392             : }
    1393             : 
    1394           2 : func (w *Writer) flush(key InternalKey) error {
    1395           2 :         // We're finishing a data block.
    1396           2 :         err := w.finishDataBlockProps(w.dataBlockBuf)
    1397           2 :         if err != nil {
    1398           1 :                 return err
    1399           1 :         }
    1400           2 :         w.dataBlockBuf.finish()
    1401           2 :         w.dataBlockBuf.compressAndChecksum(w.compression)
    1402           2 :         // Since dataBlockEstimates.addInflightDataBlock was never called, the
    1403           2 :         // inflightSize is set to 0.
    1404           2 :         w.coordination.sizeEstimate.dataBlockCompressed(len(w.dataBlockBuf.compressed), 0)
    1405           2 : 
    1406           2 :         // Determine if the index block should be flushed. Since we're accessing the
    1407           2 :         // dataBlockBuf.dataBlock.curKey here, we have to make sure that once we start
    1408           2 :         // to pool the dataBlockBufs, the curKey isn't used by the Writer once the
    1409           2 :         // dataBlockBuf is added back to a sync.Pool. In this particular case, the
    1410           2 :         // byte slice which supports "sep" will eventually be copied when "sep" is
    1411           2 :         // added to the index block.
    1412           2 :         prevKey := w.dataBlockBuf.dataBlock.getCurKey()
    1413           2 :         sep := w.indexEntrySep(prevKey, key, w.dataBlockBuf)
    1414           2 :         // We determine that we should flush an index block from the Writer client
    1415           2 :         // goroutine, but we actually finish the index block from the writeQueue.
    1416           2 :         // When we determine that an index block should be flushed, we need to call
    1417           2 :         // BlockPropertyCollector.FinishIndexBlock. But block property collector
    1418           2 :         // calls must happen sequentially from the Writer client. Therefore, we need
    1419           2 :         // to determine that we are going to flush the index block from the Writer
    1420           2 :         // client.
    1421           2 :         shouldFlushIndexBlock := supportsTwoLevelIndex(w.tableFormat) && w.indexBlock.shouldFlush(
    1422           2 :                 sep, encodedBHPEstimatedSize, w.indexBlockSize, w.indexBlockSizeThreshold,
    1423           2 :         )
    1424           2 : 
    1425           2 :         var indexProps []byte
    1426           2 :         var flushableIndexBlock *indexBlockBuf
    1427           2 :         if shouldFlushIndexBlock {
    1428           2 :                 flushableIndexBlock = w.indexBlock
    1429           2 :                 w.indexBlock = newIndexBlockBuf(w.coordination.parallelismEnabled)
    1430           2 :                 // Call BlockPropertyCollector.FinishIndexBlock, since we've decided to
    1431           2 :                 // flush the index block.
    1432           2 :                 indexProps, err = w.finishIndexBlockProps()
    1433           2 :                 if err != nil {
    1434           1 :                         return err
    1435           1 :                 }
    1436             :         }
    1437             : 
    1438             :         // We've called BlockPropertyCollector.FinishDataBlock, and, if necessary,
    1439             :         // BlockPropertyCollector.FinishIndexBlock. Since we've decided to finish
    1440             :         // the data block, we can call
    1441             :         // BlockPropertyCollector.AddPrevDataBlockToIndexBlock.
    1442           2 :         w.addPrevDataBlockToIndexBlockProps()
    1443           2 : 
    1444           2 :         // Schedule a write.
    1445           2 :         writeTask := writeTaskPool.Get().(*writeTask)
    1446           2 :         // We're setting compressionDone to indicate that compression of this block
    1447           2 :         // has already been completed.
    1448           2 :         writeTask.compressionDone <- true
    1449           2 :         writeTask.buf = w.dataBlockBuf
    1450           2 :         writeTask.indexEntrySep = sep
    1451           2 :         writeTask.currIndexBlock = w.indexBlock
    1452           2 :         writeTask.indexInflightSize = sep.Size() + encodedBHPEstimatedSize
    1453           2 :         writeTask.finishedIndexProps = indexProps
    1454           2 :         writeTask.flushableIndexBlock = flushableIndexBlock
    1455           2 : 
    1456           2 :         // The writeTask corresponds to an unwritten index entry.
    1457           2 :         w.indexBlock.addInflight(writeTask.indexInflightSize)
    1458           2 : 
    1459           2 :         w.dataBlockBuf = nil
    1460           2 :         if w.coordination.parallelismEnabled {
    1461           2 :                 w.coordination.writeQueue.add(writeTask)
    1462           2 :         } else {
    1463           2 :                 err = w.coordination.writeQueue.addSync(writeTask)
    1464           2 :         }
    1465           2 :         w.dataBlockBuf = newDataBlockBuf(w.restartInterval, w.checksumType)
    1466           2 : 
    1467           2 :         return err
    1468             : }
    1469             : 
    1470           2 : func (w *Writer) maybeFlush(key InternalKey, valueLen int) error {
    1471           2 :         if !w.dataBlockBuf.shouldFlush(key, valueLen, w.blockSize, w.blockSizeThreshold) {
    1472           2 :                 return nil
    1473           2 :         }
    1474             : 
    1475           2 :         err := w.flush(key)
    1476           2 : 
    1477           2 :         if err != nil {
    1478           1 :                 w.err = err
    1479           1 :                 return err
    1480           1 :         }
    1481             : 
    1482           2 :         return nil
    1483             : }
    1484             : 
    1485             : // dataBlockBuf.dataBlockProps set by this method must be encoded before any future use of the
    1486             : // dataBlockBuf.blockPropsEncoder, since the properties slice will get reused by the
    1487             : // blockPropsEncoder.
    1488           2 : func (w *Writer) finishDataBlockProps(buf *dataBlockBuf) error {
    1489           2 :         if len(w.blockPropCollectors) == 0 {
    1490           2 :                 return nil
    1491           2 :         }
    1492           2 :         var err error
    1493           2 :         buf.blockPropsEncoder.resetProps()
    1494           2 :         for i := range w.blockPropCollectors {
    1495           2 :                 scratch := buf.blockPropsEncoder.getScratchForProp()
    1496           2 :                 if scratch, err = w.blockPropCollectors[i].FinishDataBlock(scratch); err != nil {
    1497           1 :                         return err
    1498           1 :                 }
    1499           2 :                 if len(scratch) > 0 {
    1500           2 :                         buf.blockPropsEncoder.addProp(shortID(i), scratch)
    1501           2 :                 }
    1502             :         }
    1503             : 
    1504           2 :         buf.dataBlockProps = buf.blockPropsEncoder.unsafeProps()
    1505           2 :         return nil
    1506             : }
    1507             : 
    1508             : // The BlockHandleWithProperties returned by this method must be encoded before any future use of
    1509             : // the Writer.blockPropsEncoder, since the properties slice will get reused by the blockPropsEncoder.
    1510             : // maybeAddBlockPropertiesToBlockHandle should only be called if block is being written synchronously
    1511             : // with the Writer client.
    1512             : func (w *Writer) maybeAddBlockPropertiesToBlockHandle(
    1513             :         bh BlockHandle,
    1514           2 : ) (BlockHandleWithProperties, error) {
    1515           2 :         err := w.finishDataBlockProps(w.dataBlockBuf)
    1516           2 :         if err != nil {
    1517           0 :                 return BlockHandleWithProperties{}, err
    1518           0 :         }
    1519           2 :         return BlockHandleWithProperties{BlockHandle: bh, Props: w.dataBlockBuf.dataBlockProps}, nil
    1520             : }
    1521             : 
    1522           2 : func (w *Writer) indexEntrySep(prevKey, key InternalKey, dataBlockBuf *dataBlockBuf) InternalKey {
    1523           2 :         // Make a rough guess that we want key-sized scratch to compute the separator.
    1524           2 :         if cap(dataBlockBuf.sepScratch) < key.Size() {
    1525           2 :                 dataBlockBuf.sepScratch = make([]byte, 0, key.Size()*2)
    1526           2 :         }
    1527             : 
    1528           2 :         var sep InternalKey
    1529           2 :         if key.UserKey == nil && key.Trailer == 0 {
    1530           2 :                 sep = prevKey.Successor(w.compare, w.successor, dataBlockBuf.sepScratch[:0])
    1531           2 :         } else {
    1532           2 :                 sep = prevKey.Separator(w.compare, w.separator, dataBlockBuf.sepScratch[:0], key)
    1533           2 :         }
    1534           2 :         return sep
    1535             : }
    1536             : 
    1537             : // addIndexEntry adds an index entry for the specified key and block handle.
    1538             : // addIndexEntry can be called from both the Writer client goroutine, and the
    1539             : // writeQueue goroutine. If the flushIndexBuf != nil, then the indexProps, as
    1540             : // they're used when the index block is finished.
    1541             : //
    1542             : // Invariant:
    1543             : //  1. addIndexEntry must not store references to the sep InternalKey, the tmp
    1544             : //     byte slice, bhp.Props. That is, these must be either deep copied or
    1545             : //     encoded.
    1546             : //  2. addIndexEntry must not hold references to the flushIndexBuf, and the writeTo
    1547             : //     indexBlockBufs.
    1548             : func (w *Writer) addIndexEntry(
    1549             :         sep InternalKey,
    1550             :         bhp BlockHandleWithProperties,
    1551             :         tmp []byte,
    1552             :         flushIndexBuf *indexBlockBuf,
    1553             :         writeTo *indexBlockBuf,
    1554             :         inflightSize int,
    1555             :         indexProps []byte,
    1556           2 : ) error {
    1557           2 :         if bhp.Length == 0 {
    1558           0 :                 // A valid blockHandle must be non-zero.
    1559           0 :                 // In particular, it must have a non-zero length.
    1560           0 :                 return nil
    1561           0 :         }
    1562             : 
    1563           2 :         encoded := encodeBlockHandleWithProperties(tmp, bhp)
    1564           2 : 
    1565           2 :         if flushIndexBuf != nil {
    1566           2 :                 if cap(w.indexPartitions) == 0 {
    1567           2 :                         w.indexPartitions = make([]indexBlockAndBlockProperties, 0, 32)
    1568           2 :                 }
    1569             :                 // Enable two level indexes if there is more than one index block.
    1570           2 :                 w.twoLevelIndex = true
    1571           2 :                 if err := w.finishIndexBlock(flushIndexBuf, indexProps); err != nil {
    1572           0 :                         return err
    1573           0 :                 }
    1574             :         }
    1575             : 
    1576           2 :         writeTo.add(sep, encoded, inflightSize)
    1577           2 :         return nil
    1578             : }
    1579             : 
    1580           2 : func (w *Writer) addPrevDataBlockToIndexBlockProps() {
    1581           2 :         for i := range w.blockPropCollectors {
    1582           2 :                 w.blockPropCollectors[i].AddPrevDataBlockToIndexBlock()
    1583           2 :         }
    1584             : }
    1585             : 
    1586             : // addIndexEntrySync adds an index entry for the specified key and block handle.
    1587             : // Writer.addIndexEntry is only called synchronously once Writer.Close is called.
    1588             : // addIndexEntrySync should only be called if we're sure that index entries
    1589             : // aren't being written asynchronously.
    1590             : //
    1591             : // Invariant:
    1592             : //  1. addIndexEntrySync must not store references to the prevKey, key InternalKey's,
    1593             : //     the tmp byte slice. That is, these must be either deep copied or encoded.
    1594             : func (w *Writer) addIndexEntrySync(
    1595             :         prevKey, key InternalKey, bhp BlockHandleWithProperties, tmp []byte,
    1596           2 : ) error {
    1597           2 :         sep := w.indexEntrySep(prevKey, key, w.dataBlockBuf)
    1598           2 :         shouldFlush := supportsTwoLevelIndex(
    1599           2 :                 w.tableFormat) && w.indexBlock.shouldFlush(
    1600           2 :                 sep, encodedBHPEstimatedSize, w.indexBlockSize, w.indexBlockSizeThreshold,
    1601           2 :         )
    1602           2 :         var flushableIndexBlock *indexBlockBuf
    1603           2 :         var props []byte
    1604           2 :         var err error
    1605           2 :         if shouldFlush {
    1606           2 :                 flushableIndexBlock = w.indexBlock
    1607           2 :                 w.indexBlock = newIndexBlockBuf(w.coordination.parallelismEnabled)
    1608           2 : 
    1609           2 :                 // Call BlockPropertyCollector.FinishIndexBlock, since we've decided to
    1610           2 :                 // flush the index block.
    1611           2 :                 props, err = w.finishIndexBlockProps()
    1612           2 :                 if err != nil {
    1613           0 :                         return err
    1614           0 :                 }
    1615             :         }
    1616             : 
    1617           2 :         err = w.addIndexEntry(sep, bhp, tmp, flushableIndexBlock, w.indexBlock, 0, props)
    1618           2 :         if flushableIndexBlock != nil {
    1619           2 :                 flushableIndexBlock.clear()
    1620           2 :                 indexBlockBufPool.Put(flushableIndexBlock)
    1621           2 :         }
    1622           2 :         w.addPrevDataBlockToIndexBlockProps()
    1623           2 :         return err
    1624             : }
    1625             : 
    1626             : func shouldFlush(
    1627             :         key InternalKey,
    1628             :         valueLen int,
    1629             :         restartInterval, estimatedBlockSize, numEntries, targetBlockSize, sizeThreshold int,
    1630           2 : ) bool {
    1631           2 :         if numEntries == 0 {
    1632           2 :                 return false
    1633           2 :         }
    1634             : 
    1635           2 :         if estimatedBlockSize >= targetBlockSize {
    1636           2 :                 return true
    1637           2 :         }
    1638             : 
    1639             :         // The block is currently smaller than the target size.
    1640           2 :         if estimatedBlockSize <= sizeThreshold {
    1641           2 :                 // The block is smaller than the threshold size at which we'll consider
    1642           2 :                 // flushing it.
    1643           2 :                 return false
    1644           2 :         }
    1645             : 
    1646           2 :         newSize := estimatedBlockSize + key.Size() + valueLen
    1647           2 :         if numEntries%restartInterval == 0 {
    1648           2 :                 newSize += 4
    1649           2 :         }
    1650           2 :         newSize += 4                              // varint for shared prefix length
    1651           2 :         newSize += uvarintLen(uint32(key.Size())) // varint for unshared key bytes
    1652           2 :         newSize += uvarintLen(uint32(valueLen))   // varint for value size
    1653           2 :         // Flush if the block plus the new entry is larger than the target size.
    1654           2 :         return newSize > targetBlockSize
    1655             : }
    1656             : 
    1657           2 : func cloneKeyWithBuf(k InternalKey, a bytealloc.A) (bytealloc.A, InternalKey) {
    1658           2 :         if len(k.UserKey) == 0 {
    1659           0 :                 return a, k
    1660           0 :         }
    1661           2 :         a, keyCopy := a.Copy(k.UserKey)
    1662           2 :         return a, InternalKey{UserKey: keyCopy, Trailer: k.Trailer}
    1663             : }
    1664             : 
    1665             : // Invariants: The byte slice returned by finishIndexBlockProps is heap-allocated
    1666             : //
    1667             : //      and has its own lifetime, independent of the Writer and the blockPropsEncoder,
    1668             : //
    1669             : // and it is safe to:
    1670             : //  1. Reuse w.blockPropsEncoder without first encoding the byte slice returned.
    1671             : //  2. Store the byte slice in the Writer since it is a copy and not supported by
    1672             : //     an underlying buffer.
    1673           2 : func (w *Writer) finishIndexBlockProps() ([]byte, error) {
    1674           2 :         w.blockPropsEncoder.resetProps()
    1675           2 :         for i := range w.blockPropCollectors {
    1676           2 :                 scratch := w.blockPropsEncoder.getScratchForProp()
    1677           2 :                 var err error
    1678           2 :                 if scratch, err = w.blockPropCollectors[i].FinishIndexBlock(scratch); err != nil {
    1679           1 :                         return nil, err
    1680           1 :                 }
    1681           2 :                 if len(scratch) > 0 {
    1682           2 :                         w.blockPropsEncoder.addProp(shortID(i), scratch)
    1683           2 :                 }
    1684             :         }
    1685           2 :         return w.blockPropsEncoder.props(), nil
    1686             : }
    1687             : 
    1688             : // finishIndexBlock finishes the current index block and adds it to the top
    1689             : // level index block. This is only used when two level indexes are enabled.
    1690             : //
    1691             : // Invariants:
    1692             : //  1. The props slice passed into finishedIndexBlock must not be a
    1693             : //     owned by any other struct, since it will be stored in the Writer.indexPartitions
    1694             : //     slice.
    1695             : //  2. None of the buffers owned by indexBuf will be shallow copied and stored elsewhere.
    1696             : //     That is, it must be safe to reuse indexBuf after finishIndexBlock has been called.
    1697           2 : func (w *Writer) finishIndexBlock(indexBuf *indexBlockBuf, props []byte) error {
    1698           2 :         part := indexBlockAndBlockProperties{
    1699           2 :                 nEntries: indexBuf.block.nEntries, properties: props,
    1700           2 :         }
    1701           2 :         w.indexSepAlloc, part.sep = cloneKeyWithBuf(
    1702           2 :                 indexBuf.block.getCurKey(), w.indexSepAlloc,
    1703           2 :         )
    1704           2 :         bk := indexBuf.finish()
    1705           2 :         if len(w.indexBlockAlloc) < len(bk) {
    1706           2 :                 // Allocate enough bytes for approximately 16 index blocks.
    1707           2 :                 w.indexBlockAlloc = make([]byte, len(bk)*16)
    1708           2 :         }
    1709           2 :         n := copy(w.indexBlockAlloc, bk)
    1710           2 :         part.block = w.indexBlockAlloc[:n:n]
    1711           2 :         w.indexBlockAlloc = w.indexBlockAlloc[n:]
    1712           2 :         w.indexPartitions = append(w.indexPartitions, part)
    1713           2 :         return nil
    1714             : }
    1715             : 
    1716           2 : func (w *Writer) writeTwoLevelIndex() (BlockHandle, error) {
    1717           2 :         props, err := w.finishIndexBlockProps()
    1718           2 :         if err != nil {
    1719           0 :                 return BlockHandle{}, err
    1720           0 :         }
    1721             :         // Add the final unfinished index.
    1722           2 :         if err = w.finishIndexBlock(w.indexBlock, props); err != nil {
    1723           0 :                 return BlockHandle{}, err
    1724           0 :         }
    1725             : 
    1726           2 :         for i := range w.indexPartitions {
    1727           2 :                 b := &w.indexPartitions[i]
    1728           2 :                 w.props.NumDataBlocks += uint64(b.nEntries)
    1729           2 : 
    1730           2 :                 data := b.block
    1731           2 :                 w.props.IndexSize += uint64(len(data))
    1732           2 :                 bh, err := w.writeBlock(data, w.compression, &w.blockBuf)
    1733           2 :                 if err != nil {
    1734           0 :                         return BlockHandle{}, err
    1735           0 :                 }
    1736           2 :                 bhp := BlockHandleWithProperties{
    1737           2 :                         BlockHandle: bh,
    1738           2 :                         Props:       b.properties,
    1739           2 :                 }
    1740           2 :                 encoded := encodeBlockHandleWithProperties(w.blockBuf.tmp[:], bhp)
    1741           2 :                 w.topLevelIndexBlock.add(b.sep, encoded)
    1742             :         }
    1743             : 
    1744             :         // NB: RocksDB includes the block trailer length in the index size
    1745             :         // property, though it doesn't include the trailer in the top level
    1746             :         // index size property.
    1747           2 :         w.props.IndexPartitions = uint64(len(w.indexPartitions))
    1748           2 :         w.props.TopLevelIndexSize = uint64(w.topLevelIndexBlock.estimatedSize())
    1749           2 :         w.props.IndexSize += w.props.TopLevelIndexSize + blockTrailerLen
    1750           2 : 
    1751           2 :         return w.writeBlock(w.topLevelIndexBlock.finish(), w.compression, &w.blockBuf)
    1752             : }
    1753             : 
    1754           2 : func compressAndChecksum(b []byte, compression Compression, blockBuf *blockBuf) []byte {
    1755           2 :         // Compress the buffer, discarding the result if the improvement isn't at
    1756           2 :         // least 12.5%.
    1757           2 :         blockType, compressed := compressBlock(compression, b, blockBuf.compressedBuf)
    1758           2 :         if blockType != noCompressionBlockType && cap(compressed) > cap(blockBuf.compressedBuf) {
    1759           2 :                 blockBuf.compressedBuf = compressed[:cap(compressed)]
    1760           2 :         }
    1761           2 :         if len(compressed) < len(b)-len(b)/8 {
    1762           2 :                 b = compressed
    1763           2 :         } else {
    1764           2 :                 blockType = noCompressionBlockType
    1765           2 :         }
    1766             : 
    1767           2 :         blockBuf.tmp[0] = byte(blockType)
    1768           2 : 
    1769           2 :         // Calculate the checksum.
    1770           2 :         checksum := blockBuf.checksummer.checksum(b, blockBuf.tmp[:1])
    1771           2 :         binary.LittleEndian.PutUint32(blockBuf.tmp[1:5], checksum)
    1772           2 :         return b
    1773             : }
    1774             : 
    1775           2 : func (w *Writer) writeCompressedBlock(block []byte, blockTrailerBuf []byte) (BlockHandle, error) {
    1776           2 :         bh := BlockHandle{Offset: w.meta.Size, Length: uint64(len(block))}
    1777           2 : 
    1778           2 :         if w.cacheID != 0 && w.fileNum.FileNum() != 0 {
    1779           2 :                 // Remove the block being written from the cache. This provides defense in
    1780           2 :                 // depth against bugs which cause cache collisions.
    1781           2 :                 //
    1782           2 :                 // TODO(peter): Alternatively, we could add the uncompressed value to the
    1783           2 :                 // cache.
    1784           2 :                 w.cache.Delete(w.cacheID, w.fileNum, bh.Offset)
    1785           2 :         }
    1786             : 
    1787             :         // Write the bytes to the file.
    1788           2 :         if err := w.writable.Write(block); err != nil {
    1789           0 :                 return BlockHandle{}, err
    1790           0 :         }
    1791           2 :         w.meta.Size += uint64(len(block))
    1792           2 :         if err := w.writable.Write(blockTrailerBuf[:blockTrailerLen]); err != nil {
    1793           0 :                 return BlockHandle{}, err
    1794           0 :         }
    1795           2 :         w.meta.Size += blockTrailerLen
    1796           2 : 
    1797           2 :         return bh, nil
    1798             : }
    1799             : 
    1800             : // Write implements io.Writer. This is analogous to writeCompressedBlock for
    1801             : // blocks that already incorporate the trailer, and don't need the callee to
    1802             : // return a BlockHandle.
    1803           2 : func (w *Writer) Write(blockWithTrailer []byte) (n int, err error) {
    1804           2 :         offset := w.meta.Size
    1805           2 :         if w.cacheID != 0 && w.fileNum.FileNum() != 0 {
    1806           2 :                 // Remove the block being written from the cache. This provides defense in
    1807           2 :                 // depth against bugs which cause cache collisions.
    1808           2 :                 //
    1809           2 :                 // TODO(peter): Alternatively, we could add the uncompressed value to the
    1810           2 :                 // cache.
    1811           2 :                 w.cache.Delete(w.cacheID, w.fileNum, offset)
    1812           2 :         }
    1813           2 :         w.meta.Size += uint64(len(blockWithTrailer))
    1814           2 :         if err := w.writable.Write(blockWithTrailer); err != nil {
    1815           0 :                 return 0, err
    1816           0 :         }
    1817           2 :         return len(blockWithTrailer), nil
    1818             : }
    1819             : 
    1820             : func (w *Writer) writeBlock(
    1821             :         b []byte, compression Compression, blockBuf *blockBuf,
    1822           2 : ) (BlockHandle, error) {
    1823           2 :         b = compressAndChecksum(b, compression, blockBuf)
    1824           2 :         return w.writeCompressedBlock(b, blockBuf.tmp[:])
    1825           2 : }
    1826             : 
    1827             : // assertFormatCompatibility ensures that the features present on the table are
    1828             : // compatible with the table format version.
    1829           2 : func (w *Writer) assertFormatCompatibility() error {
    1830           2 :         // PebbleDBv1: block properties.
    1831           2 :         if len(w.blockPropCollectors) > 0 && w.tableFormat < TableFormatPebblev1 {
    1832           1 :                 return errors.Newf(
    1833           1 :                         "table format version %s is less than the minimum required version %s for block properties",
    1834           1 :                         w.tableFormat, TableFormatPebblev1,
    1835           1 :                 )
    1836           1 :         }
    1837             : 
    1838             :         // PebbleDBv2: range keys.
    1839           2 :         if w.props.NumRangeKeys() > 0 && w.tableFormat < TableFormatPebblev2 {
    1840           1 :                 return errors.Newf(
    1841           1 :                         "table format version %s is less than the minimum required version %s for range keys",
    1842           1 :                         w.tableFormat, TableFormatPebblev2,
    1843           1 :                 )
    1844           1 :         }
    1845             : 
    1846             :         // PebbleDBv3: value blocks.
    1847           2 :         if (w.props.NumValueBlocks > 0 || w.props.NumValuesInValueBlocks > 0 ||
    1848           2 :                 w.props.ValueBlocksSize > 0) && w.tableFormat < TableFormatPebblev3 {
    1849           0 :                 return errors.Newf(
    1850           0 :                         "table format version %s is less than the minimum required version %s for value blocks",
    1851           0 :                         w.tableFormat, TableFormatPebblev3)
    1852           0 :         }
    1853             : 
    1854             :         // PebbleDBv4: DELSIZED tombstones.
    1855           2 :         if w.props.NumSizedDeletions > 0 && w.tableFormat < TableFormatPebblev4 {
    1856           0 :                 return errors.Newf(
    1857           0 :                         "table format version %s is less than the minimum required version %s for sized deletion tombstones",
    1858           0 :                         w.tableFormat, TableFormatPebblev4)
    1859           0 :         }
    1860           2 :         return nil
    1861             : }
    1862             : 
    1863             : // Close finishes writing the table and closes the underlying file that the
    1864             : // table was written to.
    1865           2 : func (w *Writer) Close() (err error) {
    1866           2 :         defer func() {
    1867           2 :                 if w.valueBlockWriter != nil {
    1868           2 :                         releaseValueBlockWriter(w.valueBlockWriter)
    1869           2 :                         // Defensive code in case Close gets called again. We don't want to put
    1870           2 :                         // the same object to a sync.Pool.
    1871           2 :                         w.valueBlockWriter = nil
    1872           2 :                 }
    1873           2 :                 if w.writable != nil {
    1874           1 :                         w.writable.Abort()
    1875           1 :                         w.writable = nil
    1876           1 :                 }
    1877             :                 // Record any error in the writer (so we can exit early if Close is called
    1878             :                 // again).
    1879           2 :                 if err != nil {
    1880           1 :                         w.err = err
    1881           1 :                 }
    1882             :         }()
    1883             : 
    1884             :         // finish must be called before we check for an error, because finish will
    1885             :         // block until every single task added to the writeQueue has been processed,
    1886             :         // and an error could be encountered while any of those tasks are processed.
    1887           2 :         if err := w.coordination.writeQueue.finish(); err != nil {
    1888           1 :                 return err
    1889           1 :         }
    1890             : 
    1891           2 :         if w.err != nil {
    1892           1 :                 return w.err
    1893           1 :         }
    1894             : 
    1895             :         // The w.meta.LargestPointKey is only used once the Writer is closed, so it is safe to set it
    1896             :         // when the Writer is closed.
    1897             :         //
    1898             :         // The following invariants ensure that setting the largest key at this point of a Writer close
    1899             :         // is correct:
    1900             :         // 1. Keys must only be added to the Writer in an increasing order.
    1901             :         // 2. The current w.dataBlockBuf is guaranteed to have the latest key added to the Writer. This
    1902             :         //    must be true, because a w.dataBlockBuf is only switched out when a dataBlock is flushed,
    1903             :         //    however, if a dataBlock is flushed, then we add a key to the new w.dataBlockBuf in the
    1904             :         //    addPoint function after the flush occurs.
    1905           2 :         if w.dataBlockBuf.dataBlock.nEntries >= 1 {
    1906           2 :                 w.meta.SetLargestPointKey(w.dataBlockBuf.dataBlock.getCurKey().Clone())
    1907           2 :         }
    1908             : 
    1909             :         // Finish the last data block, or force an empty data block if there
    1910             :         // aren't any data blocks at all.
    1911           2 :         if w.dataBlockBuf.dataBlock.nEntries > 0 || w.indexBlock.block.nEntries == 0 {
    1912           2 :                 bh, err := w.writeBlock(w.dataBlockBuf.dataBlock.finish(), w.compression, &w.dataBlockBuf.blockBuf)
    1913           2 :                 if err != nil {
    1914           0 :                         return err
    1915           0 :                 }
    1916           2 :                 bhp, err := w.maybeAddBlockPropertiesToBlockHandle(bh)
    1917           2 :                 if err != nil {
    1918           0 :                         return err
    1919           0 :                 }
    1920           2 :                 prevKey := w.dataBlockBuf.dataBlock.getCurKey()
    1921           2 :                 if err := w.addIndexEntrySync(prevKey, InternalKey{}, bhp, w.dataBlockBuf.tmp[:]); err != nil {
    1922           0 :                         return err
    1923           0 :                 }
    1924             :         }
    1925           2 :         w.props.DataSize = w.meta.Size
    1926           2 : 
    1927           2 :         // Write the filter block.
    1928           2 :         var metaindex rawBlockWriter
    1929           2 :         metaindex.restartInterval = 1
    1930           2 :         if w.filter != nil {
    1931           2 :                 b, err := w.filter.finish()
    1932           2 :                 if err != nil {
    1933           0 :                         return err
    1934           0 :                 }
    1935           2 :                 bh, err := w.writeBlock(b, NoCompression, &w.blockBuf)
    1936           2 :                 if err != nil {
    1937           0 :                         return err
    1938           0 :                 }
    1939           2 :                 n := encodeBlockHandle(w.blockBuf.tmp[:], bh)
    1940           2 :                 metaindex.add(InternalKey{UserKey: []byte(w.filter.metaName())}, w.blockBuf.tmp[:n])
    1941           2 :                 w.props.FilterPolicyName = w.filter.policyName()
    1942           2 :                 w.props.FilterSize = bh.Length
    1943             :         }
    1944             : 
    1945           2 :         var indexBH BlockHandle
    1946           2 :         if w.twoLevelIndex {
    1947           2 :                 w.props.IndexType = twoLevelIndex
    1948           2 :                 // Write the two level index block.
    1949           2 :                 indexBH, err = w.writeTwoLevelIndex()
    1950           2 :                 if err != nil {
    1951           0 :                         return err
    1952           0 :                 }
    1953           2 :         } else {
    1954           2 :                 w.props.IndexType = binarySearchIndex
    1955           2 :                 // NB: RocksDB includes the block trailer length in the index size
    1956           2 :                 // property, though it doesn't include the trailer in the filter size
    1957           2 :                 // property.
    1958           2 :                 w.props.IndexSize = uint64(w.indexBlock.estimatedSize()) + blockTrailerLen
    1959           2 :                 w.props.NumDataBlocks = uint64(w.indexBlock.block.nEntries)
    1960           2 : 
    1961           2 :                 // Write the single level index block.
    1962           2 :                 indexBH, err = w.writeBlock(w.indexBlock.finish(), w.compression, &w.blockBuf)
    1963           2 :                 if err != nil {
    1964           0 :                         return err
    1965           0 :                 }
    1966             :         }
    1967             : 
    1968             :         // Write the range-del block. The block handle must be added to the meta index block
    1969             :         // after the properties block has been written. This is because the entries in the
    1970             :         // metaindex block must be sorted by key.
    1971           2 :         var rangeDelBH BlockHandle
    1972           2 :         if w.props.NumRangeDeletions > 0 {
    1973           2 :                 if !w.rangeDelV1Format {
    1974           2 :                         // Because the range tombstones are fragmented in the v2 format, the end
    1975           2 :                         // key of the last added range tombstone will be the largest range
    1976           2 :                         // tombstone key. Note that we need to make this into a range deletion
    1977           2 :                         // sentinel because sstable boundaries are inclusive while the end key of
    1978           2 :                         // a range deletion tombstone is exclusive. A Clone() is necessary as
    1979           2 :                         // rangeDelBlock.curValue is the same slice that will get passed
    1980           2 :                         // into w.writer, and some implementations of vfs.File mutate the
    1981           2 :                         // slice passed into Write(). Also, w.meta will often outlive the
    1982           2 :                         // blockWriter, and so cloning curValue allows the rangeDelBlock's
    1983           2 :                         // internal buffer to get gc'd.
    1984           2 :                         k := base.MakeRangeDeleteSentinelKey(w.rangeDelBlock.curValue).Clone()
    1985           2 :                         w.meta.SetLargestRangeDelKey(k)
    1986           2 :                 }
    1987           2 :                 rangeDelBH, err = w.writeBlock(w.rangeDelBlock.finish(), NoCompression, &w.blockBuf)
    1988           2 :                 if err != nil {
    1989           0 :                         return err
    1990           0 :                 }
    1991             :         }
    1992             : 
    1993             :         // Write the range-key block, flushing any remaining spans from the
    1994             :         // fragmenter first.
    1995           2 :         w.fragmenter.Finish()
    1996           2 : 
    1997           2 :         var rangeKeyBH BlockHandle
    1998           2 :         if w.props.NumRangeKeys() > 0 {
    1999           2 :                 key := w.rangeKeyBlock.getCurKey()
    2000           2 :                 kind := key.Kind()
    2001           2 :                 endKey, _, ok := rangekey.DecodeEndKey(kind, w.rangeKeyBlock.curValue)
    2002           2 :                 if !ok {
    2003           0 :                         return errors.Newf("invalid end key: %s", w.rangeKeyBlock.curValue)
    2004           0 :                 }
    2005           2 :                 k := base.MakeExclusiveSentinelKey(kind, endKey).Clone()
    2006           2 :                 w.meta.SetLargestRangeKey(k)
    2007           2 :                 // TODO(travers): The lack of compression on the range key block matches the
    2008           2 :                 // lack of compression on the range-del block. Revisit whether we want to
    2009           2 :                 // enable compression on this block.
    2010           2 :                 rangeKeyBH, err = w.writeBlock(w.rangeKeyBlock.finish(), NoCompression, &w.blockBuf)
    2011           2 :                 if err != nil {
    2012           0 :                         return err
    2013           0 :                 }
    2014             :         }
    2015             : 
    2016           2 :         if w.valueBlockWriter != nil {
    2017           2 :                 vbiHandle, vbStats, err := w.valueBlockWriter.finish(w, w.meta.Size)
    2018           2 :                 if err != nil {
    2019           0 :                         return err
    2020           0 :                 }
    2021           2 :                 w.props.NumValueBlocks = vbStats.numValueBlocks
    2022           2 :                 w.props.NumValuesInValueBlocks = vbStats.numValuesInValueBlocks
    2023           2 :                 w.props.ValueBlocksSize = vbStats.valueBlocksAndIndexSize
    2024           2 :                 if vbStats.numValueBlocks > 0 {
    2025           2 :                         n := encodeValueBlocksIndexHandle(w.blockBuf.tmp[:], vbiHandle)
    2026           2 :                         metaindex.add(InternalKey{UserKey: []byte(metaValueIndexName)}, w.blockBuf.tmp[:n])
    2027           2 :                 }
    2028             :         }
    2029             : 
    2030             :         // Add the range key block handle to the metaindex block. Note that we add the
    2031             :         // block handle to the metaindex block before the other meta blocks as the
    2032             :         // metaindex block entries must be sorted, and the range key block name sorts
    2033             :         // before the other block names.
    2034           2 :         if w.props.NumRangeKeys() > 0 {
    2035           2 :                 n := encodeBlockHandle(w.blockBuf.tmp[:], rangeKeyBH)
    2036           2 :                 metaindex.add(InternalKey{UserKey: []byte(metaRangeKeyName)}, w.blockBuf.tmp[:n])
    2037           2 :         }
    2038             : 
    2039           2 :         {
    2040           2 :                 userProps := make(map[string]string)
    2041           2 :                 for i := range w.propCollectors {
    2042           1 :                         if err := w.propCollectors[i].Finish(userProps); err != nil {
    2043           1 :                                 return err
    2044           1 :                         }
    2045             :                 }
    2046           2 :                 for i := range w.blockPropCollectors {
    2047           2 :                         scratch := w.blockPropsEncoder.getScratchForProp()
    2048           2 :                         // Place the shortID in the first byte.
    2049           2 :                         scratch = append(scratch, byte(i))
    2050           2 :                         buf, err := w.blockPropCollectors[i].FinishTable(scratch)
    2051           2 :                         if err != nil {
    2052           1 :                                 return err
    2053           1 :                         }
    2054           2 :                         var prop string
    2055           2 :                         if len(buf) > 0 {
    2056           2 :                                 prop = string(buf)
    2057           2 :                         }
    2058             :                         // NB: The property is populated in the map even if it is the
    2059             :                         // empty string, since the presence in the map is what indicates
    2060             :                         // that the block property collector was used when writing.
    2061           2 :                         userProps[w.blockPropCollectors[i].Name()] = prop
    2062             :                 }
    2063           2 :                 if len(userProps) > 0 {
    2064           2 :                         w.props.UserProperties = userProps
    2065           2 :                 }
    2066             : 
    2067             :                 // Write the properties block.
    2068           2 :                 var raw rawBlockWriter
    2069           2 :                 // The restart interval is set to infinity because the properties block
    2070           2 :                 // is always read sequentially and cached in a heap located object. This
    2071           2 :                 // reduces table size without a significant impact on performance.
    2072           2 :                 raw.restartInterval = propertiesBlockRestartInterval
    2073           2 :                 w.props.CompressionOptions = rocksDBCompressionOptions
    2074           2 :                 w.props.save(w.tableFormat, &raw)
    2075           2 :                 bh, err := w.writeBlock(raw.finish(), NoCompression, &w.blockBuf)
    2076           2 :                 if err != nil {
    2077           0 :                         return err
    2078           0 :                 }
    2079           2 :                 n := encodeBlockHandle(w.blockBuf.tmp[:], bh)
    2080           2 :                 metaindex.add(InternalKey{UserKey: []byte(metaPropertiesName)}, w.blockBuf.tmp[:n])
    2081             :         }
    2082             : 
    2083             :         // Add the range deletion block handle to the metaindex block.
    2084           2 :         if w.props.NumRangeDeletions > 0 {
    2085           2 :                 n := encodeBlockHandle(w.blockBuf.tmp[:], rangeDelBH)
    2086           2 :                 // The v2 range-del block encoding is backwards compatible with the v1
    2087           2 :                 // encoding. We add meta-index entries for both the old name and the new
    2088           2 :                 // name so that old code can continue to find the range-del block and new
    2089           2 :                 // code knows that the range tombstones in the block are fragmented and
    2090           2 :                 // sorted.
    2091           2 :                 metaindex.add(InternalKey{UserKey: []byte(metaRangeDelName)}, w.blockBuf.tmp[:n])
    2092           2 :                 if !w.rangeDelV1Format {
    2093           2 :                         metaindex.add(InternalKey{UserKey: []byte(metaRangeDelV2Name)}, w.blockBuf.tmp[:n])
    2094           2 :                 }
    2095             :         }
    2096             : 
    2097             :         // Write the metaindex block. It might be an empty block, if the filter
    2098             :         // policy is nil. NoCompression is specified because a) RocksDB never
    2099             :         // compresses the meta-index block and b) RocksDB has some code paths which
    2100             :         // expect the meta-index block to not be compressed.
    2101           2 :         metaindexBH, err := w.writeBlock(metaindex.blockWriter.finish(), NoCompression, &w.blockBuf)
    2102           2 :         if err != nil {
    2103           0 :                 return err
    2104           0 :         }
    2105             : 
    2106             :         // Write the table footer.
    2107           2 :         footer := footer{
    2108           2 :                 format:      w.tableFormat,
    2109           2 :                 checksum:    w.blockBuf.checksummer.checksumType,
    2110           2 :                 metaindexBH: metaindexBH,
    2111           2 :                 indexBH:     indexBH,
    2112           2 :         }
    2113           2 :         encoded := footer.encode(w.blockBuf.tmp[:])
    2114           2 :         if err := w.writable.Write(footer.encode(w.blockBuf.tmp[:])); err != nil {
    2115           0 :                 return err
    2116           0 :         }
    2117           2 :         w.meta.Size += uint64(len(encoded))
    2118           2 :         w.meta.Properties = w.props
    2119           2 : 
    2120           2 :         // Check that the features present in the table are compatible with the format
    2121           2 :         // configured for the table.
    2122           2 :         if err = w.assertFormatCompatibility(); err != nil {
    2123           1 :                 return err
    2124           1 :         }
    2125             : 
    2126           2 :         if err := w.writable.Finish(); err != nil {
    2127           1 :                 w.writable = nil
    2128           1 :                 return err
    2129           1 :         }
    2130           2 :         w.writable = nil
    2131           2 : 
    2132           2 :         w.dataBlockBuf.clear()
    2133           2 :         dataBlockBufPool.Put(w.dataBlockBuf)
    2134           2 :         w.dataBlockBuf = nil
    2135           2 :         w.indexBlock.clear()
    2136           2 :         indexBlockBufPool.Put(w.indexBlock)
    2137           2 :         w.indexBlock = nil
    2138           2 : 
    2139           2 :         // Make any future calls to Set or Close return an error.
    2140           2 :         w.err = errWriterClosed
    2141           2 :         return nil
    2142             : }
    2143             : 
    2144             : // EstimatedSize returns the estimated size of the sstable being written if a call to Finish() was made without adding additional keys.
    2145             : // It sums the three estimates annotated below. NOTE(review): it dereferences w.dataBlockBuf, which the finishing path sets to nil, so it presumably must not be called after a successful finish -- confirm.
    2146           2 : func (w *Writer) EstimatedSize() uint64 {
    2147           2 :         return w.coordination.sizeEstimate.size() + // running size estimate kept in w.coordination
    2148           2 :                 uint64(w.dataBlockBuf.dataBlock.estimatedSize()) + // data block currently being built
    2149           2 :                 w.indexBlock.estimatedSize() // current index block
    2150           2 : }
    2151             : 
    2152             : // Metadata returns a pointer to the Writer's own WriterMetadata (not a copy) for the finished sstable. Only valid to call
    2153             : // after the sstable has been finished; while w.writable is still set it returns a nil pointer and an error.
    2154           2 : func (w *Writer) Metadata() (*WriterMetadata, error) {
    2155           2 :         if w.writable != nil { // w.writable is set to nil once the writable has been finished
    2156           0 :                 return nil, errors.New("pebble: writer is not closed")
    2157           0 :         }
    2158           2 :         return &w.meta, nil
    2159             : }
    2160             : 
    2161             : // WriterOption provides an interface to do work on a Writer while it is being
    2162             : // opened. NewWriter calls writerApply exactly once for each option passed to it.
    2163             : type WriterOption interface {
    2164             :         // writerApply is called on the writer during opening in order to set
    2165             :         // internal parameters.
    2166             :         writerApply(*Writer)
    2167             : }
    2168             : 
    2169             : // PreviousPointKeyOpt is a WriterOption that provides access to the last
    2170             : // point key written to the writer while building an sstable. writerApply has a pointer receiver, so a *PreviousPointKeyOpt must be passed to NewWriter.
    2171             : type PreviousPointKeyOpt struct {
    2172             :         w *Writer // set by writerApply; nil until the option has been applied to a Writer
    2173             : }
    2174             : 
    2175             : // UnsafeKey returns the last point key written to the writer to which this
    2176             : // option was passed during creation. The returned key points directly into
    2177             : // a buffer belonging to the Writer. The value's lifetime ends the next time a
    2178             : // point key is added to the Writer. It returns base.InvalidInternalKey if the option was never applied to a Writer, and a zero base.InternalKey if the current data block has no entries.
    2179             : // Invariant: UnsafeKey isn't and shouldn't be called after the Writer is closed.
    2180           2 : func (o PreviousPointKeyOpt) UnsafeKey() base.InternalKey {
    2181           2 :         if o.w == nil {
    2182           0 :                 return base.InvalidInternalKey
    2183           0 :         }
    2184             : 
    2185           2 :         if o.w.dataBlockBuf.dataBlock.nEntries >= 1 {
    2186           2 :                 // o.w.dataBlockBuf.dataBlock.curKey is guaranteed to point to the last point key
    2187           2 :                 // which was added to the Writer.
    2188           2 :                 return o.w.dataBlockBuf.dataBlock.getCurKey()
    2189           2 :         }
    2190           0 :         return base.InternalKey{}
    2191             : }
    2192             : // writerApply implements WriterOption by remembering w, so that UnsafeKey can later read the Writer's current data block.
    2193           2 : func (o *PreviousPointKeyOpt) writerApply(w *Writer) {
    2194           2 :         o.w = w
    2195           2 : }
    2196             : 
    2197             : // NewWriter returns a new table writer for the file. Closing the writer will
    2198             : // close the file. The result is never nil: a nil writable, or too many block property collectors, is reported by setting w.err on the returned Writer.
    2199           2 : func NewWriter(writable objstorage.Writable, o WriterOptions, extraOpts ...WriterOption) *Writer {
    2200           2 :         o = o.ensureDefaults()
    2201           2 :         w := &Writer{
    2202           2 :                 writable: writable,
    2203           2 :                 meta: WriterMetadata{
    2204           2 :                         SmallestSeqNum: math.MaxUint64,
    2205           2 :                 },
    2206           2 :                 blockSize:               o.BlockSize,
    2207           2 :                 blockSizeThreshold:      (o.BlockSize*o.BlockSizeThreshold + 99) / 100, // BlockSizeThreshold percent of BlockSize, rounded up
    2208           2 :                 indexBlockSize:          o.IndexBlockSize,
    2209           2 :                 indexBlockSizeThreshold: (o.IndexBlockSize*o.BlockSizeThreshold + 99) / 100, // same percentage, applied to IndexBlockSize
    2210           2 :                 compare:                 o.Comparer.Compare,
    2211           2 :                 split:                   o.Comparer.Split,
    2212           2 :                 formatKey:               o.Comparer.FormatKey,
    2213           2 :                 compression:             o.Compression,
    2214           2 :                 separator:               o.Comparer.Separator,
    2215           2 :                 successor:               o.Comparer.Successor,
    2216           2 :                 tableFormat:             o.TableFormat,
    2217           2 :                 isStrictObsolete:        o.IsStrictObsolete,
    2218           2 :                 writingToLowestLevel:    o.WritingToLowestLevel,
    2219           2 :                 cache:                   o.Cache,
    2220           2 :                 restartInterval:         o.BlockRestartInterval,
    2221           2 :                 checksumType:            o.Checksum,
    2222           2 :                 indexBlock:              newIndexBlockBuf(o.Parallelism),
    2223           2 :                 rangeDelBlock: blockWriter{
    2224           2 :                         restartInterval: 1,
    2225           2 :                 },
    2226           2 :                 rangeKeyBlock: blockWriter{
    2227           2 :                         restartInterval: 1,
    2228           2 :                 },
    2229           2 :                 topLevelIndexBlock: blockWriter{
    2230           2 :                         restartInterval: 1,
    2231           2 :                 },
    2232           2 :                 fragmenter: keyspan.Fragmenter{
    2233           2 :                         Cmp:    o.Comparer.Compare,
    2234           2 :                         Format: o.Comparer.FormatKey,
    2235           2 :                 },
    2236           2 :         }
    2237           2 :         if w.tableFormat >= TableFormatPebblev3 { // the value block writer is only created for Pebblev3 and later formats
    2238           2 :                 w.shortAttributeExtractor = o.ShortAttributeExtractor
    2239           2 :                 w.requiredInPlaceValueBound = o.RequiredInPlaceValueBound
    2240           2 :                 w.valueBlockWriter = newValueBlockWriter(
    2241           2 :                         w.blockSize, w.blockSizeThreshold, w.compression, w.checksumType, func(compressedSize int) {
    2242           2 :                                 w.coordination.sizeEstimate.dataBlockCompressed(compressedSize, 0)
    2243           2 :                         })
    2244             :         }
    2245             : 
    2246           2 :         w.dataBlockBuf = newDataBlockBuf(w.restartInterval, w.checksumType)
    2247           2 : 
    2248           2 :         w.blockBuf = blockBuf{
    2249           2 :                 checksummer: checksummer{checksumType: o.Checksum},
    2250           2 :         }
    2251           2 : 
    2252           2 :         w.coordination.init(o.Parallelism, w)
    2253           2 : 
    2254           2 :         if writable == nil {
    2255           0 :                 w.err = errors.New("pebble: nil writable")
    2256           0 :                 return w // returned before any extraOpts are applied or properties are set
    2257           0 :         }
    2258             : 
    2259             :         // Note that WriterOptions are applied in two places; the ones with a
    2260             :         // preApply() method are applied here. The rest are applied down below after
    2261             :         // default properties are set.
    2262           2 :         type preApply interface{ preApply() }
    2263           2 :         for _, opt := range extraOpts {
    2264           2 :                 if _, ok := opt.(preApply); ok {
    2265           2 :                         opt.writerApply(w)
    2266           2 :                 }
    2267             :         }
    2268             : 
    2269           2 :         w.props.PrefixExtractorName = "nullptr"
    2270           2 :         if o.FilterPolicy != nil {
    2271           2 :                 switch o.FilterType {
    2272           2 :                 case TableFilter:
    2273           2 :                         w.filter = newTableFilterWriter(o.FilterPolicy)
    2274           2 :                         if w.split != nil {
    2275           2 :                                 w.props.PrefixExtractorName = o.Comparer.Name
    2276           2 :                                 w.props.PrefixFiltering = true
    2277           2 :                         } else {
    2278           1 :                                 w.props.WholeKeyFiltering = true
    2279           1 :                         }
    2280           0 :                 default:
    2281           0 :                         panic(fmt.Sprintf("unknown filter type: %v", o.FilterType))
    2282             :                 }
    2283             :         }
    2284             : 
    2285           2 :         w.props.ComparerName = o.Comparer.Name
    2286           2 :         w.props.CompressionName = o.Compression.String()
    2287           2 :         w.props.MergerName = o.MergerName
    2288           2 :         w.props.PropertyCollectorNames = "[]"
    2289           2 :         w.props.ExternalFormatVersion = rocksDBExternalFormatVersion
    2290           2 : 
    2291           2 :         if len(o.TablePropertyCollectors) > 0 || len(o.BlockPropertyCollectors) > 0 ||
    2292           2 :                 w.tableFormat >= TableFormatPebblev4 {
    2293           2 :                 var buf bytes.Buffer // accumulates the "[name,name,...]" string stored in PropertyCollectorNames
    2294           2 :                 buf.WriteString("[")
    2295           2 :                 if len(o.TablePropertyCollectors) > 0 {
    2296           1 :                         w.propCollectors = make([]TablePropertyCollector, len(o.TablePropertyCollectors))
    2297           1 :                         for i := range o.TablePropertyCollectors {
    2298           1 :                                 w.propCollectors[i] = o.TablePropertyCollectors[i]()
    2299           1 :                                 if i > 0 {
    2300           1 :                                         buf.WriteString(",")
    2301           1 :                                 }
    2302           1 :                                 buf.WriteString(w.propCollectors[i].Name())
    2303             :                         }
    2304             :                 }
    2305           2 :                 numBlockPropertyCollectors := len(o.BlockPropertyCollectors)
    2306           2 :                 if w.tableFormat >= TableFormatPebblev4 {
    2307           2 :                         numBlockPropertyCollectors++ // reserve one extra slot for the built-in obsolete-key collector
    2308           2 :                 }
    2309             :                 // shortID is a uint8, so we cannot exceed that number of block
    2310             :                 // property collectors.
    2311           2 :                 if numBlockPropertyCollectors > math.MaxUint8 {
    2312           0 :                         w.err = errors.New("pebble: too many block property collectors")
    2313           0 :                         return w
    2314           0 :                 }
    2315           2 :                 if numBlockPropertyCollectors > 0 {
    2316           2 :                         w.blockPropCollectors = make([]BlockPropertyCollector, numBlockPropertyCollectors)
    2317           2 :                 }
    2318           2 :                 if len(o.BlockPropertyCollectors) > 0 {
    2319           2 :                         // The shortID assigned to a collector is the same as its index in
    2320           2 :                         // this slice.
    2321           2 :                         for i := range o.BlockPropertyCollectors {
    2322           2 :                                 w.blockPropCollectors[i] = o.BlockPropertyCollectors[i]()
    2323           2 :                                 if i > 0 || len(o.TablePropertyCollectors) > 0 {
    2324           1 :                                         buf.WriteString(",")
    2325           1 :                                 }
    2326           2 :                                 buf.WriteString(w.blockPropCollectors[i].Name())
    2327             :                         }
    2328             :                 }
    2329           2 :                 if w.tableFormat >= TableFormatPebblev4 {
    2330           2 :                         if numBlockPropertyCollectors > 1 || len(o.TablePropertyCollectors) > 0 {
    2331           2 :                                 buf.WriteString(",")
    2332           2 :                         }
    2333           2 :                         w.blockPropCollectors[numBlockPropertyCollectors-1] = &w.obsoleteCollector // always the last slot
    2334           2 :                         buf.WriteString(w.obsoleteCollector.Name())
    2335             :                 }
    2336           2 :                 buf.WriteString("]")
    2337           2 :                 w.props.PropertyCollectorNames = buf.String()
    2338             :         }
    2339             : 
    2340             :         // Apply the remaining WriterOptions that do not have a preApply() method.
    2341           2 :         for _, opt := range extraOpts {
    2342           2 :                 if _, ok := opt.(preApply); ok {
    2343           2 :                         continue
    2344             :                 }
    2345           2 :                 opt.writerApply(w)
    2346             :         }
    2347             : 
    2348             :         // Initialize the range key fragmenter and encoder.
    2349           2 :         w.fragmenter.Emit = w.encodeRangeKeySpan
    2350           2 :         w.rangeKeyEncoder.Emit = w.addRangeKey
    2351           2 :         return w
    2352             : }
    2353             : 
    2354             : // internalGetProperties is a private, internal-use-only function that takes a
    2355             : // Writer and returns a pointer to its Properties, allowing direct mutation.
    2356             : // It's used by internal Pebble flushes and compactions to set internal
    2357             : // properties. It gets installed as private.SSTableInternalProperties by this file's init function.
    2358           2 : func internalGetProperties(w *Writer) *Properties {
    2359           2 :         return &w.props
    2360           2 : }
    2361             : // init installs two hooks into the private package: one that sets disableKeyOrderChecks on a *Writer, and internalGetProperties.
    2362           2 : func init() {
    2363           2 :         private.SSTableWriterDisableKeyOrderChecks = func(i interface{}) {
    2364           1 :                 w := i.(*Writer) // unchecked type assertion: panics if i is not a *Writer
    2365           1 :                 w.disableKeyOrderChecks = true
    2366           1 :         }
    2367           2 :         private.SSTableInternalProperties = internalGetProperties
    2368             : }
    2369             : // obsoleteKeyBlockPropertyCollector tracks, per data block, per index block and per table, whether at least one non-obsolete point was added.
    2370             : type obsoleteKeyBlockPropertyCollector struct {
    2371             :         blockIsNonObsolete bool // current data block; set by AddPoint and UpdateKeySuffixes, reset by AddPrevDataBlockToIndexBlock
    2372             :         indexIsNonObsolete bool // current index block; accumulated from blockIsNonObsolete, reset by FinishIndexBlock
    2373             :         tableIsNonObsolete bool // whole table; accumulated from blockIsNonObsolete in FinishDataBlock
    2374             : }
    2375             : // encodeNonObsolete appends the encoding of isNonObsolete to buf and returns the result: nothing when true, the single byte 't' when false.
    2376           2 : func encodeNonObsolete(isNonObsolete bool, buf []byte) []byte {
    2377           2 :         if isNonObsolete {
    2378           2 :                 return buf // non-obsolete is encoded as "no bytes appended"
    2379           2 :         }
    2380           2 :         return append(buf, 't')
    2381             : }
    2382             : // Name returns the fixed collector name "obsolete-key".
    2383           2 : func (o *obsoleteKeyBlockPropertyCollector) Name() string {
    2384           2 :         return "obsolete-key"
    2385           2 : }
    2386             : // Add is a no-op that always returns nil; this collector only changes state through AddPoint and UpdateKeySuffixes.
    2387           2 : func (o *obsoleteKeyBlockPropertyCollector) Add(key InternalKey, value []byte) error {
    2388           2 :         // Ignore.
    2389           2 :         return nil
    2390           2 : }
    2391             : // AddPoint records one point for the current data block; the block becomes (and stays) non-obsolete once any point with isObsolete == false is seen.
    2392           2 : func (o *obsoleteKeyBlockPropertyCollector) AddPoint(isObsolete bool) {
    2393           2 :         o.blockIsNonObsolete = o.blockIsNonObsolete || !isObsolete
    2394           2 : }
    2395             : // FinishDataBlock folds the current block's flag into the table-level flag and returns buf with the block's encoding appended; the error is always nil.
    2396           2 : func (o *obsoleteKeyBlockPropertyCollector) FinishDataBlock(buf []byte) ([]byte, error) {
    2397           2 :         o.tableIsNonObsolete = o.tableIsNonObsolete || o.blockIsNonObsolete
    2398           2 :         return encodeNonObsolete(o.blockIsNonObsolete, buf), nil
    2399           2 : }
    2400             : // AddPrevDataBlockToIndexBlock folds the data block's flag into the index-level flag, then resets the block flag.
    2401           2 : func (o *obsoleteKeyBlockPropertyCollector) AddPrevDataBlockToIndexBlock() {
    2402           2 :         o.indexIsNonObsolete = o.indexIsNonObsolete || o.blockIsNonObsolete
    2403           2 :         o.blockIsNonObsolete = false
    2404           2 : }
    2405             : 
    2406           2 : func (o *obsoleteKeyBlockPropertyCollector) FinishIndexBlock(buf []byte) ([]byte, error) {
    2407           2 :         indexIsNonObsolete := o.indexIsNonObsolete
    2408           2 :         o.indexIsNonObsolete = false
    2409           2 :         return encodeNonObsolete(indexIsNonObsolete, buf), nil
    2410           2 : }
    2411             : 
    2412           2 : func (o *obsoleteKeyBlockPropertyCollector) FinishTable(buf []byte) ([]byte, error) {
    2413           2 :         return encodeNonObsolete(o.tableIsNonObsolete, buf), nil
    2414           2 : }
    2415             : 
    2416             : func (o *obsoleteKeyBlockPropertyCollector) UpdateKeySuffixes(
    2417             :         oldProp []byte, oldSuffix, newSuffix []byte,
    2418           1 : ) error {
    2419           1 :         _, err := propToIsObsolete(oldProp)
    2420           1 :         if err != nil {
    2421           0 :                 return err
    2422           0 :         }
    2423             :         // Suffix rewriting currently loses the obsolete bit.
    2424           1 :         o.blockIsNonObsolete = true
    2425           1 :         return nil
    2426             : }
    2427             : 
    2428             : // NB: obsoleteKeyBlockPropertyFilter is stateless. This aspect of the filter
    2429             : // is used in table_cache.go for in-place modification of a filters slice.
    2430             : type obsoleteKeyBlockPropertyFilter struct{}
    2431             : 
    2432           2 : func (o obsoleteKeyBlockPropertyFilter) Name() string {
    2433           2 :         return "obsolete-key"
    2434           2 : }
    2435             : 
    2436             : // Intersects returns true if the set represented by prop intersects with
    2437             : // the set in the filter.
    2438           2 : func (o obsoleteKeyBlockPropertyFilter) Intersects(prop []byte) (bool, error) {
    2439           2 :         return propToIsObsolete(prop)
    2440           2 : }
    2441             : 
    2442           2 : func propToIsObsolete(prop []byte) (bool, error) {
    2443           2 :         if len(prop) == 0 {
    2444           2 :                 return true, nil
    2445           2 :         }
    2446           2 :         if len(prop) > 1 || prop[0] != 't' {
    2447           0 :                 return false, errors.Errorf("unexpected property %x", prop)
    2448           0 :         }
    2449           2 :         return false, nil
    2450             : }

Generated by: LCOV version 1.14