Line data Source code
1 : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package sstable
6 :
7 : import (
8 : "context"
9 :
10 : "github.com/cockroachdb/errors"
11 : "github.com/cockroachdb/pebble/internal/base"
12 : "github.com/cockroachdb/pebble/internal/keyspan"
13 : "github.com/cockroachdb/pebble/objstorage"
14 : "github.com/cockroachdb/pebble/sstable/block"
15 : )
16 :
17 : // NewRawWriter returns a new table writer for the file. Closing the writer will
18 : // close the file.
19 2 : func NewRawWriter(writable objstorage.Writable, o WriterOptions) RawWriter {
20 2 : if o.TableFormat <= TableFormatPebblev4 {
21 2 : return newRowWriter(writable, o)
22 2 : }
23 2 : return newColumnarWriter(writable, o)
24 : }
25 :
// Writer is a table writer. It wraps a RawWriter and layers convenience
// methods on top of it (Set, Delete, Merge, DeleteRange and the RangeKey*
// family), all of which write keys with sequence number zero.
type Writer struct {
	// rw is the underlying RawWriter that every operation is forwarded to.
	rw RawWriter
	// err is combined with rw's error by Error and Close.
	err error
	// To allow potentially overlapping (i.e. un-fragmented) range keys spans to
	// be added to the Writer, a keyspan.Fragmenter is used to retain the keys
	// and values, emitting fragmented, coalesced spans as appropriate. Range
	// keys must be added in order of their start user-key.
	fragmenter keyspan.Fragmenter
	// comparer supplies the user-key comparison, range key suffix comparison
	// and key formatting used when validating and sorting range key spans.
	comparer *base.Comparer
	// isStrictObsolete is true if the writer is configured to write and enforce
	// a 'strict obsolete' sstable. This includes prohibiting the addition of
	// MERGE keys. See the documentation in format.go for more details.
	isStrictObsolete bool
	// rkBuf is the byte buffer that tempRangeKeyBuf carves slices out of. It
	// backs the copies of range key bounds, suffixes and values made by
	// tempRangeKeyCopy.
	rkBuf []byte
	// keyspanKeys is a scratch slice that DeleteRange uses to build the Keys
	// of the span it encodes.
	keyspanKeys []keyspan.Key
}
43 :
44 : // NewWriter returns a new table writer intended for building external sstables
45 : // (eg, for ingestion or storage outside the LSM) for the file. Closing the
46 : // writer will close the file.
47 : //
48 : // Internal clients should generally prefer NewRawWriter.
49 2 : func NewWriter(writable objstorage.Writable, o WriterOptions) *Writer {
50 2 : o = o.ensureDefaults()
51 2 : rw := NewRawWriter(writable, o)
52 2 : w := &Writer{}
53 2 : *w = Writer{
54 2 : rw: rw,
55 2 : fragmenter: keyspan.Fragmenter{
56 2 : Cmp: o.Comparer.Compare,
57 2 : Format: o.Comparer.FormatKey,
58 2 : Emit: w.encodeFragmentedRangeKeySpan,
59 2 : },
60 2 : comparer: o.Comparer,
61 2 : isStrictObsolete: o.IsStrictObsolete,
62 2 : }
63 2 : return w
64 2 : }
65 :
66 1 : func (w *Writer) encodeFragmentedRangeKeySpan(s keyspan.Span) {
67 1 : // This method is the emit function of the Fragmenter.
68 1 : //
69 1 : // NB: The span should only contain range keys and be internally consistent
70 1 : // (eg, no duplicate suffixes, no additional keys after a RANGEKEYDEL).
71 1 : //
72 1 : // Sort the keys by trailer (descending).
73 1 : //
74 1 : // Note that sstables written in TableFormatPebblev4 and earlier (rowblk
75 1 : // encoding) will always re-order the Keys in order to encode RANGEKEYSETs
76 1 : // first, then RANGEKEYUNSETs and then RANGEKEYDELs. See rangekey.Encoder.
77 1 : // SSTables written in TableFormatPebblev5 or later (colblk encoding) will
78 1 : // encode the keys in this order.
79 1 : //
80 1 : // Iteration doesn't depend on this ordering, in particular because it's
81 1 : // inconsistent between the two encodings, but we may want eventually begin
82 1 : // to depend on this ordering for colblk-encoded sstables.
83 1 : keyspan.SortKeysByTrailerAndSuffix(w.comparer.CompareRangeSuffixes, s.Keys)
84 1 : if w.Error() == nil {
85 1 : w.rw.EncodeSpan(s)
86 1 : }
87 : }
88 :
89 : // Error returns the current accumulated error if any.
90 2 : func (w *Writer) Error() error {
91 2 : return errors.CombineErrors(w.rw.Error(), w.err)
92 2 : }
93 :
94 : // Raw returns the underlying RawWriter.
95 2 : func (w *Writer) Raw() RawWriter { return w.rw }
96 :
97 : // Set sets the value for the given key. The sequence number is set to 0.
98 : // Intended for use to externally construct an sstable before ingestion into a
99 : // DB. For a given Writer, the keys passed to Set must be in strictly increasing
100 : // order.
101 : //
102 : // TODO(peter): untested
103 2 : func (w *Writer) Set(key, value []byte) error {
104 2 : if err := w.Error(); err != nil {
105 0 : return err
106 0 : }
107 2 : if w.isStrictObsolete {
108 0 : return errors.Errorf("use AddWithForceObsolete")
109 0 : }
110 : // forceObsolete is false based on the assumption that no RANGEDELs in the
111 : // sstable delete the added points.
112 2 : return w.rw.AddWithForceObsolete(base.MakeInternalKey(key, 0, InternalKeyKindSet), value, false)
113 : }
114 :
115 : // Delete deletes the value for the given key. The sequence number is set to
116 : // 0. Intended for use to externally construct an sstable before ingestion into
117 : // a DB.
118 : //
119 : // TODO(peter): untested
120 1 : func (w *Writer) Delete(key []byte) error {
121 1 : if err := w.Error(); err != nil {
122 0 : return err
123 0 : }
124 1 : if w.isStrictObsolete {
125 0 : return errors.Errorf("use AddWithForceObsolete")
126 0 : }
127 : // forceObsolete is false based on the assumption that no RANGEDELs in the
128 : // sstable delete the added points.
129 1 : return w.rw.AddWithForceObsolete(base.MakeInternalKey(key, 0, InternalKeyKindDelete), nil, false)
130 : }
131 :
132 : // DeleteRange deletes all of the keys (and values) in the range [start,end)
133 : // (inclusive on start, exclusive on end). The sequence number is set to
134 : // 0. Intended for use to externally construct an sstable before ingestion into
135 : // a DB.
136 : //
137 : // Calls to DeleteRange must be made using already-fragmented (non-overlapping)
138 : // spans and in sorted order.
139 : //
140 : // TODO(peter): untested
141 2 : func (w *Writer) DeleteRange(start, end []byte) error {
142 2 : if err := w.Error(); err != nil {
143 0 : return err
144 0 : }
145 2 : return w.rw.EncodeSpan(keyspan.Span{
146 2 : Start: start,
147 2 : End: end,
148 2 : Keys: append(w.keyspanKeys[:0], keyspan.Key{
149 2 : Trailer: base.MakeTrailer(0, base.InternalKeyKindRangeDelete),
150 2 : }),
151 2 : })
152 : }
153 :
154 : // Merge adds an action to the DB that merges the value at key with the new
155 : // value. The details of the merge are dependent upon the configured merge
156 : // operator. The sequence number is set to 0. Intended for use to externally
157 : // construct an sstable before ingestion into a DB.
158 : //
159 : // TODO(peter): untested
160 0 : func (w *Writer) Merge(key, value []byte) error {
161 0 : if err := w.Error(); err != nil {
162 0 : return err
163 0 : }
164 0 : if w.isStrictObsolete {
165 0 : return errors.Errorf("use AddWithForceObsolete")
166 0 : }
167 : // forceObsolete is false based on the assumption that no RANGEDELs in the
168 : // sstable that delete the added points. If the user configured this writer
169 : // to be strict-obsolete, addPoint will reject the addition of this MERGE.
170 0 : return w.rw.AddWithForceObsolete(base.MakeInternalKey(key, 0, InternalKeyKindMerge), value, false)
171 : }
172 :
173 : // RangeKeySet sets a range between start (inclusive) and end (exclusive) with
174 : // the given suffix to the given value. The resulting range key is given the
175 : // sequence number zero, with the expectation that the resulting sstable will be
176 : // ingested.
177 : //
178 : // Keys must be added to the table in increasing order of start key. Spans are
179 : // not required to be fragmented. The same suffix may not be set or unset twice
180 : // over the same keyspan, because it would result in inconsistent state. Both
181 : // the Set and Unset would share the zero sequence number, and a key cannot be
182 : // both simultaneously set and unset.
183 1 : func (w *Writer) RangeKeySet(start, end, suffix, value []byte) error {
184 1 : return w.addRangeKeySpanToFragmenter(keyspan.Span{
185 1 : Start: w.tempRangeKeyCopy(start),
186 1 : End: w.tempRangeKeyCopy(end),
187 1 : Keys: []keyspan.Key{
188 1 : {
189 1 : Trailer: base.MakeTrailer(0, base.InternalKeyKindRangeKeySet),
190 1 : Suffix: w.tempRangeKeyCopy(suffix),
191 1 : Value: w.tempRangeKeyCopy(value),
192 1 : },
193 1 : },
194 1 : })
195 1 : }
196 :
197 : // RangeKeyUnset un-sets a range between start (inclusive) and end (exclusive)
198 : // with the given suffix. The resulting range key is given the
199 : // sequence number zero, with the expectation that the resulting sstable will be
200 : // ingested.
201 : //
202 : // Keys must be added to the table in increasing order of start key. Spans are
203 : // not required to be fragmented. The same suffix may not be set or unset twice
204 : // over the same keyspan, because it would result in inconsistent state. Both
205 : // the Set and Unset would share the zero sequence number, and a key cannot be
206 : // both simultaneously set and unset.
207 1 : func (w *Writer) RangeKeyUnset(start, end, suffix []byte) error {
208 1 : return w.addRangeKeySpanToFragmenter(keyspan.Span{
209 1 : Start: w.tempRangeKeyCopy(start),
210 1 : End: w.tempRangeKeyCopy(end),
211 1 : Keys: []keyspan.Key{
212 1 : {
213 1 : Trailer: base.MakeTrailer(0, base.InternalKeyKindRangeKeyUnset),
214 1 : Suffix: w.tempRangeKeyCopy(suffix),
215 1 : },
216 1 : },
217 1 : })
218 1 : }
219 :
220 : // RangeKeyDelete deletes a range between start (inclusive) and end (exclusive).
221 : //
222 : // Keys must be added to the table in increasing order of start key. Spans are
223 : // not required to be fragmented.
224 1 : func (w *Writer) RangeKeyDelete(start, end []byte) error {
225 1 : return w.addRangeKeySpanToFragmenter(keyspan.Span{
226 1 : Start: w.tempRangeKeyCopy(start),
227 1 : End: w.tempRangeKeyCopy(end),
228 1 : Keys: []keyspan.Key{
229 1 : {Trailer: base.MakeTrailer(0, base.InternalKeyKindRangeKeyDelete)},
230 1 : },
231 1 : })
232 1 : }
233 :
234 1 : func (w *Writer) addRangeKeySpanToFragmenter(span keyspan.Span) error {
235 1 : if w.comparer.Compare(span.Start, span.End) >= 0 {
236 0 : return errors.Errorf(
237 0 : "pebble: start key must be strictly less than end key",
238 0 : )
239 0 : }
240 1 : if w.fragmenter.Start() != nil && w.comparer.Compare(w.fragmenter.Start(), span.Start) > 0 {
241 1 : return errors.Errorf("pebble: spans must be added in order: %s > %s",
242 1 : w.comparer.FormatKey(w.fragmenter.Start()), w.comparer.FormatKey(span.Start))
243 1 : }
244 : // Add this span to the fragmenter.
245 1 : w.fragmenter.Add(span)
246 1 : return w.Error()
247 : }
248 :
249 : // tempRangeKeyBuf returns a slice of length n from the Writer's rkBuf byte
250 : // slice. Any byte written to the returned slice is retained for the lifetime of
251 : // the Writer.
252 1 : func (w *Writer) tempRangeKeyBuf(n int) []byte {
253 1 : if cap(w.rkBuf)-len(w.rkBuf) < n {
254 1 : size := len(w.rkBuf) + 2*n
255 1 : if size < 2*cap(w.rkBuf) {
256 1 : size = 2 * cap(w.rkBuf)
257 1 : }
258 1 : buf := make([]byte, len(w.rkBuf), size)
259 1 : copy(buf, w.rkBuf)
260 1 : w.rkBuf = buf
261 : }
262 1 : b := w.rkBuf[len(w.rkBuf) : len(w.rkBuf)+n]
263 1 : w.rkBuf = w.rkBuf[:len(w.rkBuf)+n]
264 1 : return b
265 : }
266 :
267 : // tempRangeKeyCopy returns a copy of the provided slice, stored in the Writer's
268 : // range key buffer.
269 1 : func (w *Writer) tempRangeKeyCopy(k []byte) []byte {
270 1 : if len(k) == 0 {
271 1 : return nil
272 1 : }
273 1 : buf := w.tempRangeKeyBuf(len(k))
274 1 : copy(buf, k)
275 1 : return buf
276 : }
277 :
278 : // Metadata returns the metadata for the finished sstable. Only valid to call
279 : // after the sstable has been finished.
280 0 : func (w *Writer) Metadata() (*WriterMetadata, error) {
281 0 : return w.rw.Metadata()
282 0 : }
283 :
284 : // Close finishes writing the table and closes the underlying file that the
285 : // table was written to.
286 2 : func (w *Writer) Close() (err error) {
287 2 : if w.Error() == nil {
288 2 : // Write the range-key block, flushing any remaining spans from the
289 2 : // fragmenter first.
290 2 : w.fragmenter.Finish()
291 2 : }
292 2 : return errors.CombineErrors(w.rw.Close(), w.err)
293 : }
294 :
// RawWriter defines an interface for sstable writers. Implementations may vary
// depending on the TableFormat being written; NewRawWriter selects the
// implementation for a given set of WriterOptions.
type RawWriter interface {
	// Error returns the current accumulated error if any.
	Error() error
	// AddWithForceObsolete adds a point key and its value to the table. It
	// must be used when writing a strict-obsolete sstable.
	//
	// forceObsolete indicates whether the caller has determined that this key is
	// obsolete even though it may be the latest point key for this userkey. This
	// should be set to true for keys obsoleted by RANGEDELs, and is required for
	// strict-obsolete sstables. It's optional for non-strict-obsolete sstables.
	//
	// Note that there are two properties, S1 and S2 (see comment in format.go)
	// that strict-obsolete ssts must satisfy. S2, due to RANGEDELs, is solely the
	// responsibility of the caller. S1 is solely the responsibility of the
	// callee.
	AddWithForceObsolete(
		key InternalKey, value []byte, forceObsolete bool,
	) error
	// EncodeSpan encodes the keys in the given span. The span can contain
	// either only RANGEDEL keys or only range keys.
	//
	// This is a low-level API that bypasses the fragmenter. The spans passed to
	// this function must be fragmented and ordered.
	EncodeSpan(span keyspan.Span) error
	// EstimatedSize returns the estimated size of the sstable being written if
	// a call to Close() was made without adding additional keys.
	EstimatedSize() uint64
	// ComparePrev compares the provided user key to the last point key written
	// to the writer. The returned value is equivalent to Compare(key, prevKey)
	// where prevKey is the last point key written to the writer.
	//
	// If no key has been written yet, ComparePrev returns +1.
	//
	// Must not be called after Writer is closed.
	ComparePrev(k []byte) int
	// SetSnapshotPinnedProperties sets the properties for pinned keys. Should only
	// be used internally by Pebble.
	SetSnapshotPinnedProperties(keyCount, keySize, valueSize uint64)
	// Close finishes writing the table and closes the underlying file that the
	// table was written to.
	Close() error
	// Metadata returns the metadata for the finished sstable. Only valid to
	// call after the sstable has been finished.
	Metadata() (*WriterMetadata, error)

	// rewriteSuffixes rewrites the table's data blocks to all contain the
	// provided suffix. It's specifically used for the implementation of
	// RewriteKeySuffixesAndReturnFormat. See that function's documentation for
	// more details.
	rewriteSuffixes(r *Reader, wo WriterOptions, from, to []byte, concurrency int) error

	// copyDataBlocks copies data blocks to the table from the specified ReadHandle.
	// It's specifically used by the sstable copier that can copy parts of an sstable
	// to a new sstable, using CopySpan().
	copyDataBlocks(ctx context.Context, blocks []indexEntry, rh objstorage.ReadHandle) error

	// addDataBlock adds a raw data block to the table as-is. It's specifically used
	// by the sstable copier that can copy parts of an sstable to a new sstable,
	// using CopySpan().
	addDataBlock(b, sep []byte, bhp block.HandleWithProperties) error

	// copyFilter copies the specified filter to the table. It's specifically used
	// by the sstable copier that can copy parts of an sstable to a new sstable,
	// using CopySpan().
	copyFilter(filter []byte, filterName string) error

	// copyProperties copies properties from the specified props, and resets others
	// to prepare for copying data blocks from another sstable. It's specifically
	// used by the sstable copier that can copy parts of an sstable to a new sstable,
	// using CopySpan().
	copyProperties(props Properties)
}
368 :
// WriterMetadata holds info about a finished sstable: its size, the smallest
// and largest keys of each kind (point, RANGEDEL, range key), flags recording
// which kinds are present, the sequence number bounds and the table's
// Properties.
type WriterMetadata struct {
	Size          uint64
	SmallestPoint InternalKey
	// LargestPoint, LargestRangeKey, LargestRangeDel should not be accessed
	// before Writer.Close is called, because they may only be set on
	// Writer.Close.
	LargestPoint     InternalKey
	SmallestRangeDel InternalKey
	LargestRangeDel  InternalKey
	SmallestRangeKey InternalKey
	LargestRangeKey  InternalKey
	// HasPointKeys, HasRangeDelKeys and HasRangeKeys are set to true by the
	// corresponding SetSmallest*/SetLargest* methods.
	HasPointKeys    bool
	HasRangeDelKeys bool
	HasRangeKeys    bool
	// SmallestSeqNum and LargestSeqNum are widened by updateSeqNum to include
	// each sequence number it is given.
	SmallestSeqNum base.SeqNum
	LargestSeqNum  base.SeqNum
	Properties     Properties
}
388 :
389 : // SetSmallestPointKey sets the smallest point key to the given key.
390 : // NB: this method set the "absolute" smallest point key. Any existing key is
391 : // overridden.
392 2 : func (m *WriterMetadata) SetSmallestPointKey(k InternalKey) {
393 2 : m.SmallestPoint = k
394 2 : m.HasPointKeys = true
395 2 : }
396 :
397 : // SetSmallestRangeDelKey sets the smallest rangedel key to the given key.
398 : // NB: this method set the "absolute" smallest rangedel key. Any existing key is
399 : // overridden.
400 2 : func (m *WriterMetadata) SetSmallestRangeDelKey(k InternalKey) {
401 2 : m.SmallestRangeDel = k
402 2 : m.HasRangeDelKeys = true
403 2 : }
404 :
405 : // SetSmallestRangeKey sets the smallest range key to the given key.
406 : // NB: this method set the "absolute" smallest range key. Any existing key is
407 : // overridden.
408 2 : func (m *WriterMetadata) SetSmallestRangeKey(k InternalKey) {
409 2 : m.SmallestRangeKey = k
410 2 : m.HasRangeKeys = true
411 2 : }
412 :
413 : // SetLargestPointKey sets the largest point key to the given key.
414 : // NB: this method set the "absolute" largest point key. Any existing key is
415 : // overridden.
416 2 : func (m *WriterMetadata) SetLargestPointKey(k InternalKey) {
417 2 : m.LargestPoint = k
418 2 : m.HasPointKeys = true
419 2 : }
420 :
421 : // SetLargestRangeDelKey sets the largest rangedel key to the given key.
422 : // NB: this method set the "absolute" largest rangedel key. Any existing key is
423 : // overridden.
424 2 : func (m *WriterMetadata) SetLargestRangeDelKey(k InternalKey) {
425 2 : m.LargestRangeDel = k
426 2 : m.HasRangeDelKeys = true
427 2 : }
428 :
429 : // SetLargestRangeKey sets the largest range key to the given key.
430 : // NB: this method set the "absolute" largest range key. Any existing key is
431 : // overridden.
432 2 : func (m *WriterMetadata) SetLargestRangeKey(k InternalKey) {
433 2 : m.LargestRangeKey = k
434 2 : m.HasRangeKeys = true
435 2 : }
436 :
437 2 : func (m *WriterMetadata) updateSeqNum(seqNum base.SeqNum) {
438 2 : if m.SmallestSeqNum > seqNum {
439 2 : m.SmallestSeqNum = seqNum
440 2 : }
441 2 : if m.LargestSeqNum < seqNum {
442 2 : m.LargestSeqNum = seqNum
443 2 : }
444 : }
|