LCOV - code coverage report
Current view: top level - pebble/sstable - writer.go (source / functions) Hit Total Coverage
Test: 2024-10-26 08:16Z d844f6f3 - meta test only.lcov Lines: 76 179 42.5 %
Date: 2024-10-26 08:17:16 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package sstable
       6             : 
       7             : import (
       8             :         "context"
       9             : 
      10             :         "github.com/cockroachdb/errors"
      11             :         "github.com/cockroachdb/pebble/internal/base"
      12             :         "github.com/cockroachdb/pebble/internal/keyspan"
      13             :         "github.com/cockroachdb/pebble/objstorage"
      14             :         "github.com/cockroachdb/pebble/sstable/block"
      15             : )
      16             : 
      17             : // NewRawWriter returns a new table writer for the file. Closing the writer will
      18             : // close the file.
      19           1 : func NewRawWriter(writable objstorage.Writable, o WriterOptions) RawWriter {
      20           1 :         if o.TableFormat <= TableFormatPebblev4 {
      21           1 :                 return newRowWriter(writable, o)
      22           1 :         }
      23           1 :         return newColumnarWriter(writable, o)
      24             : }
      25             : 
// Writer is a table writer. It wraps a RawWriter and gives every key it adds
// sequence number zero. Range keys are routed through a fragmenter before
// being encoded by the RawWriter.
type Writer struct {
	// rw is the underlying writer that the Writer's keys and spans are
	// passed to.
	rw RawWriter
	// err is an error recorded by this Writer itself, in addition to any
	// error accumulated by rw. Error and Close report the combination of
	// the two.
	err error
	// To allow potentially overlapping (i.e. un-fragmented) range keys spans to
	// be added to the Writer, a keyspan.Fragmenter is used to retain the keys
	// and values, emitting fragmented, coalesced spans as appropriate. Range
	// keys must be added in order of their start user-key.
	fragmenter keyspan.Fragmenter
	// comparer is the Comparer from the WriterOptions passed to NewWriter.
	// It is used for ordering checks and for sorting range keys by suffix.
	comparer *base.Comparer
	// isStrictObsolete is true if the writer is configured to write and enforce
	// a 'strict obsolete' sstable. This includes prohibiting the addition of
	// MERGE keys. See the documentation in format.go for more details.
	isStrictObsolete bool
	// rkBuf is backing storage for the copies of range key bounds, suffixes
	// and values made by tempRangeKeyBuf/tempRangeKeyCopy.
	rkBuf []byte
	// keyspanKeys is scratch space for the single-key spans built by
	// DeleteRange.
	keyspanKeys []keyspan.Key
}
      43             : 
      44             : // NewWriter returns a new table writer intended for building external sstables
      45             : // (eg, for ingestion or storage outside the LSM) for the file. Closing the
      46             : // writer will close the file.
      47             : //
      48             : // Internal clients should generally prefer NewRawWriter.
      49           1 : func NewWriter(writable objstorage.Writable, o WriterOptions) *Writer {
      50           1 :         o = o.ensureDefaults()
      51           1 :         rw := NewRawWriter(writable, o)
      52           1 :         w := &Writer{}
      53           1 :         *w = Writer{
      54           1 :                 rw: rw,
      55           1 :                 fragmenter: keyspan.Fragmenter{
      56           1 :                         Cmp:    o.Comparer.Compare,
      57           1 :                         Format: o.Comparer.FormatKey,
      58           1 :                         Emit:   w.encodeFragmentedRangeKeySpan,
      59           1 :                 },
      60           1 :                 comparer:         o.Comparer,
      61           1 :                 isStrictObsolete: o.IsStrictObsolete,
      62           1 :         }
      63           1 :         return w
      64           1 : }
      65             : 
      66           0 : func (w *Writer) encodeFragmentedRangeKeySpan(s keyspan.Span) {
      67           0 :         // This method is the emit function of the Fragmenter.
      68           0 :         //
      69           0 :         // NB: The span should only contain range keys and be internally consistent
      70           0 :         // (eg, no duplicate suffixes, no additional keys after a RANGEKEYDEL).
      71           0 :         //
      72           0 :         // Sort the keys by suffix. Iteration doesn't *currently* depend on it, but
      73           0 :         // we may want to in the future.
      74           0 :         keyspan.SortKeysBySuffix(w.comparer.CompareRangeSuffixes, s.Keys)
      75           0 : 
      76           0 :         if w.Error() == nil {
      77           0 :                 w.rw.EncodeSpan(s)
      78           0 :         }
      79             : }
      80             : 
      81             : // Error returns the current accumulated error if any.
      82           1 : func (w *Writer) Error() error {
      83           1 :         return errors.CombineErrors(w.rw.Error(), w.err)
      84           1 : }
      85             : 
      86             : // Raw returns the underlying RawWriter.
      87           1 : func (w *Writer) Raw() RawWriter { return w.rw }
      88             : 
      89             : // Set sets the value for the given key. The sequence number is set to 0.
      90             : // Intended for use to externally construct an sstable before ingestion into a
      91             : // DB. For a given Writer, the keys passed to Set must be in strictly increasing
      92             : // order.
      93             : //
      94             : // TODO(peter): untested
      95           1 : func (w *Writer) Set(key, value []byte) error {
      96           1 :         if err := w.Error(); err != nil {
      97           0 :                 return err
      98           0 :         }
      99           1 :         if w.isStrictObsolete {
     100           0 :                 return errors.Errorf("use AddWithForceObsolete")
     101           0 :         }
     102             :         // forceObsolete is false based on the assumption that no RANGEDELs in the
     103             :         // sstable delete the added points.
     104           1 :         return w.rw.AddWithForceObsolete(base.MakeInternalKey(key, 0, InternalKeyKindSet), value, false)
     105             : }
     106             : 
     107             : // Delete deletes the value for the given key. The sequence number is set to
     108             : // 0. Intended for use to externally construct an sstable before ingestion into
     109             : // a DB.
     110             : //
     111             : // TODO(peter): untested
     112           0 : func (w *Writer) Delete(key []byte) error {
     113           0 :         if err := w.Error(); err != nil {
     114           0 :                 return err
     115           0 :         }
     116           0 :         if w.isStrictObsolete {
     117           0 :                 return errors.Errorf("use AddWithForceObsolete")
     118           0 :         }
     119             :         // forceObsolete is false based on the assumption that no RANGEDELs in the
     120             :         // sstable delete the added points.
     121           0 :         return w.rw.AddWithForceObsolete(base.MakeInternalKey(key, 0, InternalKeyKindDelete), nil, false)
     122             : }
     123             : 
     124             : // DeleteRange deletes all of the keys (and values) in the range [start,end)
     125             : // (inclusive on start, exclusive on end). The sequence number is set to
     126             : // 0. Intended for use to externally construct an sstable before ingestion into
     127             : // a DB.
     128             : //
     129             : // Calls to DeleteRange must be made using already-fragmented (non-overlapping)
     130             : // spans and in sorted order.
     131             : //
     132             : // TODO(peter): untested
     133           1 : func (w *Writer) DeleteRange(start, end []byte) error {
     134           1 :         if err := w.Error(); err != nil {
     135           0 :                 return err
     136           0 :         }
     137           1 :         return w.rw.EncodeSpan(keyspan.Span{
     138           1 :                 Start: start,
     139           1 :                 End:   end,
     140           1 :                 Keys: append(w.keyspanKeys[:0], keyspan.Key{
     141           1 :                         Trailer: base.MakeTrailer(0, base.InternalKeyKindRangeDelete),
     142           1 :                 }),
     143           1 :         })
     144             : }
     145             : 
     146             : // Merge adds an action to the DB that merges the value at key with the new
     147             : // value. The details of the merge are dependent upon the configured merge
     148             : // operator. The sequence number is set to 0. Intended for use to externally
     149             : // construct an sstable before ingestion into a DB.
     150             : //
     151             : // TODO(peter): untested
     152           0 : func (w *Writer) Merge(key, value []byte) error {
     153           0 :         if err := w.Error(); err != nil {
     154           0 :                 return err
     155           0 :         }
     156           0 :         if w.isStrictObsolete {
     157           0 :                 return errors.Errorf("use AddWithForceObsolete")
     158           0 :         }
     159             :         // forceObsolete is false based on the assumption that no RANGEDELs in the
     160             :         // sstable that delete the added points. If the user configured this writer
     161             :         // to be strict-obsolete, addPoint will reject the addition of this MERGE.
     162           0 :         return w.rw.AddWithForceObsolete(base.MakeInternalKey(key, 0, InternalKeyKindMerge), value, false)
     163             : }
     164             : 
     165             : // RangeKeySet sets a range between start (inclusive) and end (exclusive) with
     166             : // the given suffix to the given value. The resulting range key is given the
     167             : // sequence number zero, with the expectation that the resulting sstable will be
     168             : // ingested.
     169             : //
     170             : // Keys must be added to the table in increasing order of start key. Spans are
     171             : // not required to be fragmented. The same suffix may not be set or unset twice
     172             : // over the same keyspan, because it would result in inconsistent state. Both
     173             : // the Set and Unset would share the zero sequence number, and a key cannot be
     174             : // both simultaneously set and unset.
     175           0 : func (w *Writer) RangeKeySet(start, end, suffix, value []byte) error {
     176           0 :         return w.addRangeKeySpanToFragmenter(keyspan.Span{
     177           0 :                 Start: w.tempRangeKeyCopy(start),
     178           0 :                 End:   w.tempRangeKeyCopy(end),
     179           0 :                 Keys: []keyspan.Key{
     180           0 :                         {
     181           0 :                                 Trailer: base.MakeTrailer(0, base.InternalKeyKindRangeKeySet),
     182           0 :                                 Suffix:  w.tempRangeKeyCopy(suffix),
     183           0 :                                 Value:   w.tempRangeKeyCopy(value),
     184           0 :                         },
     185           0 :                 },
     186           0 :         })
     187           0 : }
     188             : 
     189             : // RangeKeyUnset un-sets a range between start (inclusive) and end (exclusive)
     190             : // with the given suffix. The resulting range key is given the
     191             : // sequence number zero, with the expectation that the resulting sstable will be
     192             : // ingested.
     193             : //
     194             : // Keys must be added to the table in increasing order of start key. Spans are
     195             : // not required to be fragmented. The same suffix may not be set or unset twice
     196             : // over the same keyspan, because it would result in inconsistent state. Both
     197             : // the Set and Unset would share the zero sequence number, and a key cannot be
     198             : // both simultaneously set and unset.
     199           0 : func (w *Writer) RangeKeyUnset(start, end, suffix []byte) error {
     200           0 :         return w.addRangeKeySpanToFragmenter(keyspan.Span{
     201           0 :                 Start: w.tempRangeKeyCopy(start),
     202           0 :                 End:   w.tempRangeKeyCopy(end),
     203           0 :                 Keys: []keyspan.Key{
     204           0 :                         {
     205           0 :                                 Trailer: base.MakeTrailer(0, base.InternalKeyKindRangeKeyUnset),
     206           0 :                                 Suffix:  w.tempRangeKeyCopy(suffix),
     207           0 :                         },
     208           0 :                 },
     209           0 :         })
     210           0 : }
     211             : 
     212             : // RangeKeyDelete deletes a range between start (inclusive) and end (exclusive).
     213             : //
     214             : // Keys must be added to the table in increasing order of start key. Spans are
     215             : // not required to be fragmented.
     216           0 : func (w *Writer) RangeKeyDelete(start, end []byte) error {
     217           0 :         return w.addRangeKeySpanToFragmenter(keyspan.Span{
     218           0 :                 Start: w.tempRangeKeyCopy(start),
     219           0 :                 End:   w.tempRangeKeyCopy(end),
     220           0 :                 Keys: []keyspan.Key{
     221           0 :                         {Trailer: base.MakeTrailer(0, base.InternalKeyKindRangeKeyDelete)},
     222           0 :                 },
     223           0 :         })
     224           0 : }
     225             : 
     226           0 : func (w *Writer) addRangeKeySpanToFragmenter(span keyspan.Span) error {
     227           0 :         if w.comparer.Compare(span.Start, span.End) >= 0 {
     228           0 :                 return errors.Errorf(
     229           0 :                         "pebble: start key must be strictly less than end key",
     230           0 :                 )
     231           0 :         }
     232           0 :         if w.fragmenter.Start() != nil && w.comparer.Compare(w.fragmenter.Start(), span.Start) > 0 {
     233           0 :                 return errors.Errorf("pebble: spans must be added in order: %s > %s",
     234           0 :                         w.comparer.FormatKey(w.fragmenter.Start()), w.comparer.FormatKey(span.Start))
     235           0 :         }
     236             :         // Add this span to the fragmenter.
     237           0 :         w.fragmenter.Add(span)
     238           0 :         return w.Error()
     239             : }
     240             : 
     241             : // tempRangeKeyBuf returns a slice of length n from the Writer's rkBuf byte
     242             : // slice. Any byte written to the returned slice is retained for the lifetime of
     243             : // the Writer.
     244           0 : func (w *Writer) tempRangeKeyBuf(n int) []byte {
     245           0 :         if cap(w.rkBuf)-len(w.rkBuf) < n {
     246           0 :                 size := len(w.rkBuf) + 2*n
     247           0 :                 if size < 2*cap(w.rkBuf) {
     248           0 :                         size = 2 * cap(w.rkBuf)
     249           0 :                 }
     250           0 :                 buf := make([]byte, len(w.rkBuf), size)
     251           0 :                 copy(buf, w.rkBuf)
     252           0 :                 w.rkBuf = buf
     253             :         }
     254           0 :         b := w.rkBuf[len(w.rkBuf) : len(w.rkBuf)+n]
     255           0 :         w.rkBuf = w.rkBuf[:len(w.rkBuf)+n]
     256           0 :         return b
     257             : }
     258             : 
     259             : // tempRangeKeyCopy returns a copy of the provided slice, stored in the Writer's
     260             : // range key buffer.
     261           0 : func (w *Writer) tempRangeKeyCopy(k []byte) []byte {
     262           0 :         if len(k) == 0 {
     263           0 :                 return nil
     264           0 :         }
     265           0 :         buf := w.tempRangeKeyBuf(len(k))
     266           0 :         copy(buf, k)
     267           0 :         return buf
     268             : }
     269             : 
     270             : // Metadata returns the metadata for the finished sstable. Only valid to call
     271             : // after the sstable has been finished.
     272           0 : func (w *Writer) Metadata() (*WriterMetadata, error) {
     273           0 :         return w.rw.Metadata()
     274           0 : }
     275             : 
     276             : // Close finishes writing the table and closes the underlying file that the
     277             : // table was written to.
     278           1 : func (w *Writer) Close() (err error) {
     279           1 :         if w.Error() == nil {
     280           1 :                 // Write the range-key block, flushing any remaining spans from the
     281           1 :                 // fragmenter first.
     282           1 :                 w.fragmenter.Finish()
     283           1 :         }
     284           1 :         return errors.CombineErrors(w.rw.Close(), w.err)
     285             : }
     286             : 
// RawWriter defines an interface for sstable writers. Implementations may vary
// depending on the TableFormat being written; NewRawWriter picks the
// implementation from WriterOptions.TableFormat.
type RawWriter interface {
	// Error returns the current accumulated error if any.
	Error() error
	// AddWithForceObsolete must be used when writing a strict-obsolete sstable.
	//
	// forceObsolete indicates whether the caller has determined that this key is
	// obsolete even though it may be the latest point key for this userkey. This
	// should be set to true for keys obsoleted by RANGEDELs, and is required for
	// strict-obsolete sstables. It's optional for non-strict-obsolete sstables.
	//
	// Note that there are two properties, S1 and S2 (see comment in format.go)
	// that strict-obsolete ssts must satisfy. S2, due to RANGEDELs, is solely the
	// responsibility of the caller. S1 is solely the responsibility of the
	// callee.
	AddWithForceObsolete(
		key InternalKey, value []byte, forceObsolete bool,
	) error
	// EncodeSpan encodes the keys in the given span. The span can contain
	// either only RANGEDEL keys or only range keys.
	//
	// This is a low-level API that bypasses the fragmenter. The spans passed to
	// this function must be fragmented and ordered.
	EncodeSpan(span keyspan.Span) error
	// EstimatedSize returns the estimated size of the sstable being written if
	// a call to Close() was made without adding additional keys.
	EstimatedSize() uint64
	// ComparePrev compares the provided user key to the last point key written
	// to the writer. The returned value is equivalent to Compare(key, prevKey)
	// where prevKey is the last point key written to the writer.
	//
	// If no key has been written yet, ComparePrev returns +1.
	//
	// Must not be called after Writer is closed.
	ComparePrev(k []byte) int
	// SetSnapshotPinnedProperties sets the properties for pinned keys. Should only
	// be used internally by Pebble.
	SetSnapshotPinnedProperties(keyCount, keySize, valueSize uint64)
	// Close finishes writing the table and closes the underlying file that the
	// table was written to.
	Close() error
	// Metadata returns the metadata for the finished sstable. Only valid to
	// call after the sstable has been finished.
	Metadata() (*WriterMetadata, error)

	// rewriteSuffixes rewrites the table's data blocks to all contain the
	// provided suffix. It's specifically used for the implementation of
	// RewriteKeySuffixesAndReturnFormat. See that function's documentation for
	// more details.
	rewriteSuffixes(r *Reader, wo WriterOptions, from, to []byte, concurrency int) error

	// copyDataBlocks copies data blocks to the table from the specified ReadHandle.
	// It's specifically used by the sstable copier that can copy parts of an sstable
	// to a new sstable, using CopySpan().
	copyDataBlocks(ctx context.Context, blocks []indexEntry, rh objstorage.ReadHandle) error

	// addDataBlock adds a raw data block to the table as-is. It's specifically used
	// by the sstable copier that can copy parts of an sstable to a new sstable,
	// using CopySpan().
	addDataBlock(b, sep []byte, bhp block.HandleWithProperties) error

	// copyFilter copies the specified filter to the table. It's specifically used
	// by the sstable copier that can copy parts of an sstable to a new sstable,
	// using CopySpan().
	copyFilter(filter []byte, filterName string) error

	// copyProperties copies properties from the specified props, and resets others
	// to prepare for copying data blocks from another sstable. It's specifically
	// used by the sstable copier that can copy parts of an sstable to a new sstable,
	// using CopySpan().
	copyProperties(props Properties)
}
     360             : 
// WriterMetadata holds info about a finished sstable.
//
// The bound fields are written by the SetSmallest*/SetLargest* setters. Each
// setter overwrites its bound unconditionally and sets the Has* flag for its
// key kind.
type WriterMetadata struct {
	// Size is the size of the finished sstable.
	// NOTE(review): presumably in bytes — confirm against the RawWriter
	// implementations.
	Size uint64
	// SmallestPoint is the smallest point key (see SetSmallestPointKey).
	SmallestPoint InternalKey
	// LargestPoint, LargestRangeKey, LargestRangeDel should not be accessed
	// before Writer.Close is called, because they may only be set on
	// Writer.Close.
	LargestPoint     InternalKey
	SmallestRangeDel InternalKey
	LargestRangeDel  InternalKey
	SmallestRangeKey InternalKey
	LargestRangeKey  InternalKey
	// HasPointKeys, HasRangeDelKeys and HasRangeKeys are set to true by the
	// setter of the matching key kind (e.g. SetSmallestPointKey and
	// SetLargestPointKey set HasPointKeys).
	HasPointKeys    bool
	HasRangeDelKeys bool
	HasRangeKeys    bool
	// SmallestSeqNum and LargestSeqNum are widened by updateSeqNum to include
	// each sequence number passed to it.
	// NOTE(review): updateSeqNum only ever lowers SmallestSeqNum, so it must be
	// initialized above any real sequence number before use — presumably by the
	// RawWriter; confirm.
	SmallestSeqNum base.SeqNum
	LargestSeqNum  base.SeqNum
	// Properties holds the properties of the finished sstable.
	Properties Properties
}
     380             : 
     381             : // SetSmallestPointKey sets the smallest point key to the given key.
     382             : // NB: this method set the "absolute" smallest point key. Any existing key is
     383             : // overridden.
     384           1 : func (m *WriterMetadata) SetSmallestPointKey(k InternalKey) {
     385           1 :         m.SmallestPoint = k
     386           1 :         m.HasPointKeys = true
     387           1 : }
     388             : 
     389             : // SetSmallestRangeDelKey sets the smallest rangedel key to the given key.
     390             : // NB: this method set the "absolute" smallest rangedel key. Any existing key is
     391             : // overridden.
     392           1 : func (m *WriterMetadata) SetSmallestRangeDelKey(k InternalKey) {
     393           1 :         m.SmallestRangeDel = k
     394           1 :         m.HasRangeDelKeys = true
     395           1 : }
     396             : 
     397             : // SetSmallestRangeKey sets the smallest range key to the given key.
     398             : // NB: this method set the "absolute" smallest range key. Any existing key is
     399             : // overridden.
     400           1 : func (m *WriterMetadata) SetSmallestRangeKey(k InternalKey) {
     401           1 :         m.SmallestRangeKey = k
     402           1 :         m.HasRangeKeys = true
     403           1 : }
     404             : 
     405             : // SetLargestPointKey sets the largest point key to the given key.
     406             : // NB: this method set the "absolute" largest point key. Any existing key is
     407             : // overridden.
     408           1 : func (m *WriterMetadata) SetLargestPointKey(k InternalKey) {
     409           1 :         m.LargestPoint = k
     410           1 :         m.HasPointKeys = true
     411           1 : }
     412             : 
     413             : // SetLargestRangeDelKey sets the largest rangedel key to the given key.
     414             : // NB: this method set the "absolute" largest rangedel key. Any existing key is
     415             : // overridden.
     416           1 : func (m *WriterMetadata) SetLargestRangeDelKey(k InternalKey) {
     417           1 :         m.LargestRangeDel = k
     418           1 :         m.HasRangeDelKeys = true
     419           1 : }
     420             : 
     421             : // SetLargestRangeKey sets the largest range key to the given key.
     422             : // NB: this method set the "absolute" largest range key. Any existing key is
     423             : // overridden.
     424           1 : func (m *WriterMetadata) SetLargestRangeKey(k InternalKey) {
     425           1 :         m.LargestRangeKey = k
     426           1 :         m.HasRangeKeys = true
     427           1 : }
     428             : 
     429           1 : func (m *WriterMetadata) updateSeqNum(seqNum base.SeqNum) {
     430           1 :         if m.SmallestSeqNum > seqNum {
     431           1 :                 m.SmallestSeqNum = seqNum
     432           1 :         }
     433           1 :         if m.LargestSeqNum < seqNum {
     434           1 :                 m.LargestSeqNum = seqNum
     435           1 :         }
     436             : }

Generated by: LCOV version 1.14