Line data Source code
1 : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package sstable
6 :
7 : import (
8 : "bytes"
9 : "context"
10 : "encoding/binary"
11 : "fmt"
12 : "math"
13 : "slices"
14 : "sync"
15 :
16 : "github.com/cockroachdb/errors"
17 : "github.com/cockroachdb/pebble/internal/base"
18 : "github.com/cockroachdb/pebble/internal/bytealloc"
19 : "github.com/cockroachdb/pebble/internal/invariants"
20 : "github.com/cockroachdb/pebble/internal/keyspan"
21 : "github.com/cockroachdb/pebble/objstorage"
22 : "github.com/cockroachdb/pebble/sstable/block"
23 : "github.com/cockroachdb/pebble/sstable/colblk"
24 : "github.com/cockroachdb/pebble/sstable/rowblk"
25 : "github.com/cockroachdb/pebble/sstable/valblk"
26 : )
27 :
28 : // RawColumnWriter is an sstable RawWriter that writes sstables with
29 : // column-oriented blocks. All table formats TableFormatPebblev5 and later write
30 : // column-oriented blocks and use RawColumnWriter.
31 : type RawColumnWriter struct {
32 : comparer *base.Comparer
33 : meta WriterMetadata
34 : opts WriterOptions
35 : err error
36 :
37 : dataFlush block.FlushGovernor
38 : indexFlush block.FlushGovernor
39 : blockPropCollectors []BlockPropertyCollector
40 : blockPropsEncoder blockPropertiesEncoder
41 : obsoleteCollector obsoleteKeyBlockPropertyCollector
42 : props Properties
43 : // block writers buffering unflushed data.
44 : dataBlock struct {
45 : colblk.DataBlockEncoder
46 : // numDeletions stores the count of point tombstones in this data block.
47 : // It's used to determine if this data block is considered
48 : // tombstone-dense for the purposes of compaction.
49 : numDeletions int
50 : // deletionSize stores the raw size of point tombstones in this data
51 : // block. It's used to determine if this data block is considered
52 : // tombstone-dense for the purposes of compaction.
53 : deletionSize int
54 : }
55 : indexBlock colblk.IndexBlockWriter
56 : topLevelIndexBlock colblk.IndexBlockWriter
57 : rangeDelBlock colblk.KeyspanBlockWriter
58 : rangeKeyBlock colblk.KeyspanBlockWriter
59 : valueBlock *valblk.Writer // nil iff WriterOptions.DisableValueBlocks=true
60 : // filter accumulates the filter block. If populated, the filter ingests
61 : // either the output of the comparer's Split function (i.e. a prefix
62 : // extractor) if it is non-nil, or the full keys otherwise.
63 : filterBlock filterWriter
64 : prevPointKey struct {
65 : trailer base.InternalKeyTrailer
66 : isObsolete bool
67 : }
68 : pendingDataBlockSize int
69 : indexBlockSize int
70 : queuedDataSize uint64
71 :
72 : // indexBuffering holds finished index blocks as they're completed while
73 : // building the sstable. If an index block grows sufficiently large
74 : // (IndexBlockSize) while an sstable is still being constructed, the sstable
75 : // writer will create a two-level index structure. As index blocks are
76 : // completed, they're finished and buffered in-memory until the table is
77 : // finished. When the table is finished, the buffered index blocks are
78 : // flushed in order after all the data blocks, and the top-level index block
79 : // is constructed to point to all the individual index blocks.
80 : indexBuffering struct {
81 : // partitions holds all the completed index blocks.
82 : partitions []bufferedIndexBlock
83 : // blockAlloc is used to bulk-allocate byte slices used to store index
84 : // blocks in partitions. These live until the sstable is finished.
85 : blockAlloc []byte
86 : // sepAlloc is used to bulk-allocate index block separator slices stored
87 : // in partitions. These live until the sstable is finished.
88 : sepAlloc bytealloc.A
89 : }
90 :
91 : writeQueue struct {
92 : wg sync.WaitGroup
93 : ch chan *compressedBlock
94 : err error
95 : }
96 : layout layoutWriter
97 :
98 : lastKeyBuf []byte
99 : separatorBuf []byte
100 : tmp [blockHandleLikelyMaxLen]byte
101 : previousUserKey invariants.Value[[]byte]
102 : disableKeyOrderChecks bool
103 : }
104 :
105 : // Assert that *RawColumnWriter implements RawWriter.
106 : var _ RawWriter = (*RawColumnWriter)(nil)
107 :
108 2 : func newColumnarWriter(writable objstorage.Writable, o WriterOptions) *RawColumnWriter {
109 2 : if writable == nil {
110 0 : panic("pebble: nil writable")
111 : }
112 2 : if !o.TableFormat.BlockColumnar() {
113 0 : panic(errors.AssertionFailedf("newColumnarWriter cannot create sstables with %s format", o.TableFormat))
114 : }
115 2 : o = o.ensureDefaults()
116 2 : w := &RawColumnWriter{
117 2 : comparer: o.Comparer,
118 2 : meta: WriterMetadata{
119 2 : SmallestSeqNum: math.MaxUint64,
120 2 : },
121 2 : opts: o,
122 2 : layout: makeLayoutWriter(writable, o),
123 2 : disableKeyOrderChecks: o.internal.DisableKeyOrderChecks,
124 2 : }
125 2 : w.dataFlush = block.MakeFlushGovernor(o.BlockSize, o.BlockSizeThreshold, o.SizeClassAwareThreshold, o.AllocatorSizeClasses)
126 2 : w.indexFlush = block.MakeFlushGovernor(o.IndexBlockSize, o.BlockSizeThreshold, o.SizeClassAwareThreshold, o.AllocatorSizeClasses)
127 2 : w.dataBlock.Init(o.KeySchema)
128 2 : w.indexBlock.Init()
129 2 : w.topLevelIndexBlock.Init()
130 2 : w.rangeDelBlock.Init(w.comparer.Equal)
131 2 : w.rangeKeyBlock.Init(w.comparer.Equal)
132 2 : if !o.DisableValueBlocks {
133 2 : w.valueBlock = valblk.NewWriter(
134 2 : block.MakeFlushGovernor(o.BlockSize, o.BlockSizeThreshold, o.SizeClassAwareThreshold, o.AllocatorSizeClasses),
135 2 : w.opts.Compression, w.opts.Checksum, func(compressedSize int) {})
136 : }
137 2 : if o.FilterPolicy != nil {
138 2 : switch o.FilterType {
139 2 : case TableFilter:
140 2 : w.filterBlock = newTableFilterWriter(o.FilterPolicy)
141 0 : default:
142 0 : panic(fmt.Sprintf("unknown filter type: %v", o.FilterType))
143 : }
144 : }
145 :
146 2 : numBlockPropertyCollectors := len(o.BlockPropertyCollectors)
147 2 : if !o.disableObsoleteCollector {
148 2 : numBlockPropertyCollectors++
149 2 : }
150 2 : if numBlockPropertyCollectors > maxPropertyCollectors {
151 0 : panic(errors.New("pebble: too many block property collectors"))
152 : }
153 2 : w.blockPropCollectors = make([]BlockPropertyCollector, 0, numBlockPropertyCollectors)
154 2 : for _, constructFn := range o.BlockPropertyCollectors {
155 2 : w.blockPropCollectors = append(w.blockPropCollectors, constructFn())
156 2 : }
157 2 : if !o.disableObsoleteCollector {
158 2 : w.blockPropCollectors = append(w.blockPropCollectors, &w.obsoleteCollector)
159 2 : }
160 2 : var buf bytes.Buffer
161 2 : buf.WriteString("[")
162 2 : for i := range w.blockPropCollectors {
163 2 : if i > 0 {
164 2 : buf.WriteString(",")
165 2 : }
166 2 : buf.WriteString(w.blockPropCollectors[i].Name())
167 : }
168 2 : buf.WriteString("]")
169 2 : w.props.PropertyCollectorNames = buf.String()
170 2 :
171 2 : w.props.ComparerName = o.Comparer.Name
172 2 : w.props.CompressionName = o.Compression.String()
173 2 : w.props.KeySchemaName = o.KeySchema.Name
174 2 : w.props.MergerName = o.MergerName
175 2 :
176 2 : w.writeQueue.ch = make(chan *compressedBlock)
177 2 : w.writeQueue.wg.Add(1)
178 2 : go w.drainWriteQueue()
179 2 : return w
180 : }
181 :
182 : // Error returns the current accumulated error if any.
183 2 : func (w *RawColumnWriter) Error() error {
184 2 : return w.err
185 2 : }
186 :
187 : // EstimatedSize returns the estimated size of the sstable being written if
188 : // a call to Close() was made without adding additional keys.
189 2 : func (w *RawColumnWriter) EstimatedSize() uint64 {
190 2 : sz := rocksDBFooterLen + w.queuedDataSize
191 2 : // TODO(jackson): Avoid iterating over partitions by incrementally
192 2 : // maintaining the size contribution of all buffered partitions.
193 2 : for _, bib := range w.indexBuffering.partitions {
194 2 : // We include the separator user key to account for its bytes in the
195 2 : // top-level index block.
196 2 : //
197 2 : // TODO(jackson): We could incrementally build the top-level index block
198 2 : // and produce an exact calculation of the current top-level index
199 2 : // block's size.
200 2 : sz += uint64(len(bib.block) + block.TrailerLen + len(bib.sep.UserKey))
201 2 : }
202 2 : if w.rangeDelBlock.KeyCount() > 0 {
203 2 : sz += uint64(w.rangeDelBlock.Size())
204 2 : }
205 2 : if w.rangeKeyBlock.KeyCount() > 0 {
206 2 : sz += uint64(w.rangeKeyBlock.Size())
207 2 : }
208 2 : if w.valueBlock != nil {
209 2 : sz += w.valueBlock.Size()
210 2 : }
211 : // TODO(jackson): Include an estimate of the properties, filter and meta
212 : // index blocks sizes.
213 2 : return sz
214 : }
215 :
216 : // ComparePrev compares the provided user key to the last point key written to the
217 : // writer. The returned value is equivalent to Compare(key, prevKey) where
218 : // prevKey is the last point key written to the writer.
219 : //
220 : // If no key has been written yet, ComparePrev returns +1.
221 : //
222 : // Must not be called after Writer is closed.
223 2 : func (w *RawColumnWriter) ComparePrev(k []byte) int {
224 2 : if w == nil || w.dataBlock.Rows() == 0 {
225 2 : return +1
226 2 : }
227 2 : return int(w.dataBlock.KeyWriter.ComparePrev(k).UserKeyComparison)
228 : }
229 :
230 : // SetSnapshotPinnedProperties sets the properties for pinned keys. Should only
231 : // be used internally by Pebble.
232 : func (w *RawColumnWriter) SetSnapshotPinnedProperties(
233 : pinnedKeyCount, pinnedKeySize, pinnedValueSize uint64,
234 2 : ) {
235 2 : w.props.SnapshotPinnedKeys = pinnedKeyCount
236 2 : w.props.SnapshotPinnedKeySize = pinnedKeySize
237 2 : w.props.SnapshotPinnedValueSize = pinnedValueSize
238 2 : }
239 :
240 : // Metadata returns the metadata for the finished sstable. Only valid to call
241 : // after the sstable has been finished.
242 2 : func (w *RawColumnWriter) Metadata() (*WriterMetadata, error) {
243 2 : if !w.layout.IsFinished() {
244 0 : return nil, errors.New("pebble: writer is not closed")
245 0 : }
246 2 : return &w.meta, nil
247 : }
248 :
249 : // EncodeSpan encodes the keys in the given span. The span can contain either
250 : // only RANGEDEL keys or only range keys.
251 2 : func (w *RawColumnWriter) EncodeSpan(span keyspan.Span) error {
252 2 : if span.Empty() {
253 2 : return nil
254 2 : }
255 2 : for _, k := range span.Keys {
256 2 : w.meta.updateSeqNum(k.SeqNum())
257 2 : }
258 :
259 2 : blockWriter := &w.rangeKeyBlock
260 2 : if span.Keys[0].Kind() == base.InternalKeyKindRangeDelete {
261 2 : blockWriter = &w.rangeDelBlock
262 2 : // Update range delete properties.
263 2 : // NB: These properties are computed differently than the rowblk sstable
264 2 : // writer because this writer does not flatten them into row key-value
265 2 : // pairs.
266 2 : w.props.RawKeySize += uint64(len(span.Start) + len(span.End))
267 2 : count := uint64(len(span.Keys))
268 2 : w.props.NumEntries += count
269 2 : w.props.NumDeletions += count
270 2 : w.props.NumRangeDeletions += count
271 2 : } else {
272 2 : // Update range key properties.
273 2 : // NB: These properties are computed differently than the rowblk sstable
274 2 : // writer because this writer does not flatten them into row key-value
275 2 : // pairs.
276 2 : w.props.RawRangeKeyKeySize += uint64(len(span.Start) + len(span.End))
277 2 : for _, k := range span.Keys {
278 2 : w.props.RawRangeKeyValueSize += uint64(len(k.Value))
279 2 : switch k.Kind() {
280 2 : case base.InternalKeyKindRangeKeyDelete:
281 2 : w.props.NumRangeKeyDels++
282 2 : case base.InternalKeyKindRangeKeySet:
283 2 : w.props.NumRangeKeySets++
284 2 : case base.InternalKeyKindRangeKeyUnset:
285 2 : w.props.NumRangeKeyUnsets++
286 0 : default:
287 0 : panic(errors.Errorf("pebble: invalid range key type: %s", k.Kind()))
288 : }
289 : }
290 2 : for i := range w.blockPropCollectors {
291 2 : if err := w.blockPropCollectors[i].AddRangeKeys(span); err != nil {
292 0 : return err
293 0 : }
294 : }
295 : }
296 2 : if !w.disableKeyOrderChecks && blockWriter.KeyCount() > 0 {
297 2 : // Check that spans are being added in fragmented order. If the two
298 2 : // tombstones overlap, their start and end keys must be identical.
299 2 : prevStart, prevEnd, prevTrailer := blockWriter.UnsafeLastSpan()
300 2 : if w.opts.Comparer.Equal(prevStart, span.Start) && w.opts.Comparer.Equal(prevEnd, span.End) {
301 1 : if prevTrailer < span.Keys[0].Trailer {
302 1 : w.err = errors.Errorf("pebble: keys must be added in order: %s-%s:{(#%s)}, %s",
303 1 : w.opts.Comparer.FormatKey(prevStart),
304 1 : w.opts.Comparer.FormatKey(prevEnd),
305 1 : prevTrailer, span.Pretty(w.opts.Comparer.FormatKey))
306 1 : }
307 2 : } else if c := w.opts.Comparer.Compare(prevEnd, span.Start); c > 0 {
308 1 : w.err = errors.Errorf("pebble: keys must be added in order: %s-%s:{(#%s)}, %s",
309 1 : w.opts.Comparer.FormatKey(prevStart),
310 1 : w.opts.Comparer.FormatKey(prevEnd),
311 1 : prevTrailer, span.Pretty(w.opts.Comparer.FormatKey))
312 1 : return w.err
313 1 : }
314 : }
315 2 : blockWriter.AddSpan(span)
316 2 : return nil
317 : }
318 :
319 : // AddWithForceObsolete adds a point key/value pair when writing a
320 : // strict-obsolete sstable. For a given Writer, the keys passed to Add must be
321 : // in increasing order. Span keys (range deletions, range keys) must be added
322 : // through EncodeSpan.
323 : //
324 : // forceObsolete indicates whether the caller has determined that this key is
325 : // obsolete even though it may be the latest point key for this userkey. This
326 : // should be set to true for keys obsoleted by RANGEDELs, and is required for
327 : // strict-obsolete sstables.
328 : //
329 : // Note that there are two properties, S1 and S2 (see comment in format.go)
330 : // that strict-obsolete ssts must satisfy. S2, due to RANGEDELs, is solely the
331 : // responsibility of the caller. S1 is solely the responsibility of the
332 : // callee.
333 : func (w *RawColumnWriter) AddWithForceObsolete(
334 : key InternalKey, value []byte, forceObsolete bool,
335 2 : ) error {
336 2 : switch key.Kind() {
337 : case base.InternalKeyKindRangeDelete, base.InternalKeyKindRangeKeySet,
338 1 : base.InternalKeyKindRangeKeyUnset, base.InternalKeyKindRangeKeyDelete:
339 1 : return errors.Newf("%s must be added through EncodeSpan", key.Kind())
340 2 : case base.InternalKeyKindMerge:
341 2 : if w.opts.IsStrictObsolete {
342 0 : return errors.Errorf("MERGE not supported in a strict-obsolete sstable")
343 0 : }
344 : }
345 :
346 2 : eval, err := w.evaluatePoint(key, len(value))
347 2 : if err != nil {
348 1 : return err
349 1 : }
350 2 : eval.isObsolete = eval.isObsolete || forceObsolete
351 2 : w.prevPointKey.trailer = key.Trailer
352 2 : w.prevPointKey.isObsolete = eval.isObsolete
353 2 :
354 2 : var valuePrefix block.ValuePrefix
355 2 : var valueStoredWithKey []byte
356 2 : if eval.writeToValueBlock {
357 2 : vh, err := w.valueBlock.AddValue(value)
358 2 : if err != nil {
359 0 : return err
360 0 : }
361 2 : n := valblk.EncodeHandle(w.tmp[:], vh)
362 2 : valueStoredWithKey = w.tmp[:n]
363 2 : var attribute base.ShortAttribute
364 2 : if w.opts.ShortAttributeExtractor != nil {
365 1 : // TODO(sumeer): for compactions, it is possible that the input sstable
366 1 : // already has this value in the value section and so we have already
367 1 : // extracted the ShortAttribute. Avoid extracting it again. This will
368 1 : // require changing the RawWriter.Add interface.
369 1 : if attribute, err = w.opts.ShortAttributeExtractor(
370 1 : key.UserKey, int(eval.kcmp.PrefixLen), value); err != nil {
371 0 : return err
372 0 : }
373 : }
374 2 : valuePrefix = block.ValueHandlePrefix(eval.kcmp.PrefixEqual(), attribute)
375 2 : } else {
376 2 : valueStoredWithKey = value
377 2 : if len(value) > 0 {
378 2 : valuePrefix = block.InPlaceValuePrefix(eval.kcmp.PrefixEqual())
379 2 : }
380 : }
381 :
382 : // Append the key to the data block. We have NOT yet committed to
383 : // including the key in the block. The data block writer permits us to
384 : // finish the block excluding the last-appended KV.
385 2 : entriesWithoutKV := w.dataBlock.Rows()
386 2 : w.dataBlock.Add(key, valueStoredWithKey, valuePrefix, eval.kcmp, eval.isObsolete)
387 2 :
388 2 : // Now that we've appended the KV pair, we can compute the exact size of the
389 2 : // block with this key-value pair included. Check to see if we should flush
390 2 : // the current block, either with or without the added key-value pair.
391 2 : size := w.dataBlock.Size()
392 2 : if shouldFlushWithoutLatestKV(size, w.pendingDataBlockSize, entriesWithoutKV, &w.dataFlush) {
393 2 : // Flush the data block excluding the key we just added.
394 2 : if err := w.flushDataBlockWithoutNextKey(key.UserKey); err != nil {
395 0 : w.err = err
396 0 : return err
397 0 : }
398 : // flushDataBlockWithoutNextKey reset the data block builder, and we can
399 : // add the key to this next block now.
400 2 : w.dataBlock.Add(key, valueStoredWithKey, valuePrefix, eval.kcmp, eval.isObsolete)
401 2 : w.pendingDataBlockSize = w.dataBlock.Size()
402 2 : } else {
403 2 : // We're not flushing the data block, and we're committing to including
404 2 : // the current KV in the block. Remember the new size of the data block
405 2 : // with the current KV.
406 2 : w.pendingDataBlockSize = size
407 2 : }
408 :
409 2 : for i := range w.blockPropCollectors {
410 2 : v := value
411 2 : if key.Kind() == base.InternalKeyKindSet {
412 2 : // Values for SET are not required to be in-place, and in the future
413 2 : // may not even be read by the compaction, so pass nil values. Block
414 2 : // property collectors in such Pebble DBs must not look at the
415 2 : // value.
416 2 : v = nil
417 2 : }
418 2 : if err := w.blockPropCollectors[i].AddPointKey(key, v); err != nil {
419 0 : w.err = err
420 0 : return err
421 0 : }
422 : }
423 2 : w.obsoleteCollector.AddPoint(eval.isObsolete)
424 2 : if w.filterBlock != nil {
425 2 : w.filterBlock.addKey(key.UserKey[:eval.kcmp.PrefixLen])
426 2 : }
427 2 : w.meta.updateSeqNum(key.SeqNum())
428 2 : if !w.meta.HasPointKeys {
429 2 : w.meta.SetSmallestPointKey(key.Clone())
430 2 : }
431 :
432 2 : w.props.NumEntries++
433 2 : switch key.Kind() {
434 2 : case InternalKeyKindDelete, InternalKeyKindSingleDelete:
435 2 : w.props.NumDeletions++
436 2 : w.props.RawPointTombstoneKeySize += uint64(len(key.UserKey))
437 2 : w.dataBlock.numDeletions++
438 2 : w.dataBlock.deletionSize += len(key.UserKey)
439 2 : case InternalKeyKindDeleteSized:
440 2 : var size uint64
441 2 : if len(value) > 0 {
442 2 : var n int
443 2 : size, n = binary.Uvarint(value)
444 2 : if n <= 0 {
445 0 : return errors.Newf("%s key's value (%x) does not parse as uvarint",
446 0 : errors.Safe(key.Kind().String()), value)
447 0 : }
448 : }
449 2 : w.props.NumDeletions++
450 2 : w.props.NumSizedDeletions++
451 2 : w.props.RawPointTombstoneKeySize += uint64(len(key.UserKey))
452 2 : w.props.RawPointTombstoneValueSize += size
453 2 : w.dataBlock.numDeletions++
454 2 : w.dataBlock.deletionSize += len(key.UserKey)
455 2 : case InternalKeyKindMerge:
456 2 : w.props.NumMergeOperands++
457 : }
458 2 : w.props.RawKeySize += uint64(key.Size())
459 2 : w.props.RawValueSize += uint64(len(value))
460 2 : return nil
461 : }
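
// Editor's illustrative sketch (not part of the original file): the
// InternalKeyKindDeleteSized case above decodes the tombstone's value as a
// uvarint holding the size of the value it is expected to delete. The helper
// names below are hypothetical; only the encoding matches the code above.
func encodeDeleteSizedValueSketch(deletedValueLen uint64) []byte {
	buf := make([]byte, binary.MaxVarintLen64)
	n := binary.PutUvarint(buf, deletedValueLen)
	return buf[:n]
}

func decodeDeleteSizedValueSketch(v []byte) (uint64, error) {
	if len(v) == 0 {
		// An empty value is legal; it simply carries no size estimate.
		return 0, nil
	}
	size, n := binary.Uvarint(v)
	if n <= 0 {
		return 0, errors.Newf("value (%x) does not parse as uvarint", v)
	}
	return size, nil
}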
462 :
463 : type pointKeyEvaluation struct {
464 : kcmp colblk.KeyComparison
465 : isObsolete bool
466 : writeToValueBlock bool
467 : }
468 :
469 : // evaluatePoint takes information about a point key being written to the
470 : // sstable and decides how the point should be represented, where its value
471 : // should be stored, etc.
472 : func (w *RawColumnWriter) evaluatePoint(
473 : key base.InternalKey, valueLen int,
474 2 : ) (eval pointKeyEvaluation, err error) {
475 2 : eval.kcmp = w.dataBlock.KeyWriter.ComparePrev(key.UserKey)
476 2 :
477 2 : // When invariants are enabled, validate kcmp.
478 2 : if invariants.Enabled {
479 2 : colblk.AssertKeyCompare(w.comparer, key.UserKey, w.previousUserKey.Get(), eval.kcmp)
480 2 : w.previousUserKey.Set(append(w.previousUserKey.Get()[:0], key.UserKey...))
481 2 : }
482 :
483 2 : if !w.meta.HasPointKeys {
484 2 : return eval, nil
485 2 : }
486 2 : keyKind := key.Kind()
487 2 : // Ensure that no one adds a point key kind without considering the obsolete
488 2 : // handling for that kind.
489 2 : switch keyKind {
490 : case InternalKeyKindSet, InternalKeyKindSetWithDelete, InternalKeyKindMerge,
491 2 : InternalKeyKindDelete, InternalKeyKindSingleDelete, InternalKeyKindDeleteSized:
492 0 : default:
493 0 : panic(errors.AssertionFailedf("unexpected key kind %s", keyKind.String()))
494 : }
495 2 : prevKeyKind := w.prevPointKey.trailer.Kind()
496 2 : // If same user key, then the current key is obsolete if any of the
497 2 : // following is true:
498 2 : // C1 The prev key was obsolete.
499 2 : // C2 The prev key was not a MERGE. When the previous key is a MERGE we must
500 2 : // preserve SET* and MERGE since their values will be merged into the
501 2 : // previous key. We also must preserve DEL* since there may be an older
502 2 : // SET*/MERGE in a lower level that must not be merged with the MERGE --
503 2 : // if we omit the DEL* that lower SET*/MERGE will become visible.
504 2 : //
505 2 : // Regardless of whether it is the same user key or not
506 2 : // C3 If the current key is some kind of point delete and we are writing to
507 2 : // the lowest level, then it is also obsolete. The correctness of this
508 2 : // relies on the same user key not spanning multiple sstables in a level.
509 2 : //
510 2 : // C1 ensures that for a user key there is at most one transition from
511 2 : // !obsolete to obsolete. Consider a user key k, for which the first n keys
512 2 : // are not obsolete. We consider the various values of n:
513 2 : //
514 2 : // n = 0: This happens due to forceObsolete being set by the caller, or due
515 2 : // to C3. forceObsolete must only be set due to a RANGEDEL, and that RANGEDEL
516 2 : // must also delete all the lower seqnums for the same user key. C3 triggers
517 2 : // due to a point delete and that deletes all the lower seqnums for the same
518 2 : // user key.
519 2 : //
520 2 : // n = 1: This is the common case. It happens when the first key is not a
521 2 : // MERGE, or the current key is some kind of point delete.
522 2 : //
523 2 : // n > 1: This is due to a sequence of MERGE keys, potentially followed by a
524 2 : // single non-MERGE key.
525 2 : isObsoleteC1AndC2 := eval.kcmp.UserKeyComparison == 0 &&
526 2 : (w.prevPointKey.isObsolete || prevKeyKind != InternalKeyKindMerge)
527 2 : isObsoleteC3 := w.opts.WritingToLowestLevel &&
528 2 : (keyKind == InternalKeyKindDelete || keyKind == InternalKeyKindSingleDelete ||
529 2 : keyKind == InternalKeyKindDeleteSized)
530 2 : eval.isObsolete = isObsoleteC1AndC2 || isObsoleteC3
531 2 : // TODO(sumeer): storing isObsolete SET and SETWITHDEL in value blocks is
532 2 : // possible, but requires some care in documenting and checking invariants.
533 2 : // There is code that assumes nothing in value blocks because of single MVCC
534 2 : // version (those should be ok). We have to ensure setHasSamePrefix is
535 2 : // correctly initialized here etc.
536 2 :
537 2 : if !w.disableKeyOrderChecks && (eval.kcmp.UserKeyComparison < 0 ||
538 2 : (eval.kcmp.UserKeyComparison == 0 && w.prevPointKey.trailer <= key.Trailer)) {
539 1 : previousKey := base.InternalKey{
540 1 : UserKey: w.dataBlock.MaterializeLastUserKey(nil),
541 1 : Trailer: w.prevPointKey.trailer,
542 1 : }
543 1 : return eval, errors.Errorf(
544 1 : "pebble: keys must be added in strictly increasing order: %s, %s",
545 1 : previousKey.Pretty(w.comparer.FormatKey),
546 1 : key.Pretty(w.comparer.FormatKey))
547 1 : }
548 :
549 : // We might want to write this key's value to a value block if it has the
550 : // same prefix.
551 : //
552 : // We require:
553 : // . Value blocks to be enabled.
554 : // . The current key to have the same prefix as the previous key.
555 : // . The previous key to be a SET.
556 : // . The current key to be a SET.
557 : // . If there are bounds requiring some keys' values to be in-place, the
558 : // key must not fall within those bounds.
559 : // . The value to be sufficiently large. (Currently we simply require a
560 : // non-zero length, so all non-empty values are eligible for storage
561 : // out-of-band in a value block.)
562 : //
563 : // Use of 0 here is somewhat arbitrary. Given the minimum 3 byte encoding of
564 : // valueHandle, this should be > 3. But tiny values are common in test and
565 : // unlikely in production, so we use 0 here for better test coverage.
566 2 : const tinyValueThreshold = 0
567 2 : useValueBlock := !w.opts.DisableValueBlocks &&
568 2 : eval.kcmp.PrefixEqual() &&
569 2 : prevKeyKind == InternalKeyKindSet &&
570 2 : keyKind == InternalKeyKindSet &&
571 2 : valueLen > tinyValueThreshold &&
572 2 : w.valueBlock != nil
573 2 : if !useValueBlock {
574 2 : return eval, nil
575 2 : }
576 : // NB: it is possible that eval.kcmp.UserKeyComparison == 0, i.e., these two
577 : // SETs have identical user keys (because of an open snapshot). This should
578 : // be the rare case.
579 :
580 : // If there are bounds requiring some keys' values to be in-place, compare
581 : // the prefix against the bounds.
582 2 : if !w.opts.RequiredInPlaceValueBound.IsEmpty() {
583 1 : if w.comparer.Compare(w.opts.RequiredInPlaceValueBound.Upper, key.UserKey[:eval.kcmp.PrefixLen]) <= 0 {
584 1 : // Common case for CockroachDB. Make it empty since all future keys
585 1 : // will be >= this key.
586 1 : w.opts.RequiredInPlaceValueBound = UserKeyPrefixBound{}
587 1 : } else if w.comparer.Compare(key.UserKey[:eval.kcmp.PrefixLen], w.opts.RequiredInPlaceValueBound.Lower) >= 0 {
588 1 : // Don't write to value block if the key is within the bounds.
589 1 : return eval, nil
590 1 : }
591 : }
592 2 : eval.writeToValueBlock = true
593 2 : return eval, nil
594 : }
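
// Editor's illustrative sketch (not part of the original file): the
// obsolescence rules C1-C3 documented above, restated as a standalone
// predicate. The parameter names are hypothetical; evaluatePoint derives the
// same answer from eval.kcmp and w.prevPointKey.
func isPointObsoleteSketch(
	sameUserKeyAsPrev, prevObsolete, prevIsMerge, isPointDelete, writingToLowestLevel bool,
) bool {
	// C1/C2: under the same user key, the current key is shadowed unless the
	// previous key was a live MERGE, whose operands (and any shadowing DEL*)
	// must be preserved.
	obsoleteC1AndC2 := sameUserKeyAsPrev && (prevObsolete || !prevIsMerge)
	// C3: a point delete written to the lowest level deletes everything below
	// it, so the tombstone itself can be marked obsolete.
	obsoleteC3 := writingToLowestLevel && isPointDelete
	return obsoleteC1AndC2 || obsoleteC3
}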
595 :
596 : var compressedBlockPool = sync.Pool{
597 2 : New: func() interface{} {
598 2 : return new(compressedBlock)
599 2 : },
600 : }
601 :
602 : type compressedBlock struct {
603 : physical block.PhysicalBlock
604 : blockBuf blockBuf
605 : }
606 :
607 2 : func (w *RawColumnWriter) flushDataBlockWithoutNextKey(nextKey []byte) error {
608 2 : serializedBlock, lastKey := w.dataBlock.Finish(w.dataBlock.Rows()-1, w.pendingDataBlockSize)
609 2 : w.maybeIncrementTombstoneDenseBlocks(len(serializedBlock))
610 2 : // Compute the separator that will be written to the index block alongside
611 2 : // this data block's end offset. It is the separator between the last key in
612 2 : // the finished block and the [nextKey] that was excluded from the block.
613 2 : w.separatorBuf = w.comparer.Separator(w.separatorBuf[:0], lastKey.UserKey, nextKey)
614 2 : if err := w.enqueueDataBlock(serializedBlock, lastKey, w.separatorBuf); err != nil {
615 0 : return err
616 0 : }
617 2 : w.dataBlock.Reset()
618 2 : w.pendingDataBlockSize = 0
619 2 : return nil
620 : }
621 :
622 : // maybeIncrementTombstoneDenseBlocks increments the number of tombstone dense
623 : // blocks if the number of deletions in the data block exceeds a threshold or
624 : // the deletion size exceeds a threshold. It should be called after the
625 : // data block has been finished.
626 : // Invariant: uncompressedLen is the length of the just-finished, uncompressed data block.
627 2 : func (w *RawColumnWriter) maybeIncrementTombstoneDenseBlocks(uncompressedLen int) {
628 2 : minSize := w.opts.DeletionSizeRatioThreshold * float32(uncompressedLen)
629 2 : if w.dataBlock.numDeletions > w.opts.NumDeletionsThreshold || float32(w.dataBlock.deletionSize) > minSize {
630 2 : w.props.NumTombstoneDenseBlocks++
631 2 : }
632 2 : w.dataBlock.numDeletions = 0
633 2 : w.dataBlock.deletionSize = 0
634 : }
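
// Editor's illustrative sketch (not part of the original file): a data block
// is counted as tombstone-dense when either the tombstone count or the
// fraction of the block occupied by tombstones crosses its configured
// threshold, mirroring the check above. The parameter names are hypothetical.
func isTombstoneDenseSketch(
	numDeletions, deletionBytes, uncompressedBlockBytes int,
	numDeletionsThreshold int, deletionSizeRatioThreshold float32,
) bool {
	return numDeletions > numDeletionsThreshold ||
		float32(deletionBytes) > deletionSizeRatioThreshold*float32(uncompressedBlockBytes)
}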
635 :
636 : // enqueueDataBlock compresses and checksums the provided data block and sends
637 : // it to the write queue to be asynchronously written to the underlying storage.
638 : // It also adds the block's index block separator to the pending index block,
639 : // possibly triggering the index block to be finished and buffered.
640 : func (w *RawColumnWriter) enqueueDataBlock(
641 : serializedBlock []byte, lastKey base.InternalKey, separator []byte,
642 2 : ) error {
643 2 : w.lastKeyBuf = append(w.lastKeyBuf[:0], lastKey.UserKey...)
644 2 : w.meta.SetLargestPointKey(base.InternalKey{
645 2 : UserKey: w.lastKeyBuf,
646 2 : Trailer: lastKey.Trailer,
647 2 : })
648 2 :
649 2 : if invariants.Enabled {
650 2 : var dec colblk.DataBlockDecoder
651 2 : dec.Init(w.opts.KeySchema, serializedBlock)
652 2 : if err := dec.Validate(w.comparer, w.opts.KeySchema); err != nil {
653 0 : panic(err)
654 : }
655 : }
656 :
657 : // Compress and checksum the serialized data block and send it to the write queue.
658 2 : cb := compressedBlockPool.Get().(*compressedBlock)
659 2 : cb.blockBuf.checksummer.Type = w.opts.Checksum
660 2 : cb.physical = block.CompressAndChecksum(
661 2 : &cb.blockBuf.dataBuf,
662 2 : serializedBlock,
663 2 : w.opts.Compression,
664 2 : &cb.blockBuf.checksummer,
665 2 : )
666 2 : if !cb.physical.IsCompressed() {
667 2 : // If the block isn't compressed, cb.physical's underlying data points
668 2 : // directly into a buffer owned by w.dataBlock. Clone it before passing
669 2 : // it to the write queue to be asynchronously written to disk.
670 2 : // TODO(jackson): Should we try to avoid this clone by tracking the
671 2 : // lifetime of the DataBlockWriters?
672 2 : cb.physical, cb.blockBuf.dataBuf = cb.physical.CloneUsingBuf(cb.blockBuf.dataBuf)
673 2 : }
674 2 : return w.enqueuePhysicalBlock(cb, separator)
675 : }
676 :
677 2 : func (w *RawColumnWriter) enqueuePhysicalBlock(cb *compressedBlock, separator []byte) error {
678 2 : dataBlockHandle := block.Handle{
679 2 : Offset: w.queuedDataSize,
680 2 : Length: uint64(cb.physical.LengthWithoutTrailer()),
681 2 : }
682 2 : w.queuedDataSize += dataBlockHandle.Length + block.TrailerLen
683 2 : w.writeQueue.ch <- cb
684 2 :
685 2 : var err error
686 2 : w.blockPropsEncoder.resetProps()
687 2 : for i := range w.blockPropCollectors {
688 2 : scratch := w.blockPropsEncoder.getScratchForProp()
689 2 : if scratch, err = w.blockPropCollectors[i].FinishDataBlock(scratch); err != nil {
690 0 : return err
691 0 : }
692 2 : w.blockPropsEncoder.addProp(shortID(i), scratch)
693 : }
694 2 : dataBlockProps := w.blockPropsEncoder.unsafeProps()
695 2 :
696 2 : // Add the separator to the index block. This might trigger a flush of the
697 2 : // index block too.
698 2 : i := w.indexBlock.AddBlockHandle(separator, dataBlockHandle, dataBlockProps)
699 2 : sizeWithEntry := w.indexBlock.Size()
700 2 : if shouldFlushWithoutLatestKV(sizeWithEntry, w.indexBlockSize, i, &w.indexFlush) {
701 2 : // NB: finishIndexBlock will use blockPropsEncoder, so we must clone the
702 2 : // data block's props first.
703 2 : dataBlockProps = slices.Clone(dataBlockProps)
704 2 :
705 2 : if err = w.finishIndexBlock(w.indexBlock.Rows() - 1); err != nil {
706 0 : return err
707 0 : }
708 : // finishIndexBlock reset the index block builder, and we can
709 : // add the block handle to this new index block.
710 2 : _ = w.indexBlock.AddBlockHandle(separator, dataBlockHandle, dataBlockProps)
711 2 : w.indexBlockSize = w.indexBlock.Size()
712 2 : } else {
713 2 : w.indexBlockSize = sizeWithEntry
714 2 : }
715 : // Incorporate the finished data block's properties into the index block, now
716 : // that we've flushed the index block without the new separator if
717 : // necessary.
718 2 : for i := range w.blockPropCollectors {
719 2 : w.blockPropCollectors[i].AddPrevDataBlockToIndexBlock()
720 2 : }
721 2 : return nil
722 : }
723 :
724 : // finishIndexBlock finishes the currently pending index block with the first
725 : // [rows] rows. In practice, [rows] is always w.indexBlock.Rows() or
726 : // w.indexBlock.Rows()-1.
727 : //
728 : // The finished index block is buffered until the writer is closed.
729 2 : func (w *RawColumnWriter) finishIndexBlock(rows int) error {
730 2 : defer w.indexBlock.Reset()
731 2 : w.blockPropsEncoder.resetProps()
732 2 : for i := range w.blockPropCollectors {
733 2 : scratch := w.blockPropsEncoder.getScratchForProp()
734 2 : var err error
735 2 : if scratch, err = w.blockPropCollectors[i].FinishIndexBlock(scratch); err != nil {
736 0 : return err
737 0 : }
738 2 : w.blockPropsEncoder.addProp(shortID(i), scratch)
739 : }
740 2 : indexProps := w.blockPropsEncoder.props()
741 2 : bib := bufferedIndexBlock{nEntries: rows, properties: indexProps}
742 2 :
743 2 : // Copy the last (greatest) separator key in the index block into bib.sep.
744 2 : // It'll be the separator on the entry in the top-level index block.
745 2 : //
746 2 : // TODO(jackson): bib.sep.Trailer is unused within the columnar-block
747 2 : // sstable writer. Its existence is a code artifact of reuse of the
748 2 : // bufferedIndexBlock type between colblk and rowblk writers. This can be
749 2 : // cleaned up.
750 2 : bib.sep.Trailer = base.MakeTrailer(base.SeqNumMax, base.InternalKeyKindSeparator)
751 2 : w.indexBuffering.sepAlloc, bib.sep.UserKey = w.indexBuffering.sepAlloc.Copy(
752 2 : w.indexBlock.UnsafeSeparator(rows - 1))
753 2 :
754 2 : // Finish the index block and copy it so that w.indexBlock may be reused.
755 2 : block := w.indexBlock.Finish(rows)
756 2 : if len(w.indexBuffering.blockAlloc) < len(block) {
757 2 : // Allocate enough bytes for approximately 16 index blocks.
758 2 : w.indexBuffering.blockAlloc = make([]byte, len(block)*16)
759 2 : }
760 2 : n := copy(w.indexBuffering.blockAlloc, block)
761 2 : bib.block = w.indexBuffering.blockAlloc[:n:n]
762 2 : w.indexBuffering.blockAlloc = w.indexBuffering.blockAlloc[n:]
763 2 :
764 2 : w.indexBuffering.partitions = append(w.indexBuffering.partitions, bib)
765 2 : return nil
766 : }
767 :
768 : // flushBufferedIndexBlocks writes all index blocks, including the top-level
769 : // index block if necessary, to the underlying writable. It returns the block
770 : // handle of the top index (either the only index block or the top-level index
771 : // if two-level).
772 2 : func (w *RawColumnWriter) flushBufferedIndexBlocks() (rootIndex block.Handle, err error) {
773 2 : // If there's a currently-pending index block, finish it.
774 2 : if w.indexBlock.Rows() > 0 || len(w.indexBuffering.partitions) == 0 {
775 2 : w.finishIndexBlock(w.indexBlock.Rows())
776 2 : }
777 : // We've buffered all the index blocks. Typically there's just one index
778 : // block, in which case we're writing a "single-level" index. If we're
779 : // writing a large file or the index separators happen to be excessively
780 : // long, we may have several index blocks and need to construct a
781 : // "two-level" index structure.
782 2 : switch len(w.indexBuffering.partitions) {
783 0 : case 0:
784 0 : // This is impossible because we'll flush the index block immediately
785 0 : // above this switch statement if there are no buffered partitions
786 0 : // (regardless of whether there are data block handles in the index
787 0 : // block).
788 0 : panic("unreachable")
789 2 : case 1:
790 2 : // Single-level index.
791 2 : rootIndex, err = w.layout.WriteIndexBlock(w.indexBuffering.partitions[0].block)
792 2 : if err != nil {
793 0 : return rootIndex, err
794 0 : }
795 2 : w.props.IndexSize = rootIndex.Length + block.TrailerLen
796 2 : w.props.NumDataBlocks = uint64(w.indexBuffering.partitions[0].nEntries)
797 2 : w.props.IndexType = binarySearchIndex
798 2 : default:
799 2 : // Two-level index.
800 2 : for _, part := range w.indexBuffering.partitions {
801 2 : bh, err := w.layout.WriteIndexBlock(part.block)
802 2 : if err != nil {
803 0 : return block.Handle{}, err
804 0 : }
805 2 : w.props.IndexSize += bh.Length + block.TrailerLen
806 2 : w.props.NumDataBlocks += uint64(part.nEntries)
807 2 : w.topLevelIndexBlock.AddBlockHandle(part.sep.UserKey, bh, part.properties)
808 : }
809 2 : rootIndex, err = w.layout.WriteIndexBlock(w.topLevelIndexBlock.Finish(w.topLevelIndexBlock.Rows()))
810 2 : if err != nil {
811 0 : return block.Handle{}, err
812 0 : }
813 2 : w.props.IndexSize += rootIndex.Length + block.TrailerLen
814 2 : w.props.IndexType = twoLevelIndex
815 2 : w.props.IndexPartitions = uint64(len(w.indexBuffering.partitions))
816 : }
817 2 : return rootIndex, nil
818 : }
819 :
820 : // drainWriteQueue runs in its own goroutine and is responsible for writing
821 : // finished, compressed data blocks to the writable. It reads from w.writeQueue
822 : // until the channel is closed. All data blocks are written by this goroutine.
823 : // Other blocks are written directly by the client goroutine. See Close.
824 2 : func (w *RawColumnWriter) drainWriteQueue() {
825 2 : defer w.writeQueue.wg.Done()
826 2 : for cb := range w.writeQueue.ch {
827 2 : if _, err := w.layout.WritePrecompressedDataBlock(cb.physical); err != nil {
828 0 : w.writeQueue.err = err
829 0 : }
830 2 : cb.blockBuf.clear()
831 2 : cb.physical = block.PhysicalBlock{}
832 2 : compressedBlockPool.Put(cb)
833 : }
834 : }
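
// Editor's illustrative sketch (not part of the original file): the write
// queue is an ordinary single-consumer pipeline. Producers enqueue compressed
// blocks on a channel, one goroutine drains it in order, and closing the
// channel followed by waiting on the WaitGroup (as Close does below)
// guarantees every queued block reaches the writable before any later blocks
// are written by the closing goroutine. The int payload stands in for
// *compressedBlock.
func writeQueueSketch(write func(int) error) (enqueue func(int), wait func() error) {
	var wg sync.WaitGroup
	ch := make(chan int)
	var firstErr error
	wg.Add(1)
	go func() {
		defer wg.Done()
		for b := range ch {
			if err := write(b); err != nil && firstErr == nil {
				firstErr = err
			}
		}
	}()
	enqueue = func(b int) { ch <- b }
	wait = func() error {
		close(ch)
		wg.Wait()
		return firstErr
	}
	return enqueue, wait
}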
835 :
836 2 : func (w *RawColumnWriter) Close() (err error) {
837 2 : defer func() {
838 2 : if w.valueBlock != nil {
839 1 : w.valueBlock.Release()
840 1 : // Defensive code in case Close gets called again. We don't want to put
841 1 : // the same object to a sync.Pool.
842 1 : w.valueBlock = nil
843 1 : }
844 2 : w.layout.Abort()
845 2 : // Record any error in the writer (so we can exit early if Close is called
846 2 : // again).
847 2 : if err != nil {
848 1 : w.err = err
849 1 : }
850 : }()
851 2 : if w.layout.writable == nil {
852 1 : return w.err
853 1 : }
854 :
855 : // Finish the last data block and send it to the write queue if it contains
856 : // any pending KVs.
857 2 : if rows := w.dataBlock.Rows(); rows > 0 {
858 2 : serializedBlock, lastKey := w.dataBlock.Finish(rows, w.pendingDataBlockSize)
859 2 : w.separatorBuf = w.comparer.Successor(w.separatorBuf[:0], lastKey.UserKey)
860 2 : w.err = errors.CombineErrors(w.err, w.enqueueDataBlock(serializedBlock, lastKey, w.separatorBuf))
861 2 : w.maybeIncrementTombstoneDenseBlocks(len(serializedBlock))
862 2 : }
863 : // Close the write queue channel so that the goroutine responsible for
864 : // writing data blocks to disk knows to exit. Any subsequent blocks (eg,
865 : // index, metadata, range key, etc) will be written by the goroutine that
866 : // called Close.
867 2 : close(w.writeQueue.ch)
868 2 : w.writeQueue.wg.Wait()
869 2 : // If the write queue encountered any errors while writing out data blocks,
870 2 : // it's stored in w.writeQueue.err.
871 2 : w.err = firstError(w.err, w.writeQueue.err)
872 2 : if w.err != nil {
873 1 : return w.err
874 1 : }
875 :
876 : // INVARIANT: w.queuedDataSize == w.layout.offset.
877 : // All data blocks have been written to disk. The queuedDataSize is the
878 : // cumulative size of all the data blocks we've sent to the write queue. Now
879 : // that they've all been flushed, queuedDataSize should match w.layout's
880 : // offset.
881 2 : if w.queuedDataSize != w.layout.offset {
882 0 : panic(errors.AssertionFailedf("pebble: %d bytes of queued data blocks but layout offset is %d",
883 0 : w.queuedDataSize, w.layout.offset))
884 : }
885 2 : w.props.DataSize = w.layout.offset
886 2 : if _, err = w.flushBufferedIndexBlocks(); err != nil {
887 0 : return err
888 0 : }
889 :
890 : // Write the filter block.
891 2 : if w.filterBlock != nil {
892 2 : bh, err := w.layout.WriteFilterBlock(w.filterBlock)
893 2 : if err != nil {
894 0 : return err
895 0 : }
896 2 : w.props.FilterPolicyName = w.filterBlock.policyName()
897 2 : w.props.FilterSize = bh.Length
898 : }
899 :
900 : // Write the range deletion block if non-empty.
901 2 : if w.rangeDelBlock.KeyCount() > 0 {
902 2 : w.props.NumRangeDeletions = uint64(w.rangeDelBlock.KeyCount())
903 2 : sm, la := w.rangeDelBlock.UnsafeBoundaryKeys()
904 2 : w.meta.SetSmallestRangeDelKey(sm)
905 2 : w.meta.SetLargestRangeDelKey(la)
906 2 : if _, err := w.layout.WriteRangeDeletionBlock(w.rangeDelBlock.Finish()); err != nil {
907 0 : return err
908 0 : }
909 : }
910 :
911 : // Write the range key block if non-empty.
912 2 : if w.rangeKeyBlock.KeyCount() > 0 {
913 2 : sm, la := w.rangeKeyBlock.UnsafeBoundaryKeys()
914 2 : w.meta.SetSmallestRangeKey(sm)
915 2 : w.meta.SetLargestRangeKey(la)
916 2 : if _, err := w.layout.WriteRangeKeyBlock(w.rangeKeyBlock.Finish()); err != nil {
917 0 : return err
918 0 : }
919 : }
920 :
921 : // Write out the value block.
922 2 : if w.valueBlock != nil {
923 2 : _, vbStats, err := w.valueBlock.Finish(&w.layout, w.layout.offset)
924 2 : if err != nil {
925 0 : return err
926 0 : }
927 2 : w.props.NumValueBlocks = vbStats.NumValueBlocks
928 2 : w.props.NumValuesInValueBlocks = vbStats.NumValuesInValueBlocks
929 2 : w.props.ValueBlocksSize = vbStats.ValueBlocksAndIndexSize
930 : }
931 :
932 : // Write the properties block.
933 2 : {
934 2 : // Finish and record the prop collectors if props are not yet recorded.
935 2 : // Pre-computed props might have been copied by specialized sst creators
936 2 : // like suffix replacer.
937 2 : if len(w.props.UserProperties) == 0 {
938 2 : userProps := make(map[string]string)
939 2 : for i := range w.blockPropCollectors {
940 2 : scratch := w.blockPropsEncoder.getScratchForProp()
941 2 : // Place the shortID in the first byte.
942 2 : scratch = append(scratch, byte(i))
943 2 : buf, err := w.blockPropCollectors[i].FinishTable(scratch)
944 2 : if err != nil {
945 0 : return err
946 0 : }
947 2 : var prop string
948 2 : if len(buf) > 0 {
949 2 : prop = string(buf)
950 2 : }
951 : // NB: The property is populated in the map even if it is the
952 : // empty string, since the presence in the map is what indicates
953 : // that the block property collector was used when writing.
954 2 : userProps[w.blockPropCollectors[i].Name()] = prop
955 : }
956 2 : if len(userProps) > 0 {
957 2 : w.props.UserProperties = userProps
958 2 : }
959 : }
960 :
961 2 : var raw rowblk.Writer
962 2 : // The restart interval is set to infinity because the properties block
963 2 : // is always read sequentially and cached in a heap located object. This
964 2 : // reduces table size without a significant impact on performance.
965 2 : raw.RestartInterval = propertiesBlockRestartInterval
966 2 : w.props.CompressionOptions = rocksDBCompressionOptions
967 2 : w.props.save(w.opts.TableFormat, &raw)
968 2 : if _, err := w.layout.WritePropertiesBlock(raw.Finish()); err != nil {
969 0 : return err
970 0 : }
971 : }
972 :
973 : // Write the table footer.
974 2 : w.meta.Size, err = w.layout.Finish()
975 2 : if err != nil {
976 0 : return err
977 0 : }
978 2 : w.meta.Properties = w.props
979 2 : // Release any held memory and make any future calls error.
980 2 : *w = RawColumnWriter{meta: w.meta, err: errWriterClosed}
981 2 : return nil
982 : }
983 :
984 : // rewriteSuffixes implements RawWriter.
985 : func (w *RawColumnWriter) rewriteSuffixes(
986 : r *Reader, wo WriterOptions, from, to []byte, concurrency int,
987 1 : ) error {
988 1 : for _, c := range w.blockPropCollectors {
989 1 : if !c.SupportsSuffixReplacement() {
990 0 : return errors.Errorf("block property collector %s does not support suffix replacement", c.Name())
991 0 : }
992 : }
993 1 : l, err := r.Layout()
994 1 : if err != nil {
995 0 : return errors.Wrap(err, "reading layout")
996 0 : }
997 : // Copy data blocks in parallel, rewriting suffixes as we go.
998 1 : blocks, err := rewriteDataBlocksInParallel(r, wo, l.Data, from, to, concurrency, func() blockRewriter {
999 1 : return colblk.NewDataBlockRewriter(wo.KeySchema, w.comparer)
1000 1 : })
1001 1 : if err != nil {
1002 1 : return errors.Wrap(err, "rewriting data blocks")
1003 1 : }
1004 :
1005 : // oldShortIDs maps the shortID for the block property collector in the old
1006 : // blocks to the shortID in the new blocks. Initialized once for the sstable.
1007 1 : oldShortIDs, n, err := getShortIDs(r, w.blockPropCollectors)
1008 1 : if err != nil {
1009 0 : return errors.Wrap(err, "getting short IDs")
1010 0 : }
1011 1 : oldProps := make([][]byte, len(w.blockPropCollectors))
1012 1 : for i := range blocks {
1013 1 : cb := compressedBlockPool.Get().(*compressedBlock)
1014 1 : cb.physical = blocks[i].physical
1015 1 :
1016 1 : // Load any previous values for our prop collectors into oldProps.
1017 1 : for i := range oldProps {
1018 1 : oldProps[i] = nil
1019 1 : }
1020 1 : decoder := makeBlockPropertiesDecoder(n, l.Data[i].Props)
1021 1 : for !decoder.Done() {
1022 1 : id, val, err := decoder.Next()
1023 1 : if err != nil {
1024 0 : return err
1025 0 : }
1026 1 : if oldShortIDs[id].IsValid() {
1027 1 : oldProps[oldShortIDs[id]] = val
1028 1 : }
1029 : }
1030 1 : for i, p := range w.blockPropCollectors {
1031 1 : if err := p.AddCollectedWithSuffixReplacement(oldProps[i], from, to); err != nil {
1032 0 : return err
1033 0 : }
1034 : }
1035 1 : var separator []byte
1036 1 : if i+1 < len(blocks) {
1037 1 : w.separatorBuf = w.comparer.Separator(w.separatorBuf[:0], blocks[i].end.UserKey, blocks[i+1].start.UserKey)
1038 1 : separator = w.separatorBuf
1039 1 : } else {
1040 1 : w.separatorBuf = w.comparer.Successor(w.separatorBuf[:0], blocks[i].end.UserKey)
1041 1 : separator = w.separatorBuf
1042 1 : }
1043 1 : w.enqueuePhysicalBlock(cb, separator)
1044 : }
1045 :
1046 1 : if len(blocks) > 0 {
1047 1 : w.meta.updateSeqNum(blocks[0].start.SeqNum())
1048 1 : w.props.NumEntries = r.Properties.NumEntries
1049 1 : w.props.RawKeySize = r.Properties.RawKeySize
1050 1 : w.props.RawValueSize = r.Properties.RawValueSize
1051 1 : w.meta.SetSmallestPointKey(blocks[0].start)
1052 1 : w.meta.SetLargestPointKey(blocks[len(blocks)-1].end)
1053 1 : }
1054 :
1055 : // Copy range key block, replacing suffixes if it exists.
1056 1 : if err := rewriteRangeKeyBlockToWriter(r, w, from, to); err != nil {
1057 0 : return errors.Wrap(err, "rewriting range key blocks")
1058 0 : }
1059 : // Copy over the filter block if it exists.
1060 1 : if w.filterBlock != nil {
1061 1 : if filterBlockBH, ok := l.FilterByName(w.filterBlock.metaName()); ok {
1062 1 : filterBlock, _, err := readBlockBuf(r, filterBlockBH, nil)
1063 1 : if err != nil {
1064 0 : return errors.Wrap(err, "reading filter")
1065 0 : }
1066 1 : w.filterBlock = copyFilterWriter{
1067 1 : origPolicyName: w.filterBlock.policyName(),
1068 1 : origMetaName: w.filterBlock.metaName(),
1069 1 : data: filterBlock,
1070 1 : }
1071 : }
1072 : }
1073 1 : return nil
1074 : }
1075 :
1076 : func shouldFlushWithoutLatestKV(
1077 : sizeWithKV int, sizeWithoutKV int, entryCountWithoutKV int, flushGovernor *block.FlushGovernor,
1078 2 : ) bool {
1079 2 : if entryCountWithoutKV == 0 {
1080 2 : return false
1081 2 : }
1082 2 : if sizeWithoutKV < flushGovernor.LowWatermark() {
1083 2 : // Fast path when the block is too small to flush.
1084 2 : return false
1085 2 : }
1086 2 : return flushGovernor.ShouldFlush(sizeWithoutKV, sizeWithKV)
1087 : }
1088 :
1089 : // copyDataBlocks adds a range of blocks to the table as-is. These blocks could be
1090 : // compressed. It's specifically used by the sstable copier that can copy parts
1091 : // of an sstable to a new sstable, using CopySpan().
1092 : func (w *RawColumnWriter) copyDataBlocks(
1093 : ctx context.Context, blocks []indexEntry, rh objstorage.ReadHandle,
1094 2 : ) error {
1095 2 : const readSizeTarget = 256 << 10
1096 2 : readAndFlushBlocks := func(firstBlockIdx, lastBlockIdx int) error {
1097 2 : if firstBlockIdx > lastBlockIdx {
1098 0 : panic("pebble: readAndFlushBlocks called with invalid block range")
1099 : }
1100 : // We need to flush blocks[firstBlockIdx:lastBlockIdx+1] into the write queue.
1101 : // We do this by issuing one big read from the read handle into the buffer, and
1102 : // then enqueueing the writing of those blocks one-by-one.
1103 : //
1104 : // TODO(bilal): Consider refactoring the write queue to support writing multiple
1105 : // blocks in one request.
1106 2 : lastBH := blocks[lastBlockIdx].bh
1107 2 : blocksToReadLen := lastBH.Offset + lastBH.Length + block.TrailerLen - blocks[firstBlockIdx].bh.Offset
1108 2 : // We need to create a new buffer for each read, as w.enqueuePhysicalBlock passes
1109 2 : // a pointer to the buffer to the write queue.
1110 2 : buf := make([]byte, 0, blocksToReadLen)
1111 2 : if err := rh.ReadAt(ctx, buf[:blocksToReadLen], int64(blocks[firstBlockIdx].bh.Offset)); err != nil {
1112 0 : return err
1113 0 : }
1114 2 : for i := firstBlockIdx; i <= lastBlockIdx; i++ {
1115 2 : offsetDiff := blocks[i].bh.Offset - blocks[firstBlockIdx].bh.Offset
1116 2 : blockBuf := buf[offsetDiff : offsetDiff+blocks[i].bh.Length+block.TrailerLen]
1117 2 : cb := compressedBlockPool.Get().(*compressedBlock)
1118 2 : cb.physical = block.NewPhysicalBlock(blockBuf)
1119 2 : if err := w.enqueuePhysicalBlock(cb, blocks[i].sep); err != nil {
1120 0 : return err
1121 0 : }
1122 : }
1123 2 : return nil
1124 : }
1125 : // Iterate through the blocks, accumulating a run of contiguous blocks until
1126 : // adding the next block would exceed the readSizeTarget buffer capacity, at which
1127 : // point we read and flush the accumulated run. This allows us to read as many
1128 : // blocks in one IO request as possible, while still utilizing the write queue in this
1129 : // writer.
1130 2 : lastBlockOffset := uint64(0)
1131 2 : for i := 0; i < len(blocks); {
1132 2 : if blocks[i].bh.Offset < lastBlockOffset {
1133 0 : panic("pebble: copyDataBlocks called with blocks out of order")
1134 : }
1135 2 : start := i
1136 2 : // Note the i++ in the initializing condition; this means we will always flush at least
1137 2 : // one block.
1138 2 : for i++; i < len(blocks) && (blocks[i].bh.Length+blocks[i].bh.Offset+block.TrailerLen-blocks[start].bh.Offset) <= uint64(readSizeTarget); i++ {
1139 1 : }
1140 : // i points to one index past the last block we want to read.
1141 2 : if err := readAndFlushBlocks(start, i-1); err != nil {
1142 0 : return err
1143 0 : }
1144 : }
1145 2 : return nil
1146 : }
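
// Editor's illustrative sketch (not part of the original file): the batching
// loop above restated over plain offset/length pairs (lengths here include the
// block trailer). Contiguous blocks are grouped until the span measured from
// the group's first offset would exceed the read-size target, and each group
// becomes one read. The names are hypothetical.
func groupContiguousReadsSketch(offsets, lengths []uint64, targetBytes uint64) [][2]int {
	var groups [][2]int // half-open [start, end) index pairs
	for i := 0; i < len(offsets); {
		start := i
		// Always take at least one block, then extend while the cumulative
		// span stays within targetBytes.
		for i++; i < len(offsets) && offsets[i]+lengths[i]-offsets[start] <= targetBytes; i++ {
		}
		groups = append(groups, [2]int{start, i})
	}
	return groups
}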
1147 :
1148 : // addDataBlock adds a raw uncompressed data block to the table as-is. It's specifically used
1149 : // by the sstable copier that can copy parts of an sstable to a new sstable,
1150 : // using CopySpan().
1151 2 : func (w *RawColumnWriter) addDataBlock(b, sep []byte, bhp block.HandleWithProperties) error {
1152 2 : // Compress and checksum the raw data block and send it to the write queue.
1153 2 : cb := compressedBlockPool.Get().(*compressedBlock)
1154 2 : cb.blockBuf.checksummer.Type = w.opts.Checksum
1155 2 : cb.physical = block.CompressAndChecksum(
1156 2 : &cb.blockBuf.dataBuf,
1157 2 : b,
1158 2 : w.opts.Compression,
1159 2 : &cb.blockBuf.checksummer,
1160 2 : )
1161 2 : if !cb.physical.IsCompressed() {
1162 2 : // If the block isn't compressed, cb.physical's underlying data points
1163 2 : // directly into the caller-provided buffer b. Clone it before passing
1164 2 : // it to the write queue to be asynchronously written to disk.
1165 2 : // TODO(jackson): Should we try to avoid this clone by tracking the
1166 2 : // lifetime of the DataBlockWriters?
1167 2 : cb.physical, cb.blockBuf.dataBuf = cb.physical.CloneUsingBuf(cb.blockBuf.dataBuf)
1168 2 : }
1169 2 : if err := w.enqueuePhysicalBlock(cb, sep); err != nil {
1170 0 : return err
1171 0 : }
1172 2 : return nil
1173 : }
1174 :
1175 : // copyFilter copies the specified filter to the table. It's specifically used
1176 : // by the sstable copier that can copy parts of an sstable to a new sstable,
1177 : // using CopySpan().
1178 1 : func (w *RawColumnWriter) copyFilter(filter []byte, filterName string) error {
1179 1 : if w.filterBlock != nil && filterName != w.filterBlock.policyName() {
1180 0 : return errors.New("mismatched filters")
1181 0 : }
1182 1 : w.filterBlock = copyFilterWriter{
1183 1 : origPolicyName: w.filterBlock.policyName(), origMetaName: w.filterBlock.metaName(), data: filter,
1184 1 : }
1185 1 : return nil
1186 : }
1187 :
1188 : // copyProperties copies properties from the specified props, and resets others
1189 : // to prepare for copying data blocks from another sstable, using the copy/addDataBlock(s)
1190 : // methods above. It's specifically used by the sstable copier that can copy parts of an
1191 : // sstable to a new sstable, using CopySpan().
1192 2 : func (w *RawColumnWriter) copyProperties(props Properties) {
1193 2 : w.props = props
1194 2 : // Remove all user properties to disable block properties, which we do not
1195 2 : // calculate for CopySpan.
1196 2 : w.props.UserProperties = nil
1197 2 : // Reset props that we'll re-derive as we build our own index.
1198 2 : w.props.IndexPartitions = 0
1199 2 : w.props.TopLevelIndexSize = 0
1200 2 : w.props.IndexSize = 0
1201 2 : w.props.IndexType = 0
1202 2 : }