LCOV - code coverage report
Current view: top level - pebble/sstable - writer.go (source / functions) Hit Total Coverage
Test: 2024-11-18 08:17Z 9ed54bc4 - tests + meta.lcov Lines: 162 187 86.6 %
Date: 2024-11-18 08:18:26 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package sstable
       6             : 
       7             : import (
       8             :         "context"
       9             : 
      10             :         "github.com/cockroachdb/errors"
      11             :         "github.com/cockroachdb/pebble/internal/base"
      12             :         "github.com/cockroachdb/pebble/internal/keyspan"
      13             :         "github.com/cockroachdb/pebble/objstorage"
      14             :         "github.com/cockroachdb/pebble/sstable/block"
      15             : )
      16             : 
      17             : // NewRawWriter returns a new table writer for the file. Closing the writer will
      18             : // close the file. The concrete implementation is selected by o.TableFormat.
      19           2 : func NewRawWriter(writable objstorage.Writable, o WriterOptions) RawWriter {
      20           2 :         if o.TableFormat <= TableFormatPebblev4 { // formats up to and including Pebblev4 use the row writer
      21           2 :                 return newRowWriter(writable, o)
      22           2 :         }
      23           2 :         return newColumnarWriter(writable, o) // every later format uses the columnar writer
      24             : }
      25             : 
      26             : // Writer is a table writer. It wraps a RawWriter and fragments range keys.
      27             : type Writer struct {
      28             :         rw  RawWriter // underlying writer that every operation is forwarded to
      29             :         err error     // folded into the results of Error and Close; never assigned in the code visible here
      30             :         // To allow potentially overlapping (i.e. un-fragmented) range key spans to
      31             :         // be added to the Writer, a keyspan.Fragmenter is used to retain the keys
      32             :         // and values, emitting fragmented, coalesced spans as appropriate. Range
      33             :         // keys must be added in order of their start user-key.
      34             :         fragmenter keyspan.Fragmenter
      35             :         comparer   *base.Comparer // copied from WriterOptions.Comparer by NewWriter
      36             :         // isStrictObsolete is true if the writer is configured to write and enforce
      37             :         // a 'strict obsolete' sstable. This includes prohibiting the addition of
      38             :         // MERGE keys. See the documentation in format.go for more details.
      39             :         isStrictObsolete bool
      40             :         rkBuf            []byte        // append-only backing buffer used by tempRangeKeyBuf / tempRangeKeyCopy
      41             :         keyspanKeys      []keyspan.Key // scratch slice; DeleteRange appends to its [:0] prefix
      42             : }
      43             : 
      44             : // NewWriter returns a new table writer intended for building external sstables
      45             : // (eg, for ingestion or storage outside the LSM) for the file. Closing the
      46             : // writer will close the file.
      47             : //
      48             : // Internal clients should generally prefer NewRawWriter.
      49           2 : func NewWriter(writable objstorage.Writable, o WriterOptions) *Writer {
      50           2 :         o = o.ensureDefaults()
      51           2 :         rw := NewRawWriter(writable, o)
      52           2 :         w := &Writer{} // allocated first so that Emit below can be bound to a method value on w
      53           2 :         *w = Writer{
      54           2 :                 rw: rw,
      55           2 :                 fragmenter: keyspan.Fragmenter{
      56           2 :                         Cmp:    o.Comparer.Compare,
      57           2 :                         Format: o.Comparer.FormatKey,
      58           2 :                         Emit:   w.encodeFragmentedRangeKeySpan, // fragments produced by the fragmenter are handed to this method
      59           2 :                 },
      60           2 :                 comparer:         o.Comparer,
      61           2 :                 isStrictObsolete: o.IsStrictObsolete,
      62           2 :         }
      63           2 :         return w
      64           2 : }
      65             : 
      66           1 : func (w *Writer) encodeFragmentedRangeKeySpan(s keyspan.Span) {
      67           1 :         // This method is the emit function of the Fragmenter.
      68           1 :         //
      69           1 :         // NB: The span should only contain range keys and be internally consistent
      70           1 :         // (eg, no duplicate suffixes, no additional keys after a RANGEKEYDEL).
      71           1 :         //
      72           1 :         // Sort the keys by trailer (descending).
      73           1 :         //
      74           1 :         // Note that sstables written in TableFormatPebblev4 and earlier (rowblk
      75           1 :         // encoding) will always re-order the Keys in order to encode RANGEKEYSETs
      76           1 :         // first, then RANGEKEYUNSETs and then RANGEKEYDELs. See rangekey.Encoder.
      77           1 :         // SSTables written in TableFormatPebblev5 or later (colblk encoding) will
      78           1 :         // encode the keys in this order.
      79           1 :         //
      80           1 :         // Iteration doesn't depend on this ordering, in particular because it's
      81           1 :         // inconsistent between the two encodings, but we may eventually want to
      82           1 :         // begin depending on this ordering for colblk-encoded sstables.
      83           1 :         keyspan.SortKeysByTrailerAndSuffix(w.comparer.CompareRangeSuffixes, s.Keys)
      84           1 :         if w.Error() == nil { // nothing is encoded once an error has accumulated
      85           1 :                 w.rw.EncodeSpan(s) // NB: return value is discarded; presumably the RawWriter also surfaces it via Error() — TODO confirm
      86           1 :         }
      87             : }
      88             : 
      89             : // Error returns the RawWriter's error combined with the Writer's own err, if any.
      90           2 : func (w *Writer) Error() error {
      91           2 :         return errors.CombineErrors(w.rw.Error(), w.err)
      92           2 : }
      93             : 
      94             : // Raw returns the underlying RawWriter that this Writer forwards to.
      95           2 : func (w *Writer) Raw() RawWriter { return w.rw }
      96             : 
      97             : // Set sets the value for the given key. The sequence number is set to 0.
      98             : // Intended for use to externally construct an sstable before ingestion into a
      99             : // DB. For a given Writer, the keys passed to Set must be in strictly increasing
     100             : // order. An already-accumulated error is returned as-is without writing.
     101             : //
     102             : // TODO(peter): untested
     103           2 : func (w *Writer) Set(key, value []byte) error {
     104           2 :         if err := w.Error(); err != nil {
     105           0 :                 return err
     106           0 :         }
     107           2 :         if w.isStrictObsolete { // strict-obsolete writers are refused; the error text points to AddWithForceObsolete
     108           0 :                 return errors.Errorf("use AddWithForceObsolete")
     109           0 :         }
     110             :         // forceObsolete is false based on the assumption that no RANGEDELs in the
     111             :         // sstable delete the added points.
     112           2 :         return w.rw.AddWithForceObsolete(base.MakeInternalKey(key, 0, InternalKeyKindSet), value, false)
     113             : }
     114             : 
     115             : // Delete deletes the value for the given key. The sequence number is set to
     116             : // 0. Intended for use to externally construct an sstable before ingestion into
     117             : // a DB. The deletion is written as a point key of kind InternalKeyKindDelete with a nil value.
     118             : //
     119             : // TODO(peter): untested
     120           1 : func (w *Writer) Delete(key []byte) error {
     121           1 :         if err := w.Error(); err != nil {
     122           0 :                 return err
     123           0 :         }
     124           1 :         if w.isStrictObsolete { // strict-obsolete writers are refused; the error text points to AddWithForceObsolete
     125           0 :                 return errors.Errorf("use AddWithForceObsolete")
     126           0 :         }
     127             :         // forceObsolete is false based on the assumption that no RANGEDELs in the
     128             :         // sstable delete the added points.
     129           1 :         return w.rw.AddWithForceObsolete(base.MakeInternalKey(key, 0, InternalKeyKindDelete), nil, false)
     130             : }
     131             : 
     132             : // DeleteRange deletes all of the keys (and values) in the range [start,end)
     133             : // (inclusive on start, exclusive on end). The sequence number is set to
     134             : // 0. Intended for use to externally construct an sstable before ingestion into
     135             : // a DB.
     136             : //
     137             : // Calls to DeleteRange must be made using already-fragmented (non-overlapping)
     138             : // spans and in sorted order. The span goes straight to the RawWriter's EncodeSpan; the fragmenter is not involved.
     139             : //
     140             : // TODO(peter): untested
     141           2 : func (w *Writer) DeleteRange(start, end []byte) error {
     142           2 :         if err := w.Error(); err != nil {
     143           0 :                 return err
     144           0 :         }
     145           2 :         return w.rw.EncodeSpan(keyspan.Span{
     146           2 :                 Start: start,
     147           2 :                 End:   end,
     148           2 :                 Keys: append(w.keyspanKeys[:0], keyspan.Key{ // NB: the append result is not stored back into w.keyspanKeys
     149           2 :                         Trailer: base.MakeTrailer(0, base.InternalKeyKindRangeDelete),
     150           2 :                 }),
     151           2 :         })
     152             : }
     153             : 
     154             : // Merge adds an action to the DB that merges the value at key with the new
     155             : // value. The details of the merge are dependent upon the configured merge
     156             : // operator. The sequence number is set to 0. Intended for use to externally
     157             : // construct an sstable before ingestion into a DB.
     158             : //
     159             : // TODO(peter): untested
     160           0 : func (w *Writer) Merge(key, value []byte) error {
     161           0 :         if err := w.Error(); err != nil {
     162           0 :                 return err
     163           0 :         }
     164           0 :         if w.isStrictObsolete {
     165           0 :                 return errors.Errorf("use AddWithForceObsolete")
     166           0 :         }
     167             :         // forceObsolete is false based on the assumption that no RANGEDELs in the
     168             :         // sstable delete the added points. A strict-obsolete writer never reaches
     169             :         // this call: the isStrictObsolete check above has already returned an error.
     170           0 :         return w.rw.AddWithForceObsolete(base.MakeInternalKey(key, 0, InternalKeyKindMerge), value, false)
     171             : }
     172             : 
     173             : // RangeKeySet sets a range between start (inclusive) and end (exclusive) with
     174             : // the given suffix to the given value. The resulting range key is given the
     175             : // sequence number zero, with the expectation that the resulting sstable will be
     176             : // ingested.
     177             : //
     178             : // Keys must be added to the table in increasing order of start key. Spans are
     179             : // not required to be fragmented. The same suffix may not be set or unset twice
     180             : // over the same keyspan, because it would result in inconsistent state. Both
     181             : // the Set and Unset would share the zero sequence number, and a key cannot be
     182             : // both simultaneously set and unset.
     183           1 : func (w *Writer) RangeKeySet(start, end, suffix, value []byte) error {
     184           1 :         return w.addRangeKeySpanToFragmenter(keyspan.Span{
     185           1 :                 Start: w.tempRangeKeyCopy(start), // every input is copied into the Writer's range key buffer
     186           1 :                 End:   w.tempRangeKeyCopy(end),
     187           1 :                 Keys: []keyspan.Key{
     188           1 :                         {
     189           1 :                                 Trailer: base.MakeTrailer(0, base.InternalKeyKindRangeKeySet),
     190           1 :                                 Suffix:  w.tempRangeKeyCopy(suffix), // an empty suffix is stored as nil
     191           1 :                                 Value:   w.tempRangeKeyCopy(value),  // an empty value is stored as nil
     192           1 :                         },
     193           1 :                 },
     194           1 :         })
     195           1 : }
     196             : 
     197             : // RangeKeyUnset un-sets a range between start (inclusive) and end (exclusive)
     198             : // with the given suffix. The resulting range key is given the
     199             : // sequence number zero, with the expectation that the resulting sstable will be
     200             : // ingested.
     201             : //
     202             : // Keys must be added to the table in increasing order of start key. Spans are
     203             : // not required to be fragmented. The same suffix may not be set or unset twice
     204             : // over the same keyspan, because it would result in inconsistent state. Both
     205             : // the Set and Unset would share the zero sequence number, and a key cannot be
     206             : // both simultaneously set and unset.
     207           1 : func (w *Writer) RangeKeyUnset(start, end, suffix []byte) error {
     208           1 :         return w.addRangeKeySpanToFragmenter(keyspan.Span{
     209           1 :                 Start: w.tempRangeKeyCopy(start), // every input is copied into the Writer's range key buffer
     210           1 :                 End:   w.tempRangeKeyCopy(end),
     211           1 :                 Keys: []keyspan.Key{
     212           1 :                         {
     213           1 :                                 Trailer: base.MakeTrailer(0, base.InternalKeyKindRangeKeyUnset),
     214           1 :                                 Suffix:  w.tempRangeKeyCopy(suffix), // an empty suffix is stored as nil
     215           1 :                         },
     216           1 :                 },
     217           1 :         })
     218           1 : }
     219             : 
     220             : // RangeKeyDelete deletes a range between start (inclusive) and end (exclusive).
     221             : //
     222             : // Keys must be added to the table in increasing order of start key. Spans are
     223             : // not required to be fragmented. The key is written with sequence number zero.
     224           1 : func (w *Writer) RangeKeyDelete(start, end []byte) error {
     225           1 :         return w.addRangeKeySpanToFragmenter(keyspan.Span{
     226           1 :                 Start: w.tempRangeKeyCopy(start), // bounds are copied into the Writer's range key buffer
     227           1 :                 End:   w.tempRangeKeyCopy(end),
     228           1 :                 Keys: []keyspan.Key{
     229           1 :                         {Trailer: base.MakeTrailer(0, base.InternalKeyKindRangeKeyDelete)},
     230           1 :                 },
     231           1 :         })
     232           1 : }
     233             : 
     234           1 : func (w *Writer) addRangeKeySpanToFragmenter(span keyspan.Span) error { // validates the span, then hands it to the fragmenter
     235           1 :         if w.comparer.Compare(span.Start, span.End) >= 0 { // empty or inverted spans are rejected
     236           0 :                 return errors.Errorf(
     237           0 :                         "pebble: start key must be strictly less than end key",
     238           0 :                 )
     239           0 :         }
     240           1 :         if w.fragmenter.Start() != nil && w.comparer.Compare(w.fragmenter.Start(), span.Start) > 0 { // a nil Start() skips the ordering check
     241           1 :                 return errors.Errorf("pebble: spans must be added in order: %s > %s",
     242           1 :                         w.comparer.FormatKey(w.fragmenter.Start()), w.comparer.FormatKey(span.Start))
     243           1 :         }
     244             :         // Add this span to the fragmenter, whose Emit callback is w.encodeFragmentedRangeKeySpan.
     245           1 :         w.fragmenter.Add(span)
     246           1 :         return w.Error() // report whatever error has accumulated by now
     247             : }
     248             : 
     249             : // tempRangeKeyBuf returns a slice of length n from the Writer's rkBuf byte
     250             : // slice. Any byte written to the returned slice is retained for the lifetime of
     251             : // the Writer. rkBuf only ever grows; nothing in this method shrinks or resets it.
     252           1 : func (w *Writer) tempRangeKeyBuf(n int) []byte {
     253           1 :         if cap(w.rkBuf)-len(w.rkBuf) < n { // not enough spare capacity for n more bytes
     254           1 :                 size := len(w.rkBuf) + 2*n
     255           1 :                 if size < 2*cap(w.rkBuf) { // new capacity is max(len+2n, 2*cap)
     256           1 :                         size = 2 * cap(w.rkBuf)
     257           1 :                 }
     258           1 :                 buf := make([]byte, len(w.rkBuf), size)
     259           1 :                 copy(buf, w.rkBuf) // slices handed out earlier keep pointing at the old backing array
     260           1 :                 w.rkBuf = buf
     261             :         }
     262           1 :         b := w.rkBuf[len(w.rkBuf) : len(w.rkBuf)+n] // the n bytes just past the current length
     263           1 :         w.rkBuf = w.rkBuf[:len(w.rkBuf)+n]
     264           1 :         return b
     265             : }
     266             : 
     267             : // tempRangeKeyCopy returns a copy of the provided slice, stored in the Writer's
     268             : // range key buffer. A nil or empty input yields nil.
     269           1 : func (w *Writer) tempRangeKeyCopy(k []byte) []byte {
     270           1 :         if len(k) == 0 { // nothing to copy; no buffer space is consumed
     271           1 :                 return nil
     272           1 :         }
     273           1 :         buf := w.tempRangeKeyBuf(len(k))
     274           1 :         copy(buf, k)
     275           1 :         return buf
     276             : }
     277             : 
     278             : // Metadata returns the metadata for the finished sstable. Only valid to call
     279             : // after the sstable has been finished. The call is forwarded to the RawWriter.
     280           0 : func (w *Writer) Metadata() (*WriterMetadata, error) {
     281           0 :         return w.rw.Metadata()
     282           0 : }
     283             : 
     284             : // Close finishes writing the table and closes the underlying file that the
     285             : // table was written to. The RawWriter is closed even if an error had accumulated.
     286           2 : func (w *Writer) Close() (err error) { // NB: the named result is never assigned; the value is returned explicitly below
     287           2 :         if w.Error() == nil {
     288           2 :                 // Write the range-key block, flushing any remaining spans from the
     289           2 :                 // fragmenter first.
     290           2 :                 w.fragmenter.Finish()
     291           2 :         }
     292           2 :         return errors.CombineErrors(w.rw.Close(), w.err)
     293             : }
     294             : 
     295             : // RawWriter defines an interface for sstable writers. Implementations may vary
     296             : // depending on the TableFormat being written.
     297             : type RawWriter interface {
     298             :         // Error returns the current accumulated error if any.
     299             :         Error() error
     300             :         // AddWithForceObsolete must be used when writing a strict-obsolete sstable.
     301             :         //
     302             :         // forceObsolete indicates whether the caller has determined that this key is
     303             :         // obsolete even though it may be the latest point key for this userkey. This
     304             :         // should be set to true for keys obsoleted by RANGEDELs, and is required for
     305             :         // strict-obsolete sstables. It's optional for non-strict-obsolete sstables.
     306             :         //
     307             :         // Note that there are two properties, S1 and S2 (see comment in format.go)
     308             :         // that strict-obsolete ssts must satisfy. S2, due to RANGEDELs, is solely the
     309             :         // responsibility of the caller. S1 is solely the responsibility of the
     310             :         // callee.
     311             :         AddWithForceObsolete(
     312             :                 key InternalKey, value []byte, forceObsolete bool,
     313             :         ) error
     314             :         // EncodeSpan encodes the keys in the given span. The span can contain
     315             :         // either only RANGEDEL keys or only range keys.
     316             :         //
     317             :         // This is a low-level API that bypasses the fragmenter. The spans passed to
     318             :         // this function must be fragmented and ordered.
     319             :         EncodeSpan(span keyspan.Span) error
     320             :         // EstimatedSize returns the estimated size of the sstable being written if
     321             :         // a call to Close() was made without adding additional keys.
     322             :         EstimatedSize() uint64
     323             :         // ComparePrev compares the provided user key to the last point key written to
     324             :         // the writer. The returned value is equivalent to Compare(key, prevKey) where
     325             :         // prevKey is the last point key written to the writer.
     326             :         //
     327             :         // If no key has been written yet, ComparePrev returns +1.
     328             :         //
     329             :         // Must not be called after Writer is closed.
     330             :         ComparePrev(k []byte) int
     331             :         // SetSnapshotPinnedProperties sets the properties for pinned keys. Should only
     332             :         // be used internally by Pebble.
     333             :         SetSnapshotPinnedProperties(keyCount, keySize, valueSize uint64)
     334             :         // Close finishes writing the table and closes the underlying file that the
     335             :         // table was written to.
     336             :         Close() error
     337             :         // Metadata returns the metadata for the finished sstable. Only valid to
     338             :         // call after the sstable has been finished.
     339             :         Metadata() (*WriterMetadata, error)
     340             : 
     341             :         // rewriteSuffixes rewrites the table's data blocks to all contain the
     342             :         // provided suffix. It's specifically used for the implementation of
     343             :         // RewriteKeySuffixesAndReturnFormat. See that function's documentation for
     344             :         // more details.
     345             :         rewriteSuffixes(r *Reader, wo WriterOptions, from, to []byte, concurrency int) error
     346             : 
     347             :         // copyDataBlocks copies data blocks to the table from the specified ReadHandle.
     348             :         // It's specifically used by the sstable copier that can copy parts of an sstable
     349             :         // to a new sstable, using CopySpan().
     350             :         copyDataBlocks(ctx context.Context, blocks []indexEntry, rh objstorage.ReadHandle) error
     351             : 
     352             :         // addDataBlock adds a raw data block to the table as-is. It's specifically used
     353             :         // by the sstable copier that can copy parts of an sstable to a new sstable,
     354             :         // using CopySpan().
     355             :         addDataBlock(b, sep []byte, bhp block.HandleWithProperties) error
     356             : 
     357             :         // copyFilter copies the specified filter to the table. It's specifically used
     358             :         // by the sstable copier that can copy parts of an sstable to a new sstable,
     359             :         // using CopySpan().
     360             :         copyFilter(filter []byte, filterName string) error
     361             : 
     362             :         // copyProperties copies properties from the specified props, and resets others
     363             :         // to prepare for copying data blocks from another sstable. It's specifically
     364             :         // used by the sstable copier that can copy parts of an sstable to a new sstable,
     365             :         // using CopySpan().
     366             :         copyProperties(props Properties)
     367             : }
     368             : 
     369             : // WriterMetadata holds info about a finished sstable.
     370             : type WriterMetadata struct {
     371             :         Size          uint64
     372             :         SmallestPoint InternalKey
     373             :         // LargestPoint, LargestRangeKey, LargestRangeDel should not be accessed
     374             :         // before Writer.Close is called, because they may only be set on
     375             :         // Writer.Close.
     376             :         LargestPoint     InternalKey
     377             :         SmallestRangeDel InternalKey
     378             :         LargestRangeDel  InternalKey
     379             :         SmallestRangeKey InternalKey
     380             :         LargestRangeKey  InternalKey
     381             :         HasPointKeys     bool        // set to true by SetSmallestPointKey and SetLargestPointKey
     382             :         HasRangeDelKeys  bool        // set to true by SetSmallestRangeDelKey and SetLargestRangeDelKey
     383             :         HasRangeKeys     bool        // set to true by SetSmallestRangeKey and SetLargestRangeKey
     384             :         SmallestSeqNum   base.SeqNum // lowered by updateSeqNum when a smaller sequence number is seen
     385             :         LargestSeqNum    base.SeqNum // raised by updateSeqNum when a larger sequence number is seen
     386             :         Properties       Properties
     387             : }
     388             : 
     389             : // SetSmallestPointKey sets the smallest point key to the given key.
     390             : // NB: this method sets the "absolute" smallest point key. Any existing key is
     391             : // overridden. It also marks the metadata as having point keys.
     392           2 : func (m *WriterMetadata) SetSmallestPointKey(k InternalKey) {
     393           2 :         m.SmallestPoint = k
     394           2 :         m.HasPointKeys = true
     395           2 : }
     396             : 
     397             : // SetSmallestRangeDelKey sets the smallest rangedel key to the given key.
     398             : // NB: this method sets the "absolute" smallest rangedel key. Any existing key is
     399             : // overridden. It also marks the metadata as having rangedel keys.
     400           2 : func (m *WriterMetadata) SetSmallestRangeDelKey(k InternalKey) {
     401           2 :         m.SmallestRangeDel = k
     402           2 :         m.HasRangeDelKeys = true
     403           2 : }
     404             : 
     405             : // SetSmallestRangeKey sets the smallest range key to the given key.
     406             : // NB: this method sets the "absolute" smallest range key. Any existing key is
     407             : // overridden. It also marks the metadata as having range keys.
     408           2 : func (m *WriterMetadata) SetSmallestRangeKey(k InternalKey) {
     409           2 :         m.SmallestRangeKey = k
     410           2 :         m.HasRangeKeys = true
     411           2 : }
     412             : 
     413             : // SetLargestPointKey sets the largest point key to the given key.
     414             : // NB: this method sets the "absolute" largest point key. Any existing key is
     415             : // overridden. It also marks the metadata as having point keys.
     416           2 : func (m *WriterMetadata) SetLargestPointKey(k InternalKey) {
     417           2 :         m.LargestPoint = k
     418           2 :         m.HasPointKeys = true
     419           2 : }
     420             : 
     421             : // SetLargestRangeDelKey sets the largest rangedel key to the given key.
     422             : // NB: this method sets the "absolute" largest rangedel key. Any existing key is
     423             : // overridden. It also marks the metadata as having rangedel keys.
     424           2 : func (m *WriterMetadata) SetLargestRangeDelKey(k InternalKey) {
     425           2 :         m.LargestRangeDel = k
     426           2 :         m.HasRangeDelKeys = true
     427           2 : }
     428             : 
     429             : // SetLargestRangeKey sets the largest range key to the given key.
     430             : // NB: this method sets the "absolute" largest range key. Any existing key is
     431             : // overridden. It also marks the metadata as having range keys.
     432           2 : func (m *WriterMetadata) SetLargestRangeKey(k InternalKey) {
     433           2 :         m.LargestRangeKey = k
     434           2 :         m.HasRangeKeys = true
     435           2 : }
     436             : 
     437           2 : func (m *WriterMetadata) updateSeqNum(seqNum base.SeqNum) { // widens [SmallestSeqNum, LargestSeqNum] to include seqNum
     438           2 :         if m.SmallestSeqNum > seqNum { // NOTE(review): cannot fire while SmallestSeqNum is zero; presumably initialized to a max sentinel elsewhere — confirm
     439           2 :                 m.SmallestSeqNum = seqNum
     440           2 :         }
     441           2 :         if m.LargestSeqNum < seqNum {
     442           2 :                 m.LargestSeqNum = seqNum
     443           2 :         }
     444             : }

Generated by: LCOV version 1.14