Line data Source code
1 : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package sstable
6 :
7 : import (
8 : "context"
9 :
10 : "github.com/cockroachdb/errors"
11 : "github.com/cockroachdb/pebble/internal/base"
12 : "github.com/cockroachdb/pebble/internal/keyspan"
13 : "github.com/cockroachdb/pebble/objstorage"
14 : "github.com/cockroachdb/pebble/sstable/block"
15 : )
16 :
17 : // NewRawWriter returns a new table writer for the file. Closing the writer will
18 : // close the file.
19 1 : func NewRawWriter(writable objstorage.Writable, o WriterOptions) RawWriter {
20 1 : if o.TableFormat <= TableFormatPebblev4 {
21 1 : return newRowWriter(writable, o)
22 1 : }
23 1 : return newColumnarWriter(writable, o)
24 : }
25 :
26 : // Writer is a table writer.
27 : type Writer struct {
28 : rw RawWriter
29 : err error
30 : // To allow potentially overlapping (i.e. un-fragmented) range keys spans to
31 : // be added to the Writer, a keyspan.Fragmenter is used to retain the keys
32 : // and values, emitting fragmented, coalesced spans as appropriate. Range
33 : // keys must be added in order of their start user-key.
34 : fragmenter keyspan.Fragmenter
35 : comparer *base.Comparer
36 : // isStrictObsolete is true if the writer is configured to write and enforce
37 : // a 'strict obsolete' sstable. This includes prohibiting the addition of
38 : // MERGE keys. See the documentation in format.go for more details.
39 : isStrictObsolete bool
40 : rkBuf []byte
41 : keyspanKeys []keyspan.Key
42 : }
43 :
44 : // NewWriter returns a new table writer intended for building external sstables
45 : // (eg, for ingestion or storage outside the LSM) for the file. Closing the
46 : // writer will close the file.
47 : //
48 : // Internal clients should generally prefer NewRawWriter.
49 1 : func NewWriter(writable objstorage.Writable, o WriterOptions) *Writer {
50 1 : o = o.ensureDefaults()
51 1 : rw := NewRawWriter(writable, o)
52 1 : w := &Writer{}
53 1 : *w = Writer{
54 1 : rw: rw,
55 1 : fragmenter: keyspan.Fragmenter{
56 1 : Cmp: o.Comparer.Compare,
57 1 : Format: o.Comparer.FormatKey,
58 1 : Emit: w.encodeFragmentedRangeKeySpan,
59 1 : },
60 1 : comparer: o.Comparer,
61 1 : isStrictObsolete: o.IsStrictObsolete,
62 1 : }
63 1 : return w
64 1 : }
65 :
66 0 : func (w *Writer) encodeFragmentedRangeKeySpan(s keyspan.Span) {
67 0 : // This method is the emit function of the Fragmenter.
68 0 : //
69 0 : // NB: The span should only contain range keys and be internally consistent
70 0 : // (eg, no duplicate suffixes, no additional keys after a RANGEKEYDEL).
71 0 : //
72 0 : // Sort the keys by suffix. Iteration doesn't *currently* depend on it, but
73 0 : // we may want to in the future.
74 0 : keyspan.SortKeysBySuffix(w.comparer.CompareRangeSuffixes, s.Keys)
75 0 :
76 0 : if w.Error() == nil {
77 0 : w.rw.EncodeSpan(s)
78 0 : }
79 : }
80 :
81 : // Error returns the current accumulated error if any.
82 1 : func (w *Writer) Error() error {
83 1 : return errors.CombineErrors(w.rw.Error(), w.err)
84 1 : }
85 :
86 : // Raw returns the underlying RawWriter.
87 1 : func (w *Writer) Raw() RawWriter { return w.rw }
88 :
89 : // Set sets the value for the given key. The sequence number is set to 0.
90 : // Intended for use to externally construct an sstable before ingestion into a
91 : // DB. For a given Writer, the keys passed to Set must be in strictly increasing
92 : // order.
93 : //
94 : // TODO(peter): untested
95 1 : func (w *Writer) Set(key, value []byte) error {
96 1 : if err := w.Error(); err != nil {
97 0 : return err
98 0 : }
99 1 : if w.isStrictObsolete {
100 0 : return errors.Errorf("use AddWithForceObsolete")
101 0 : }
102 : // forceObsolete is false based on the assumption that no RANGEDELs in the
103 : // sstable delete the added points.
104 1 : return w.rw.AddWithForceObsolete(base.MakeInternalKey(key, 0, InternalKeyKindSet), value, false)
105 : }
106 :
107 : // Delete deletes the value for the given key. The sequence number is set to
108 : // 0. Intended for use to externally construct an sstable before ingestion into
109 : // a DB.
110 : //
111 : // TODO(peter): untested
112 0 : func (w *Writer) Delete(key []byte) error {
113 0 : if err := w.Error(); err != nil {
114 0 : return err
115 0 : }
116 0 : if w.isStrictObsolete {
117 0 : return errors.Errorf("use AddWithForceObsolete")
118 0 : }
119 : // forceObsolete is false based on the assumption that no RANGEDELs in the
120 : // sstable delete the added points.
121 0 : return w.rw.AddWithForceObsolete(base.MakeInternalKey(key, 0, InternalKeyKindDelete), nil, false)
122 : }
123 :
124 : // DeleteRange deletes all of the keys (and values) in the range [start,end)
125 : // (inclusive on start, exclusive on end). The sequence number is set to
126 : // 0. Intended for use to externally construct an sstable before ingestion into
127 : // a DB.
128 : //
129 : // Calls to DeleteRange must be made using already-fragmented (non-overlapping)
130 : // spans and in sorted order.
131 : //
132 : // TODO(peter): untested
133 1 : func (w *Writer) DeleteRange(start, end []byte) error {
134 1 : if err := w.Error(); err != nil {
135 0 : return err
136 0 : }
137 1 : return w.rw.EncodeSpan(keyspan.Span{
138 1 : Start: start,
139 1 : End: end,
140 1 : Keys: append(w.keyspanKeys[:0], keyspan.Key{
141 1 : Trailer: base.MakeTrailer(0, base.InternalKeyKindRangeDelete),
142 1 : }),
143 1 : })
144 : }
145 :
146 : // Merge adds an action to the DB that merges the value at key with the new
147 : // value. The details of the merge are dependent upon the configured merge
148 : // operator. The sequence number is set to 0. Intended for use to externally
149 : // construct an sstable before ingestion into a DB.
150 : //
151 : // TODO(peter): untested
152 0 : func (w *Writer) Merge(key, value []byte) error {
153 0 : if err := w.Error(); err != nil {
154 0 : return err
155 0 : }
156 0 : if w.isStrictObsolete {
157 0 : return errors.Errorf("use AddWithForceObsolete")
158 0 : }
159 : // forceObsolete is false based on the assumption that no RANGEDELs in the
160 : // sstable that delete the added points. If the user configured this writer
161 : // to be strict-obsolete, addPoint will reject the addition of this MERGE.
162 0 : return w.rw.AddWithForceObsolete(base.MakeInternalKey(key, 0, InternalKeyKindMerge), value, false)
163 : }
164 :
165 : // RangeKeySet sets a range between start (inclusive) and end (exclusive) with
166 : // the given suffix to the given value. The resulting range key is given the
167 : // sequence number zero, with the expectation that the resulting sstable will be
168 : // ingested.
169 : //
170 : // Keys must be added to the table in increasing order of start key. Spans are
171 : // not required to be fragmented. The same suffix may not be set or unset twice
172 : // over the same keyspan, because it would result in inconsistent state. Both
173 : // the Set and Unset would share the zero sequence number, and a key cannot be
174 : // both simultaneously set and unset.
175 0 : func (w *Writer) RangeKeySet(start, end, suffix, value []byte) error {
176 0 : return w.addRangeKeySpanToFragmenter(keyspan.Span{
177 0 : Start: w.tempRangeKeyCopy(start),
178 0 : End: w.tempRangeKeyCopy(end),
179 0 : Keys: []keyspan.Key{
180 0 : {
181 0 : Trailer: base.MakeTrailer(0, base.InternalKeyKindRangeKeySet),
182 0 : Suffix: w.tempRangeKeyCopy(suffix),
183 0 : Value: w.tempRangeKeyCopy(value),
184 0 : },
185 0 : },
186 0 : })
187 0 : }
188 :
189 : // RangeKeyUnset un-sets a range between start (inclusive) and end (exclusive)
190 : // with the given suffix. The resulting range key is given the
191 : // sequence number zero, with the expectation that the resulting sstable will be
192 : // ingested.
193 : //
194 : // Keys must be added to the table in increasing order of start key. Spans are
195 : // not required to be fragmented. The same suffix may not be set or unset twice
196 : // over the same keyspan, because it would result in inconsistent state. Both
197 : // the Set and Unset would share the zero sequence number, and a key cannot be
198 : // both simultaneously set and unset.
199 0 : func (w *Writer) RangeKeyUnset(start, end, suffix []byte) error {
200 0 : return w.addRangeKeySpanToFragmenter(keyspan.Span{
201 0 : Start: w.tempRangeKeyCopy(start),
202 0 : End: w.tempRangeKeyCopy(end),
203 0 : Keys: []keyspan.Key{
204 0 : {
205 0 : Trailer: base.MakeTrailer(0, base.InternalKeyKindRangeKeyUnset),
206 0 : Suffix: w.tempRangeKeyCopy(suffix),
207 0 : },
208 0 : },
209 0 : })
210 0 : }
211 :
212 : // RangeKeyDelete deletes a range between start (inclusive) and end (exclusive).
213 : //
214 : // Keys must be added to the table in increasing order of start key. Spans are
215 : // not required to be fragmented.
216 0 : func (w *Writer) RangeKeyDelete(start, end []byte) error {
217 0 : return w.addRangeKeySpanToFragmenter(keyspan.Span{
218 0 : Start: w.tempRangeKeyCopy(start),
219 0 : End: w.tempRangeKeyCopy(end),
220 0 : Keys: []keyspan.Key{
221 0 : {Trailer: base.MakeTrailer(0, base.InternalKeyKindRangeKeyDelete)},
222 0 : },
223 0 : })
224 0 : }
225 :
226 0 : func (w *Writer) addRangeKeySpanToFragmenter(span keyspan.Span) error {
227 0 : if w.comparer.Compare(span.Start, span.End) >= 0 {
228 0 : return errors.Errorf(
229 0 : "pebble: start key must be strictly less than end key",
230 0 : )
231 0 : }
232 0 : if w.fragmenter.Start() != nil && w.comparer.Compare(w.fragmenter.Start(), span.Start) > 0 {
233 0 : return errors.Errorf("pebble: spans must be added in order: %s > %s",
234 0 : w.comparer.FormatKey(w.fragmenter.Start()), w.comparer.FormatKey(span.Start))
235 0 : }
236 : // Add this span to the fragmenter.
237 0 : w.fragmenter.Add(span)
238 0 : return w.Error()
239 : }
240 :
241 : // tempRangeKeyBuf returns a slice of length n from the Writer's rkBuf byte
242 : // slice. Any byte written to the returned slice is retained for the lifetime of
243 : // the Writer.
244 0 : func (w *Writer) tempRangeKeyBuf(n int) []byte {
245 0 : if cap(w.rkBuf)-len(w.rkBuf) < n {
246 0 : size := len(w.rkBuf) + 2*n
247 0 : if size < 2*cap(w.rkBuf) {
248 0 : size = 2 * cap(w.rkBuf)
249 0 : }
250 0 : buf := make([]byte, len(w.rkBuf), size)
251 0 : copy(buf, w.rkBuf)
252 0 : w.rkBuf = buf
253 : }
254 0 : b := w.rkBuf[len(w.rkBuf) : len(w.rkBuf)+n]
255 0 : w.rkBuf = w.rkBuf[:len(w.rkBuf)+n]
256 0 : return b
257 : }
258 :
259 : // tempRangeKeyCopy returns a copy of the provided slice, stored in the Writer's
260 : // range key buffer.
261 0 : func (w *Writer) tempRangeKeyCopy(k []byte) []byte {
262 0 : if len(k) == 0 {
263 0 : return nil
264 0 : }
265 0 : buf := w.tempRangeKeyBuf(len(k))
266 0 : copy(buf, k)
267 0 : return buf
268 : }
269 :
270 : // Metadata returns the metadata for the finished sstable. Only valid to call
271 : // after the sstable has been finished.
272 0 : func (w *Writer) Metadata() (*WriterMetadata, error) {
273 0 : return w.rw.Metadata()
274 0 : }
275 :
276 : // Close finishes writing the table and closes the underlying file that the
277 : // table was written to.
278 1 : func (w *Writer) Close() (err error) {
279 1 : if w.Error() == nil {
280 1 : // Write the range-key block, flushing any remaining spans from the
281 1 : // fragmenter first.
282 1 : w.fragmenter.Finish()
283 1 : }
284 1 : return errors.CombineErrors(w.rw.Close(), w.err)
285 : }
286 :
287 : // RawWriter defines an interface for sstable writers. Implementations may vary
288 : // depending on the TableFormat being written.
289 : type RawWriter interface {
290 : // Error returns the current accumulated error if any.
291 : Error() error
292 : // AddWithForceObsolete must be used when writing a strict-obsolete sstable.
293 : //
294 : // forceObsolete indicates whether the caller has determined that this key is
295 : // obsolete even though it may be the latest point key for this userkey. This
296 : // should be set to true for keys obsoleted by RANGEDELs, and is required for
297 : // strict-obsolete sstables. It's optional for non-strict-obsolete sstables.
298 : //
299 : // Note that there are two properties, S1 and S2 (see comment in format.go)
300 : // that strict-obsolete ssts must satisfy. S2, due to RANGEDELs, is solely the
301 : // responsibility of the caller. S1 is solely the responsibility of the
302 : // callee.
303 : AddWithForceObsolete(
304 : key InternalKey, value []byte, forceObsolete bool,
305 : ) error
306 : // EncodeSpan encodes the keys in the given span. The span can contain
307 : // either only RANGEDEL keys or only range keys.
308 : //
309 : // This is a low-level API that bypasses the fragmenter. The spans passed to
310 : // this function must be fragmented and ordered.
311 : EncodeSpan(span keyspan.Span) error
312 : // EstimatedSize returns the estimated size of the sstable being written if
313 : // a call to Close() was made without adding additional keys.
314 : EstimatedSize() uint64
315 : // ComparePrev compares the provided user to the last point key written to the
316 : // writer. The returned value is equivalent to Compare(key, prevKey) where
317 : // prevKey is the last point key written to the writer.
318 : //
319 : // If no key has been written yet, ComparePrev returns +1.
320 : //
321 : // Must not be called after Writer is closed.
322 : ComparePrev(k []byte) int
323 : // SetSnapshotPinnedProperties sets the properties for pinned keys. Should only
324 : // be used internally by Pebble.
325 : SetSnapshotPinnedProperties(keyCount, keySize, valueSize uint64)
326 : // Close finishes writing the table and closes the underlying file that the
327 : // table was written to.
328 : Close() error
329 : // Metadata returns the metadata for the finished sstable. Only valid to
330 : // call after the sstable has been finished.
331 : Metadata() (*WriterMetadata, error)
332 :
333 : // rewriteSuffixes rewrites the table's data blocks to all contain the
334 : // provided suffix. It's specifically used for the implementation of
335 : // RewriteKeySuffixesAndReturnFormat. See that function's documentation for
336 : // more details.
337 : rewriteSuffixes(r *Reader, wo WriterOptions, from, to []byte, concurrency int) error
338 :
339 : // copyDataBlocks copies data blocks to the table from the specified ReadHandle.
340 : // It's specifically used by the sstable copier that can copy parts of an sstable
341 : // to a new sstable, using CopySpan().
342 : copyDataBlocks(ctx context.Context, blocks []indexEntry, rh objstorage.ReadHandle) error
343 :
344 : // addDataBlock adds a raw data block to the table as-is. It's specifically used
345 : // by the sstable copier that can copy parts of an sstable to a new sstable,
346 : // using CopySpan().
347 : addDataBlock(b, sep []byte, bhp block.HandleWithProperties) error
348 :
349 : // copyFilter copies the specified filter to the table. It's specifically used
350 : // by the sstable copier that can copy parts of an sstable to a new sstable,
351 : // using CopySpan().
352 : copyFilter(filter []byte, filterName string) error
353 :
354 : // copyProperties copies properties from the specified props, and resets others
355 : // to prepare for copying data blocks from another sstable. It's specifically
356 : // used by the sstable copier that can copy parts of an sstable to a new sstable,
357 : // using CopySpan().
358 : copyProperties(props Properties)
359 : }
360 :
361 : // WriterMetadata holds info about a finished sstable.
362 : type WriterMetadata struct {
363 : Size uint64
364 : SmallestPoint InternalKey
365 : // LargestPoint, LargestRangeKey, LargestRangeDel should not be accessed
366 : // before Writer.Close is called, because they may only be set on
367 : // Writer.Close.
368 : LargestPoint InternalKey
369 : SmallestRangeDel InternalKey
370 : LargestRangeDel InternalKey
371 : SmallestRangeKey InternalKey
372 : LargestRangeKey InternalKey
373 : HasPointKeys bool
374 : HasRangeDelKeys bool
375 : HasRangeKeys bool
376 : SmallestSeqNum base.SeqNum
377 : LargestSeqNum base.SeqNum
378 : Properties Properties
379 : }
380 :
381 : // SetSmallestPointKey sets the smallest point key to the given key.
382 : // NB: this method set the "absolute" smallest point key. Any existing key is
383 : // overridden.
384 1 : func (m *WriterMetadata) SetSmallestPointKey(k InternalKey) {
385 1 : m.SmallestPoint = k
386 1 : m.HasPointKeys = true
387 1 : }
388 :
389 : // SetSmallestRangeDelKey sets the smallest rangedel key to the given key.
390 : // NB: this method set the "absolute" smallest rangedel key. Any existing key is
391 : // overridden.
392 1 : func (m *WriterMetadata) SetSmallestRangeDelKey(k InternalKey) {
393 1 : m.SmallestRangeDel = k
394 1 : m.HasRangeDelKeys = true
395 1 : }
396 :
397 : // SetSmallestRangeKey sets the smallest range key to the given key.
398 : // NB: this method set the "absolute" smallest range key. Any existing key is
399 : // overridden.
400 1 : func (m *WriterMetadata) SetSmallestRangeKey(k InternalKey) {
401 1 : m.SmallestRangeKey = k
402 1 : m.HasRangeKeys = true
403 1 : }
404 :
405 : // SetLargestPointKey sets the largest point key to the given key.
406 : // NB: this method set the "absolute" largest point key. Any existing key is
407 : // overridden.
408 1 : func (m *WriterMetadata) SetLargestPointKey(k InternalKey) {
409 1 : m.LargestPoint = k
410 1 : m.HasPointKeys = true
411 1 : }
412 :
413 : // SetLargestRangeDelKey sets the largest rangedel key to the given key.
414 : // NB: this method set the "absolute" largest rangedel key. Any existing key is
415 : // overridden.
416 1 : func (m *WriterMetadata) SetLargestRangeDelKey(k InternalKey) {
417 1 : m.LargestRangeDel = k
418 1 : m.HasRangeDelKeys = true
419 1 : }
420 :
421 : // SetLargestRangeKey sets the largest range key to the given key.
422 : // NB: this method set the "absolute" largest range key. Any existing key is
423 : // overridden.
424 1 : func (m *WriterMetadata) SetLargestRangeKey(k InternalKey) {
425 1 : m.LargestRangeKey = k
426 1 : m.HasRangeKeys = true
427 1 : }
428 :
429 1 : func (m *WriterMetadata) updateSeqNum(seqNum base.SeqNum) {
430 1 : if m.SmallestSeqNum > seqNum {
431 1 : m.SmallestSeqNum = seqNum
432 1 : }
433 1 : if m.LargestSeqNum < seqNum {
434 1 : m.LargestSeqNum = seqNum
435 1 : }
436 : }
|