LCOV - code coverage report
Current view: top level - pebble/sstable - writer.go (source / functions) Hit Total Coverage
Test: 2024-09-09 08:16Z 376d455b - meta test only.lcov Lines: 71 174 40.8 %
Date: 2024-09-09 08:17:44 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package sstable
       6             : 
       7             : import (
       8             :         "github.com/cockroachdb/errors"
       9             :         "github.com/cockroachdb/pebble/internal/base"
      10             :         "github.com/cockroachdb/pebble/internal/keyspan"
      11             :         "github.com/cockroachdb/pebble/objstorage"
      12             : )
      13             : 
// Writer is a table writer. It wraps a RawWriter and layers a convenience API
// on top of it: point keys are written with sequence number zero, and range
// keys may be added as un-fragmented spans that the Writer fragments itself.
type Writer struct {
	// rw is the underlying RawWriter that receives every key and span.
	rw RawWriter
	// err is an error recorded by the Writer itself. Error and Close combine
	// it with the error state of rw.
	err error
	// To allow potentially overlapping (i.e. un-fragmented) range keys spans to
	// be added to the Writer, a keyspan.Fragmenter is used to retain the keys
	// and values, emitting fragmented, coalesced spans as appropriate. Range
	// keys must be added in order of their start user-key.
	fragmenter keyspan.Fragmenter
	// comparer supplies the key comparison, suffix comparison and key
	// formatting functions used when validating and sorting range keys.
	comparer *base.Comparer
	// isStrictObsolete is true if the writer is configured to write and enforce
	// a 'strict obsolete' sstable. This includes prohibiting the addition of
	// MERGE keys. See the documentation in format.go for more details.
	isStrictObsolete bool
	// rkBuf is the backing buffer for the copies of range key start keys, end
	// keys, suffixes and values made by tempRangeKeyCopy.
	rkBuf []byte
	// keyspanKeys is a scratch slice used by DeleteRange when building the
	// single-key span it passes to the RawWriter.
	keyspanKeys []keyspan.Key
}
      31             : 
      32             : // NewWriter returns a new table writer intended for building external sstables
      33             : // (eg, for ingestion or storage outside the LSM) for the file. Closing the
      34             : // writer will close the file.
      35             : //
      36             : // Internal clients should generally prefer NewRawWriter.
      37           1 : func NewWriter(writable objstorage.Writable, o WriterOptions) *Writer {
      38           1 :         o = o.ensureDefaults()
      39           1 :         rw := NewRawWriter(writable, o)
      40           1 :         w := &Writer{}
      41           1 :         *w = Writer{
      42           1 :                 rw: rw,
      43           1 :                 fragmenter: keyspan.Fragmenter{
      44           1 :                         Cmp:    o.Comparer.Compare,
      45           1 :                         Format: o.Comparer.FormatKey,
      46           1 :                         Emit:   w.encodeFragmentedRangeKeySpan,
      47           1 :                 },
      48           1 :                 comparer:         o.Comparer,
      49           1 :                 isStrictObsolete: o.IsStrictObsolete,
      50           1 :         }
      51           1 :         return w
      52           1 : }
      53             : 
      54           0 : func (w *Writer) encodeFragmentedRangeKeySpan(s keyspan.Span) {
      55           0 :         // This method is the emit function of the Fragmenter.
      56           0 :         //
      57           0 :         // NB: The span should only contain range keys and be internally consistent
      58           0 :         // (eg, no duplicate suffixes, no additional keys after a RANGEKEYDEL).
      59           0 :         //
      60           0 :         // Sort the keys by suffix. Iteration doesn't *currently* depend on it, but
      61           0 :         // we may want to in the future.
      62           0 :         keyspan.SortKeysBySuffix(w.comparer.CompareSuffixes, s.Keys)
      63           0 : 
      64           0 :         if w.Error() == nil {
      65           0 :                 w.rw.EncodeSpan(s)
      66           0 :         }
      67             : }
      68             : 
      69             : // Error returns the current accumulated error if any.
      70           1 : func (w *Writer) Error() error {
      71           1 :         return errors.CombineErrors(w.rw.Error(), w.err)
      72           1 : }
      73             : 
      74             : // Raw returns the underlying RawWriter.
      75           1 : func (w *Writer) Raw() RawWriter { return w.rw }
      76             : 
      77             : // Set sets the value for the given key. The sequence number is set to 0.
      78             : // Intended for use to externally construct an sstable before ingestion into a
      79             : // DB. For a given Writer, the keys passed to Set must be in strictly increasing
      80             : // order.
      81             : //
      82             : // TODO(peter): untested
      83           1 : func (w *Writer) Set(key, value []byte) error {
      84           1 :         if err := w.Error(); err != nil {
      85           0 :                 return err
      86           0 :         }
      87           1 :         if w.isStrictObsolete {
      88           0 :                 return errors.Errorf("use AddWithForceObsolete")
      89           0 :         }
      90             :         // forceObsolete is false based on the assumption that no RANGEDELs in the
      91             :         // sstable delete the added points.
      92           1 :         return w.rw.AddWithForceObsolete(base.MakeInternalKey(key, 0, InternalKeyKindSet), value, false)
      93             : }
      94             : 
      95             : // Delete deletes the value for the given key. The sequence number is set to
      96             : // 0. Intended for use to externally construct an sstable before ingestion into
      97             : // a DB.
      98             : //
      99             : // TODO(peter): untested
     100           0 : func (w *Writer) Delete(key []byte) error {
     101           0 :         if err := w.Error(); err != nil {
     102           0 :                 return err
     103           0 :         }
     104           0 :         if w.isStrictObsolete {
     105           0 :                 return errors.Errorf("use AddWithForceObsolete")
     106           0 :         }
     107             :         // forceObsolete is false based on the assumption that no RANGEDELs in the
     108             :         // sstable delete the added points.
     109           0 :         return w.rw.AddWithForceObsolete(base.MakeInternalKey(key, 0, InternalKeyKindDelete), nil, false)
     110             : }
     111             : 
     112             : // DeleteRange deletes all of the keys (and values) in the range [start,end)
     113             : // (inclusive on start, exclusive on end). The sequence number is set to
     114             : // 0. Intended for use to externally construct an sstable before ingestion into
     115             : // a DB.
     116             : //
     117             : // Calls to DeleteRange must be made using already-fragmented (non-overlapping)
     118             : // spans and in sorted order.
     119             : //
     120             : // TODO(peter): untested
     121           1 : func (w *Writer) DeleteRange(start, end []byte) error {
     122           1 :         if err := w.Error(); err != nil {
     123           0 :                 return err
     124           0 :         }
     125           1 :         return w.rw.EncodeSpan(keyspan.Span{
     126           1 :                 Start: start,
     127           1 :                 End:   end,
     128           1 :                 Keys: append(w.keyspanKeys[:0], keyspan.Key{
     129           1 :                         Trailer: base.MakeTrailer(0, base.InternalKeyKindRangeDelete),
     130           1 :                 }),
     131           1 :         })
     132             : }
     133             : 
     134             : // Merge adds an action to the DB that merges the value at key with the new
     135             : // value. The details of the merge are dependent upon the configured merge
     136             : // operator. The sequence number is set to 0. Intended for use to externally
     137             : // construct an sstable before ingestion into a DB.
     138             : //
     139             : // TODO(peter): untested
     140           0 : func (w *Writer) Merge(key, value []byte) error {
     141           0 :         if err := w.Error(); err != nil {
     142           0 :                 return err
     143           0 :         }
     144           0 :         if w.isStrictObsolete {
     145           0 :                 return errors.Errorf("use AddWithForceObsolete")
     146           0 :         }
     147             :         // forceObsolete is false based on the assumption that no RANGEDELs in the
     148             :         // sstable that delete the added points. If the user configured this writer
     149             :         // to be strict-obsolete, addPoint will reject the addition of this MERGE.
     150           0 :         return w.rw.AddWithForceObsolete(base.MakeInternalKey(key, 0, InternalKeyKindMerge), value, false)
     151             : }
     152             : 
     153             : // RangeKeySet sets a range between start (inclusive) and end (exclusive) with
     154             : // the given suffix to the given value. The resulting range key is given the
     155             : // sequence number zero, with the expectation that the resulting sstable will be
     156             : // ingested.
     157             : //
     158             : // Keys must be added to the table in increasing order of start key. Spans are
     159             : // not required to be fragmented. The same suffix may not be set or unset twice
     160             : // over the same keyspan, because it would result in inconsistent state. Both
     161             : // the Set and Unset would share the zero sequence number, and a key cannot be
     162             : // both simultaneously set and unset.
     163           0 : func (w *Writer) RangeKeySet(start, end, suffix, value []byte) error {
     164           0 :         return w.addRangeKeySpanToFragmenter(keyspan.Span{
     165           0 :                 Start: w.tempRangeKeyCopy(start),
     166           0 :                 End:   w.tempRangeKeyCopy(end),
     167           0 :                 Keys: []keyspan.Key{
     168           0 :                         {
     169           0 :                                 Trailer: base.MakeTrailer(0, base.InternalKeyKindRangeKeySet),
     170           0 :                                 Suffix:  w.tempRangeKeyCopy(suffix),
     171           0 :                                 Value:   w.tempRangeKeyCopy(value),
     172           0 :                         },
     173           0 :                 },
     174           0 :         })
     175           0 : }
     176             : 
     177             : // RangeKeyUnset un-sets a range between start (inclusive) and end (exclusive)
     178             : // with the given suffix. The resulting range key is given the
     179             : // sequence number zero, with the expectation that the resulting sstable will be
     180             : // ingested.
     181             : //
     182             : // Keys must be added to the table in increasing order of start key. Spans are
     183             : // not required to be fragmented. The same suffix may not be set or unset twice
     184             : // over the same keyspan, because it would result in inconsistent state. Both
     185             : // the Set and Unset would share the zero sequence number, and a key cannot be
     186             : // both simultaneously set and unset.
     187           0 : func (w *Writer) RangeKeyUnset(start, end, suffix []byte) error {
     188           0 :         return w.addRangeKeySpanToFragmenter(keyspan.Span{
     189           0 :                 Start: w.tempRangeKeyCopy(start),
     190           0 :                 End:   w.tempRangeKeyCopy(end),
     191           0 :                 Keys: []keyspan.Key{
     192           0 :                         {
     193           0 :                                 Trailer: base.MakeTrailer(0, base.InternalKeyKindRangeKeyUnset),
     194           0 :                                 Suffix:  w.tempRangeKeyCopy(suffix),
     195           0 :                         },
     196           0 :                 },
     197           0 :         })
     198           0 : }
     199             : 
     200             : // RangeKeyDelete deletes a range between start (inclusive) and end (exclusive).
     201             : //
     202             : // Keys must be added to the table in increasing order of start key. Spans are
     203             : // not required to be fragmented.
     204           0 : func (w *Writer) RangeKeyDelete(start, end []byte) error {
     205           0 :         return w.addRangeKeySpanToFragmenter(keyspan.Span{
     206           0 :                 Start: w.tempRangeKeyCopy(start),
     207           0 :                 End:   w.tempRangeKeyCopy(end),
     208           0 :                 Keys: []keyspan.Key{
     209           0 :                         {Trailer: base.MakeTrailer(0, base.InternalKeyKindRangeKeyDelete)},
     210           0 :                 },
     211           0 :         })
     212           0 : }
     213             : 
     214           0 : func (w *Writer) addRangeKeySpanToFragmenter(span keyspan.Span) error {
     215           0 :         if w.comparer.Compare(span.Start, span.End) >= 0 {
     216           0 :                 return errors.Errorf(
     217           0 :                         "pebble: start key must be strictly less than end key",
     218           0 :                 )
     219           0 :         }
     220           0 :         if w.fragmenter.Start() != nil && w.comparer.Compare(w.fragmenter.Start(), span.Start) > 0 {
     221           0 :                 return errors.Errorf("pebble: spans must be added in order: %s > %s",
     222           0 :                         w.comparer.FormatKey(w.fragmenter.Start()), w.comparer.FormatKey(span.Start))
     223           0 :         }
     224             :         // Add this span to the fragmenter.
     225           0 :         w.fragmenter.Add(span)
     226           0 :         return w.Error()
     227             : }
     228             : 
     229             : // tempRangeKeyBuf returns a slice of length n from the Writer's rkBuf byte
     230             : // slice. Any byte written to the returned slice is retained for the lifetime of
     231             : // the Writer.
     232           0 : func (w *Writer) tempRangeKeyBuf(n int) []byte {
     233           0 :         if cap(w.rkBuf)-len(w.rkBuf) < n {
     234           0 :                 size := len(w.rkBuf) + 2*n
     235           0 :                 if size < 2*cap(w.rkBuf) {
     236           0 :                         size = 2 * cap(w.rkBuf)
     237           0 :                 }
     238           0 :                 buf := make([]byte, len(w.rkBuf), size)
     239           0 :                 copy(buf, w.rkBuf)
     240           0 :                 w.rkBuf = buf
     241             :         }
     242           0 :         b := w.rkBuf[len(w.rkBuf) : len(w.rkBuf)+n]
     243           0 :         w.rkBuf = w.rkBuf[:len(w.rkBuf)+n]
     244           0 :         return b
     245             : }
     246             : 
     247             : // tempRangeKeyCopy returns a copy of the provided slice, stored in the Writer's
     248             : // range key buffer.
     249           0 : func (w *Writer) tempRangeKeyCopy(k []byte) []byte {
     250           0 :         if len(k) == 0 {
     251           0 :                 return nil
     252           0 :         }
     253           0 :         buf := w.tempRangeKeyBuf(len(k))
     254           0 :         copy(buf, k)
     255           0 :         return buf
     256             : }
     257             : 
     258             : // Metadata returns the metadata for the finished sstable. Only valid to call
     259             : // after the sstable has been finished.
     260           0 : func (w *Writer) Metadata() (*WriterMetadata, error) {
     261           0 :         return w.rw.Metadata()
     262           0 : }
     263             : 
     264             : // Close finishes writing the table and closes the underlying file that the
     265             : // table was written to.
     266           1 : func (w *Writer) Close() (err error) {
     267           1 :         if w.Error() == nil {
     268           1 :                 // Write the range-key block, flushing any remaining spans from the
     269           1 :                 // fragmenter first.
     270           1 :                 w.fragmenter.Finish()
     271           1 :         }
     272           1 :         return errors.CombineErrors(w.rw.Close(), w.err)
     273             : }
     274             : 
// RawWriter defines an interface for sstable writers. Implementations may vary
// depending on the TableFormat being written. Writer wraps a RawWriter and
// forwards all point keys and encoded spans to it.
type RawWriter interface {
	// Error returns the current accumulated error if any.
	Error() error
	// AddWithForceObsolete adds a point key and its value to the table. It
	// must be used when writing a strict-obsolete sstable.
	//
	// forceObsolete indicates whether the caller has determined that this key is
	// obsolete even though it may be the latest point key for this userkey. This
	// should be set to true for keys obsoleted by RANGEDELs, and is required for
	// strict-obsolete sstables. It's optional for non-strict-obsolete sstables.
	//
	// Note that there are two properties, S1 and S2 (see comment in format.go)
	// that strict-obsolete ssts must satisfy. S2, due to RANGEDELs, is solely the
	// responsibility of the caller. S1 is solely the responsibility of the
	// callee.
	AddWithForceObsolete(
		key InternalKey, value []byte, forceObsolete bool,
	) error
	// EncodeSpan encodes the keys in the given span. The span can contain
	// either only RANGEDEL keys or only range keys.
	//
	// This is a low-level API that bypasses the fragmenter. The spans passed to
	// this function must be fragmented and ordered.
	EncodeSpan(span keyspan.Span) error
	// EstimatedSize returns the estimated size of the sstable being written if
	// a call to Close() was made without adding additional keys.
	EstimatedSize() uint64
	// Close finishes writing the table and closes the underlying file that the
	// table was written to.
	Close() error
	// Metadata returns the metadata for the finished sstable. Only valid to
	// call after the sstable has been finished.
	Metadata() (*WriterMetadata, error)
}
     310             : 
// WriterMetadata holds info about a finished sstable.
//
// The Smallest*/Largest* fields hold key bounds separately for point keys,
// range deletions and range keys. The SetSmallest*/SetLargest* methods
// overwrite a bound and set the matching Has* flag.
type WriterMetadata struct {
	Size          uint64
	SmallestPoint InternalKey
	// LargestPoint, LargestRangeKey, LargestRangeDel should not be accessed
	// before Writer.Close is called, because they may only be set on
	// Writer.Close.
	LargestPoint     InternalKey
	SmallestRangeDel InternalKey
	LargestRangeDel  InternalKey
	SmallestRangeKey InternalKey
	LargestRangeKey  InternalKey
	// HasPointKeys, HasRangeDelKeys and HasRangeKeys are set to true by the
	// setters for the corresponding smallest/largest bounds.
	HasPointKeys    bool
	HasRangeDelKeys bool
	HasRangeKeys    bool
	// SmallestSeqNum and LargestSeqNum are adjusted by updateSeqNum.
	SmallestSeqNum base.SeqNum
	LargestSeqNum  base.SeqNum
	Properties     Properties
}
     330             : 
     331             : // SetSmallestPointKey sets the smallest point key to the given key.
     332             : // NB: this method set the "absolute" smallest point key. Any existing key is
     333             : // overridden.
     334           1 : func (m *WriterMetadata) SetSmallestPointKey(k InternalKey) {
     335           1 :         m.SmallestPoint = k
     336           1 :         m.HasPointKeys = true
     337           1 : }
     338             : 
     339             : // SetSmallestRangeDelKey sets the smallest rangedel key to the given key.
     340             : // NB: this method set the "absolute" smallest rangedel key. Any existing key is
     341             : // overridden.
     342           1 : func (m *WriterMetadata) SetSmallestRangeDelKey(k InternalKey) {
     343           1 :         m.SmallestRangeDel = k
     344           1 :         m.HasRangeDelKeys = true
     345           1 : }
     346             : 
     347             : // SetSmallestRangeKey sets the smallest range key to the given key.
     348             : // NB: this method set the "absolute" smallest range key. Any existing key is
     349             : // overridden.
     350           1 : func (m *WriterMetadata) SetSmallestRangeKey(k InternalKey) {
     351           1 :         m.SmallestRangeKey = k
     352           1 :         m.HasRangeKeys = true
     353           1 : }
     354             : 
     355             : // SetLargestPointKey sets the largest point key to the given key.
     356             : // NB: this method set the "absolute" largest point key. Any existing key is
     357             : // overridden.
     358           1 : func (m *WriterMetadata) SetLargestPointKey(k InternalKey) {
     359           1 :         m.LargestPoint = k
     360           1 :         m.HasPointKeys = true
     361           1 : }
     362             : 
     363             : // SetLargestRangeDelKey sets the largest rangedel key to the given key.
     364             : // NB: this method set the "absolute" largest rangedel key. Any existing key is
     365             : // overridden.
     366           1 : func (m *WriterMetadata) SetLargestRangeDelKey(k InternalKey) {
     367           1 :         m.LargestRangeDel = k
     368           1 :         m.HasRangeDelKeys = true
     369           1 : }
     370             : 
     371             : // SetLargestRangeKey sets the largest range key to the given key.
     372             : // NB: this method set the "absolute" largest range key. Any existing key is
     373             : // overridden.
     374           1 : func (m *WriterMetadata) SetLargestRangeKey(k InternalKey) {
     375           1 :         m.LargestRangeKey = k
     376           1 :         m.HasRangeKeys = true
     377           1 : }
     378             : 
     379           1 : func (m *WriterMetadata) updateSeqNum(seqNum base.SeqNum) {
     380           1 :         if m.SmallestSeqNum > seqNum {
     381           1 :                 m.SmallestSeqNum = seqNum
     382           1 :         }
     383           1 :         if m.LargestSeqNum < seqNum {
     384           1 :                 m.LargestSeqNum = seqNum
     385           1 :         }
     386             : }

Generated by: LCOV version 1.14