Line data Source code
1 : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package sstable
6 :
7 : import (
8 : "github.com/cockroachdb/errors"
9 : "github.com/cockroachdb/pebble/internal/base"
10 : "github.com/cockroachdb/pebble/internal/keyspan"
11 : "github.com/cockroachdb/pebble/objstorage"
12 : )
13 :
14 : // Writer is a table writer.
15 : type Writer struct {
16 : rw RawWriter
17 : err error
18 : // To allow potentially overlapping (i.e. un-fragmented) range keys spans to
19 : // be added to the Writer, a keyspan.Fragmenter is used to retain the keys
20 : // and values, emitting fragmented, coalesced spans as appropriate. Range
21 : // keys must be added in order of their start user-key.
22 : fragmenter keyspan.Fragmenter
23 : comparer *base.Comparer
24 : // isStrictObsolete is true if the writer is configured to write and enforce
25 : // a 'strict obsolete' sstable. This includes prohibiting the addition of
26 : // MERGE keys. See the documentation in format.go for more details.
27 : isStrictObsolete bool
28 : rkBuf []byte
29 : keyspanKeys []keyspan.Key
30 : }
31 :
32 : // NewWriter returns a new table writer intended for building external sstables
33 : // (eg, for ingestion or storage outside the LSM) for the file. Closing the
34 : // writer will close the file.
35 : //
36 : // Internal clients should generally prefer NewRawWriter.
37 1 : func NewWriter(writable objstorage.Writable, o WriterOptions) *Writer {
38 1 : o = o.ensureDefaults()
39 1 : rw := NewRawWriter(writable, o)
40 1 : w := &Writer{}
41 1 : *w = Writer{
42 1 : rw: rw,
43 1 : fragmenter: keyspan.Fragmenter{
44 1 : Cmp: o.Comparer.Compare,
45 1 : Format: o.Comparer.FormatKey,
46 1 : Emit: w.encodeFragmentedRangeKeySpan,
47 1 : },
48 1 : comparer: o.Comparer,
49 1 : isStrictObsolete: o.IsStrictObsolete,
50 1 : }
51 1 : return w
52 1 : }
53 :
54 0 : func (w *Writer) encodeFragmentedRangeKeySpan(s keyspan.Span) {
55 0 : // This method is the emit function of the Fragmenter.
56 0 : //
57 0 : // NB: The span should only contain range keys and be internally consistent
58 0 : // (eg, no duplicate suffixes, no additional keys after a RANGEKEYDEL).
59 0 : //
60 0 : // Sort the keys by suffix. Iteration doesn't *currently* depend on it, but
61 0 : // we may want to in the future.
62 0 : keyspan.SortKeysBySuffix(w.comparer.CompareSuffixes, s.Keys)
63 0 :
64 0 : if w.Error() == nil {
65 0 : w.rw.EncodeSpan(s)
66 0 : }
67 : }
68 :
69 : // Error returns the current accumulated error if any.
70 1 : func (w *Writer) Error() error {
71 1 : return errors.CombineErrors(w.rw.Error(), w.err)
72 1 : }
73 :
74 : // Raw returns the underlying RawWriter.
75 1 : func (w *Writer) Raw() RawWriter { return w.rw }
76 :
77 : // Set sets the value for the given key. The sequence number is set to 0.
78 : // Intended for use to externally construct an sstable before ingestion into a
79 : // DB. For a given Writer, the keys passed to Set must be in strictly increasing
80 : // order.
81 : //
82 : // TODO(peter): untested
83 1 : func (w *Writer) Set(key, value []byte) error {
84 1 : if err := w.Error(); err != nil {
85 0 : return err
86 0 : }
87 1 : if w.isStrictObsolete {
88 0 : return errors.Errorf("use AddWithForceObsolete")
89 0 : }
90 : // forceObsolete is false based on the assumption that no RANGEDELs in the
91 : // sstable delete the added points.
92 1 : return w.rw.AddWithForceObsolete(base.MakeInternalKey(key, 0, InternalKeyKindSet), value, false)
93 : }
94 :
95 : // Delete deletes the value for the given key. The sequence number is set to
96 : // 0. Intended for use to externally construct an sstable before ingestion into
97 : // a DB.
98 : //
99 : // TODO(peter): untested
100 0 : func (w *Writer) Delete(key []byte) error {
101 0 : if err := w.Error(); err != nil {
102 0 : return err
103 0 : }
104 0 : if w.isStrictObsolete {
105 0 : return errors.Errorf("use AddWithForceObsolete")
106 0 : }
107 : // forceObsolete is false based on the assumption that no RANGEDELs in the
108 : // sstable delete the added points.
109 0 : return w.rw.AddWithForceObsolete(base.MakeInternalKey(key, 0, InternalKeyKindDelete), nil, false)
110 : }
111 :
112 : // DeleteRange deletes all of the keys (and values) in the range [start,end)
113 : // (inclusive on start, exclusive on end). The sequence number is set to
114 : // 0. Intended for use to externally construct an sstable before ingestion into
115 : // a DB.
116 : //
117 : // Calls to DeleteRange must be made using already-fragmented (non-overlapping)
118 : // spans and in sorted order.
119 : //
120 : // TODO(peter): untested
121 1 : func (w *Writer) DeleteRange(start, end []byte) error {
122 1 : if err := w.Error(); err != nil {
123 0 : return err
124 0 : }
125 1 : return w.rw.EncodeSpan(keyspan.Span{
126 1 : Start: start,
127 1 : End: end,
128 1 : Keys: append(w.keyspanKeys[:0], keyspan.Key{
129 1 : Trailer: base.MakeTrailer(0, base.InternalKeyKindRangeDelete),
130 1 : }),
131 1 : })
132 : }
133 :
134 : // Merge adds an action to the DB that merges the value at key with the new
135 : // value. The details of the merge are dependent upon the configured merge
136 : // operator. The sequence number is set to 0. Intended for use to externally
137 : // construct an sstable before ingestion into a DB.
138 : //
139 : // TODO(peter): untested
140 0 : func (w *Writer) Merge(key, value []byte) error {
141 0 : if err := w.Error(); err != nil {
142 0 : return err
143 0 : }
144 0 : if w.isStrictObsolete {
145 0 : return errors.Errorf("use AddWithForceObsolete")
146 0 : }
147 : // forceObsolete is false based on the assumption that no RANGEDELs in the
148 : // sstable that delete the added points. If the user configured this writer
149 : // to be strict-obsolete, addPoint will reject the addition of this MERGE.
150 0 : return w.rw.AddWithForceObsolete(base.MakeInternalKey(key, 0, InternalKeyKindMerge), value, false)
151 : }
152 :
153 : // RangeKeySet sets a range between start (inclusive) and end (exclusive) with
154 : // the given suffix to the given value. The resulting range key is given the
155 : // sequence number zero, with the expectation that the resulting sstable will be
156 : // ingested.
157 : //
158 : // Keys must be added to the table in increasing order of start key. Spans are
159 : // not required to be fragmented. The same suffix may not be set or unset twice
160 : // over the same keyspan, because it would result in inconsistent state. Both
161 : // the Set and Unset would share the zero sequence number, and a key cannot be
162 : // both simultaneously set and unset.
163 0 : func (w *Writer) RangeKeySet(start, end, suffix, value []byte) error {
164 0 : return w.addRangeKeySpanToFragmenter(keyspan.Span{
165 0 : Start: w.tempRangeKeyCopy(start),
166 0 : End: w.tempRangeKeyCopy(end),
167 0 : Keys: []keyspan.Key{
168 0 : {
169 0 : Trailer: base.MakeTrailer(0, base.InternalKeyKindRangeKeySet),
170 0 : Suffix: w.tempRangeKeyCopy(suffix),
171 0 : Value: w.tempRangeKeyCopy(value),
172 0 : },
173 0 : },
174 0 : })
175 0 : }
176 :
177 : // RangeKeyUnset un-sets a range between start (inclusive) and end (exclusive)
178 : // with the given suffix. The resulting range key is given the
179 : // sequence number zero, with the expectation that the resulting sstable will be
180 : // ingested.
181 : //
182 : // Keys must be added to the table in increasing order of start key. Spans are
183 : // not required to be fragmented. The same suffix may not be set or unset twice
184 : // over the same keyspan, because it would result in inconsistent state. Both
185 : // the Set and Unset would share the zero sequence number, and a key cannot be
186 : // both simultaneously set and unset.
187 0 : func (w *Writer) RangeKeyUnset(start, end, suffix []byte) error {
188 0 : return w.addRangeKeySpanToFragmenter(keyspan.Span{
189 0 : Start: w.tempRangeKeyCopy(start),
190 0 : End: w.tempRangeKeyCopy(end),
191 0 : Keys: []keyspan.Key{
192 0 : {
193 0 : Trailer: base.MakeTrailer(0, base.InternalKeyKindRangeKeyUnset),
194 0 : Suffix: w.tempRangeKeyCopy(suffix),
195 0 : },
196 0 : },
197 0 : })
198 0 : }
199 :
200 : // RangeKeyDelete deletes a range between start (inclusive) and end (exclusive).
201 : //
202 : // Keys must be added to the table in increasing order of start key. Spans are
203 : // not required to be fragmented.
204 0 : func (w *Writer) RangeKeyDelete(start, end []byte) error {
205 0 : return w.addRangeKeySpanToFragmenter(keyspan.Span{
206 0 : Start: w.tempRangeKeyCopy(start),
207 0 : End: w.tempRangeKeyCopy(end),
208 0 : Keys: []keyspan.Key{
209 0 : {Trailer: base.MakeTrailer(0, base.InternalKeyKindRangeKeyDelete)},
210 0 : },
211 0 : })
212 0 : }
213 :
214 0 : func (w *Writer) addRangeKeySpanToFragmenter(span keyspan.Span) error {
215 0 : if w.comparer.Compare(span.Start, span.End) >= 0 {
216 0 : return errors.Errorf(
217 0 : "pebble: start key must be strictly less than end key",
218 0 : )
219 0 : }
220 0 : if w.fragmenter.Start() != nil && w.comparer.Compare(w.fragmenter.Start(), span.Start) > 0 {
221 0 : return errors.Errorf("pebble: spans must be added in order: %s > %s",
222 0 : w.comparer.FormatKey(w.fragmenter.Start()), w.comparer.FormatKey(span.Start))
223 0 : }
224 : // Add this span to the fragmenter.
225 0 : w.fragmenter.Add(span)
226 0 : return w.Error()
227 : }
228 :
229 : // tempRangeKeyBuf returns a slice of length n from the Writer's rkBuf byte
230 : // slice. Any byte written to the returned slice is retained for the lifetime of
231 : // the Writer.
232 0 : func (w *Writer) tempRangeKeyBuf(n int) []byte {
233 0 : if cap(w.rkBuf)-len(w.rkBuf) < n {
234 0 : size := len(w.rkBuf) + 2*n
235 0 : if size < 2*cap(w.rkBuf) {
236 0 : size = 2 * cap(w.rkBuf)
237 0 : }
238 0 : buf := make([]byte, len(w.rkBuf), size)
239 0 : copy(buf, w.rkBuf)
240 0 : w.rkBuf = buf
241 : }
242 0 : b := w.rkBuf[len(w.rkBuf) : len(w.rkBuf)+n]
243 0 : w.rkBuf = w.rkBuf[:len(w.rkBuf)+n]
244 0 : return b
245 : }
246 :
247 : // tempRangeKeyCopy returns a copy of the provided slice, stored in the Writer's
248 : // range key buffer.
249 0 : func (w *Writer) tempRangeKeyCopy(k []byte) []byte {
250 0 : if len(k) == 0 {
251 0 : return nil
252 0 : }
253 0 : buf := w.tempRangeKeyBuf(len(k))
254 0 : copy(buf, k)
255 0 : return buf
256 : }
257 :
258 : // Metadata returns the metadata for the finished sstable. Only valid to call
259 : // after the sstable has been finished.
260 0 : func (w *Writer) Metadata() (*WriterMetadata, error) {
261 0 : return w.rw.Metadata()
262 0 : }
263 :
264 : // Close finishes writing the table and closes the underlying file that the
265 : // table was written to.
266 1 : func (w *Writer) Close() (err error) {
267 1 : if w.Error() == nil {
268 1 : // Write the range-key block, flushing any remaining spans from the
269 1 : // fragmenter first.
270 1 : w.fragmenter.Finish()
271 1 : }
272 1 : return errors.CombineErrors(w.rw.Close(), w.err)
273 : }
274 :
275 : // RawWriter defines an interface for sstable writers. Implementations may vary
276 : // depending on the TableFormat being written.
277 : type RawWriter interface {
278 : // Error returns the current accumulated error if any.
279 : Error() error
280 : // AddWithForceObsolete must be used when writing a strict-obsolete sstable.
281 : //
282 : // forceObsolete indicates whether the caller has determined that this key is
283 : // obsolete even though it may be the latest point key for this userkey. This
284 : // should be set to true for keys obsoleted by RANGEDELs, and is required for
285 : // strict-obsolete sstables. It's optional for non-strict-obsolete sstables.
286 : //
287 : // Note that there are two properties, S1 and S2 (see comment in format.go)
288 : // that strict-obsolete ssts must satisfy. S2, due to RANGEDELs, is solely the
289 : // responsibility of the caller. S1 is solely the responsibility of the
290 : // callee.
291 : AddWithForceObsolete(
292 : key InternalKey, value []byte, forceObsolete bool,
293 : ) error
294 : // EncodeSpan encodes the keys in the given span. The span can contain
295 : // either only RANGEDEL keys or only range keys.
296 : //
297 : // This is a low-level API that bypasses the fragmenter. The spans passed to
298 : // this function must be fragmented and ordered.
299 : EncodeSpan(span keyspan.Span) error
300 : // EstimatedSize returns the estimated size of the sstable being written if
301 : // a call to Close() was made without adding additional keys.
302 : EstimatedSize() uint64
303 : // Close finishes writing the table and closes the underlying file that the
304 : // table was written to.
305 : Close() error
306 : // Metadata returns the metadata for the finished sstable. Only valid to
307 : // call after the sstable has been finished.
308 : Metadata() (*WriterMetadata, error)
309 : }
310 :
311 : // WriterMetadata holds info about a finished sstable.
312 : type WriterMetadata struct {
313 : Size uint64
314 : SmallestPoint InternalKey
315 : // LargestPoint, LargestRangeKey, LargestRangeDel should not be accessed
316 : // before Writer.Close is called, because they may only be set on
317 : // Writer.Close.
318 : LargestPoint InternalKey
319 : SmallestRangeDel InternalKey
320 : LargestRangeDel InternalKey
321 : SmallestRangeKey InternalKey
322 : LargestRangeKey InternalKey
323 : HasPointKeys bool
324 : HasRangeDelKeys bool
325 : HasRangeKeys bool
326 : SmallestSeqNum base.SeqNum
327 : LargestSeqNum base.SeqNum
328 : Properties Properties
329 : }
330 :
331 : // SetSmallestPointKey sets the smallest point key to the given key.
332 : // NB: this method set the "absolute" smallest point key. Any existing key is
333 : // overridden.
334 1 : func (m *WriterMetadata) SetSmallestPointKey(k InternalKey) {
335 1 : m.SmallestPoint = k
336 1 : m.HasPointKeys = true
337 1 : }
338 :
339 : // SetSmallestRangeDelKey sets the smallest rangedel key to the given key.
340 : // NB: this method set the "absolute" smallest rangedel key. Any existing key is
341 : // overridden.
342 1 : func (m *WriterMetadata) SetSmallestRangeDelKey(k InternalKey) {
343 1 : m.SmallestRangeDel = k
344 1 : m.HasRangeDelKeys = true
345 1 : }
346 :
347 : // SetSmallestRangeKey sets the smallest range key to the given key.
348 : // NB: this method set the "absolute" smallest range key. Any existing key is
349 : // overridden.
350 1 : func (m *WriterMetadata) SetSmallestRangeKey(k InternalKey) {
351 1 : m.SmallestRangeKey = k
352 1 : m.HasRangeKeys = true
353 1 : }
354 :
355 : // SetLargestPointKey sets the largest point key to the given key.
356 : // NB: this method set the "absolute" largest point key. Any existing key is
357 : // overridden.
358 1 : func (m *WriterMetadata) SetLargestPointKey(k InternalKey) {
359 1 : m.LargestPoint = k
360 1 : m.HasPointKeys = true
361 1 : }
362 :
363 : // SetLargestRangeDelKey sets the largest rangedel key to the given key.
364 : // NB: this method set the "absolute" largest rangedel key. Any existing key is
365 : // overridden.
366 1 : func (m *WriterMetadata) SetLargestRangeDelKey(k InternalKey) {
367 1 : m.LargestRangeDel = k
368 1 : m.HasRangeDelKeys = true
369 1 : }
370 :
371 : // SetLargestRangeKey sets the largest range key to the given key.
372 : // NB: this method set the "absolute" largest range key. Any existing key is
373 : // overridden.
374 1 : func (m *WriterMetadata) SetLargestRangeKey(k InternalKey) {
375 1 : m.LargestRangeKey = k
376 1 : m.HasRangeKeys = true
377 1 : }
378 :
379 1 : func (m *WriterMetadata) updateSeqNum(seqNum base.SeqNum) {
380 1 : if m.SmallestSeqNum > seqNum {
381 1 : m.SmallestSeqNum = seqNum
382 1 : }
383 1 : if m.LargestSeqNum < seqNum {
384 1 : m.LargestSeqNum = seqNum
385 1 : }
386 : }
|