Line data Source code
1 : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package sstable
6 :
7 : import (
8 : "bytes"
9 : "context"
10 : "encoding/binary"
11 : "fmt"
12 : "math"
13 : "slices"
14 : "sync"
15 :
16 : "github.com/cockroachdb/errors"
17 : "github.com/cockroachdb/pebble/internal/base"
18 : "github.com/cockroachdb/pebble/internal/bytealloc"
19 : "github.com/cockroachdb/pebble/internal/invariants"
20 : "github.com/cockroachdb/pebble/internal/keyspan"
21 : "github.com/cockroachdb/pebble/objstorage"
22 : "github.com/cockroachdb/pebble/sstable/block"
23 : "github.com/cockroachdb/pebble/sstable/colblk"
24 : "github.com/cockroachdb/pebble/sstable/rowblk"
25 : "github.com/cockroachdb/pebble/sstable/valblk"
26 : )
27 :
28 : // RawColumnWriter is an sstable RawWriter that writes sstables with
29 : // column-oriented blocks. All table formats TableFormatPebblev5 and later write
30 : // column-oriented blocks and use RawColumnWriter.
31 : type RawColumnWriter struct {
32 : comparer *base.Comparer
33 : meta WriterMetadata
34 : opts WriterOptions
35 : err error
36 :
37 : dataFlush block.FlushGovernor
38 : indexFlush block.FlushGovernor
39 : blockPropCollectors []BlockPropertyCollector
40 : blockPropsEncoder blockPropertiesEncoder
41 : obsoleteCollector obsoleteKeyBlockPropertyCollector
42 : props Properties
43 : // block writers buffering unflushed data.
44 : dataBlock struct {
45 : colblk.DataBlockEncoder
46 : // numDeletions stores the count of point tombstones in this data block.
47 : // It's used to determine if this data block is considered
48 : // tombstone-dense for the purposes of compaction.
49 : numDeletions int
50 : // deletionSize stores the raw size of point tombstones in this data
51 : // block. It's used to determine if this data block is considered
52 : // tombstone-dense for the purposes of compaction.
53 : deletionSize int
54 : }
55 : indexBlock colblk.IndexBlockWriter
56 : topLevelIndexBlock colblk.IndexBlockWriter
57 : rangeDelBlock colblk.KeyspanBlockWriter
58 : rangeKeyBlock colblk.KeyspanBlockWriter
59 : valueBlock *valblk.Writer // nil iff WriterOptions.DisableValueBlocks=true
60 : // filter accumulates the filter block. If populated, the filter ingests
61 : // either key prefixes produced by the comparer's Split function (a prefix
62 : // extractor), if it is configured, or the full keys otherwise.
63 : filterBlock filterWriter
64 : prevPointKey struct {
65 : trailer base.InternalKeyTrailer
66 : isObsolete bool
67 : }
68 : pendingDataBlockSize int
69 : indexBlockSize int
70 : queuedDataSize uint64
71 :
72 : // indexBuffering holds finished index blocks as they're completed while
73 : // building the sstable. If an index block grows sufficiently large
74 : // (IndexBlockSize) while an sstable is still being constructed, the sstable
75 : // writer will create a two-level index structure. As index blocks are
76 : // completed, they're finished and buffered in-memory until the table is
77 : // finished. When the table is finished, the buffered index blocks are
78 : // flushed in order after all the data blocks, and the top-level index block
79 : // is constructed to point to all the individual index blocks.
80 : indexBuffering struct {
81 : // partitions holds all the completed index blocks.
82 : partitions []bufferedIndexBlock
83 : // blockAlloc is used to bulk-allocate byte slices used to store index
84 : // blocks in partitions. These live until the sstable is finished.
85 : blockAlloc []byte
86 : // sepAlloc is used to bulk-allocate index block separator slices stored
87 : // in partitions. These live until the sstable is finished.
88 : sepAlloc bytealloc.A
89 : }
90 :
91 : writeQueue struct {
92 : wg sync.WaitGroup
93 : ch chan *compressedBlock
94 : err error
95 : }
96 : layout layoutWriter
97 :
98 : lastKeyBuf []byte
99 : separatorBuf []byte
100 : tmp [blockHandleLikelyMaxLen]byte
101 : previousUserKey invariants.Value[[]byte]
102 : disableKeyOrderChecks bool
103 : }
104 :
105 : // Assert that *RawColumnWriter implements RawWriter.
106 : var _ RawWriter = (*RawColumnWriter)(nil)
107 :
108 1 : func newColumnarWriter(writable objstorage.Writable, o WriterOptions) *RawColumnWriter {
109 1 : if writable == nil {
110 0 : panic("pebble: nil writable")
111 : }
112 1 : if !o.TableFormat.BlockColumnar() {
113 0 : panic(errors.AssertionFailedf("newColumnarWriter cannot create sstables with %s format", o.TableFormat))
114 : }
115 1 : o = o.ensureDefaults()
116 1 : w := &RawColumnWriter{
117 1 : comparer: o.Comparer,
118 1 : meta: WriterMetadata{
119 1 : SmallestSeqNum: math.MaxUint64,
120 1 : },
121 1 : opts: o,
122 1 : layout: makeLayoutWriter(writable, o),
123 1 : disableKeyOrderChecks: o.internal.DisableKeyOrderChecks,
124 1 : }
125 1 : w.dataFlush = block.MakeFlushGovernor(o.BlockSize, o.BlockSizeThreshold, o.SizeClassAwareThreshold, o.AllocatorSizeClasses)
126 1 : w.indexFlush = block.MakeFlushGovernor(o.IndexBlockSize, o.BlockSizeThreshold, o.SizeClassAwareThreshold, o.AllocatorSizeClasses)
127 1 : w.dataBlock.Init(o.KeySchema)
128 1 : w.indexBlock.Init()
129 1 : w.topLevelIndexBlock.Init()
130 1 : w.rangeDelBlock.Init(w.comparer.Equal)
131 1 : w.rangeKeyBlock.Init(w.comparer.Equal)
132 1 : if !o.DisableValueBlocks {
133 1 : w.valueBlock = valblk.NewWriter(
134 1 : block.MakeFlushGovernor(o.BlockSize, o.BlockSizeThreshold, o.SizeClassAwareThreshold, o.AllocatorSizeClasses),
135 1 : w.opts.Compression, w.opts.Checksum, func(compressedSize int) {})
136 : }
137 1 : if o.FilterPolicy != nil {
138 1 : switch o.FilterType {
139 1 : case TableFilter:
140 1 : w.filterBlock = newTableFilterWriter(o.FilterPolicy)
141 0 : default:
142 0 : panic(fmt.Sprintf("unknown filter type: %v", o.FilterType))
143 : }
144 : }
145 :
146 1 : numBlockPropertyCollectors := len(o.BlockPropertyCollectors)
147 1 : if !o.disableObsoleteCollector {
148 1 : numBlockPropertyCollectors++
149 1 : }
150 1 : if numBlockPropertyCollectors > maxPropertyCollectors {
151 0 : panic(errors.New("pebble: too many block property collectors"))
152 : }
153 1 : w.blockPropCollectors = make([]BlockPropertyCollector, 0, numBlockPropertyCollectors)
154 1 : for _, constructFn := range o.BlockPropertyCollectors {
155 1 : w.blockPropCollectors = append(w.blockPropCollectors, constructFn())
156 1 : }
157 1 : if !o.disableObsoleteCollector {
158 1 : w.blockPropCollectors = append(w.blockPropCollectors, &w.obsoleteCollector)
159 1 : }
160 1 : var buf bytes.Buffer
161 1 : buf.WriteString("[")
162 1 : for i := range w.blockPropCollectors {
163 1 : if i > 0 {
164 1 : buf.WriteString(",")
165 1 : }
166 1 : buf.WriteString(w.blockPropCollectors[i].Name())
167 : }
168 1 : buf.WriteString("]")
169 1 : w.props.PropertyCollectorNames = buf.String()
170 1 :
171 1 : w.props.ComparerName = o.Comparer.Name
172 1 : w.props.CompressionName = o.Compression.String()
173 1 : w.props.KeySchemaName = o.KeySchema.Name
174 1 : w.props.MergerName = o.MergerName
175 1 :
176 1 : w.writeQueue.ch = make(chan *compressedBlock)
177 1 : w.writeQueue.wg.Add(1)
178 1 : go w.drainWriteQueue()
179 1 : return w
180 : }
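
Below is a minimal construction sketch, assuming in-package usage; writeSimpleTable is a hypothetical helper, the literal keys and sequence numbers are illustrative, and ensureDefaults is relied upon to fill any options left unset (such as the comparer and key schema).

// writeSimpleTable builds a tiny columnar sstable containing two point keys
// and one range deletion, then finishes the table.
func writeSimpleTable(writable objstorage.Writable) error {
	w := newColumnarWriter(writable, WriterOptions{
		TableFormat: TableFormatPebblev5, // must be a columnar (v5+) format
	})
	// Point keys must be added in strictly increasing internal-key order.
	if err := w.AddWithForceObsolete(
		base.MakeInternalKey([]byte("a"), 5, base.InternalKeyKindSet), []byte("v1"), false); err != nil {
		return err
	}
	if err := w.AddWithForceObsolete(
		base.MakeInternalKey([]byte("b"), 4, base.InternalKeyKindSet), []byte("v2"), false); err != nil {
		return err
	}
	// Range deletions (and range keys) go through EncodeSpan, not Add.
	if err := w.EncodeSpan(keyspan.Span{
		Start: []byte("c"),
		End:   []byte("d"),
		Keys:  []keyspan.Key{{Trailer: base.MakeTrailer(3, base.InternalKeyKindRangeDelete)}},
	}); err != nil {
		return err
	}
	return w.Close()
}
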
181 :
182 : // Error returns the current accumulated error if any.
183 1 : func (w *RawColumnWriter) Error() error {
184 1 : return w.err
185 1 : }
186 :
187 : // EstimatedSize returns the estimated size of the sstable being written if
188 : // a call to Close() was made without adding additional keys.
189 1 : func (w *RawColumnWriter) EstimatedSize() uint64 {
190 1 : sz := rocksDBFooterLen + w.queuedDataSize
191 1 : // TODO(jackson): Avoid iterating over partitions by incrementally
192 1 : // maintaining the size contribution of all buffered partitions.
193 1 : for _, bib := range w.indexBuffering.partitions {
194 0 : // We include the separator user key to account for its bytes in the
195 0 : // top-level index block.
196 0 : //
197 0 : // TODO(jackson): We could incrementally build the top-level index block
198 0 : // and produce an exact calculation of the current top-level index
199 0 : // block's size.
200 0 : sz += uint64(len(bib.block) + block.TrailerLen + len(bib.sep.UserKey))
201 0 : }
202 1 : if w.rangeDelBlock.KeyCount() > 0 {
203 1 : sz += uint64(w.rangeDelBlock.Size())
204 1 : }
205 1 : if w.rangeKeyBlock.KeyCount() > 0 {
206 1 : sz += uint64(w.rangeKeyBlock.Size())
207 1 : }
208 1 : if w.valueBlock != nil {
209 1 : sz += w.valueBlock.Size()
210 1 : }
211 : // TODO(jackson): Include an estimate of the properties, filter and meta
212 : // index blocks sizes.
213 1 : return sz
214 : }
215 :
216 : // ComparePrev compares the provided user key to the last point key written to the
217 : // writer. The returned value is equivalent to Compare(key, prevKey) where
218 : // prevKey is the last point key written to the writer.
219 : //
220 : // If no key has been written yet, ComparePrev returns +1.
221 : //
222 : // Must not be called after Writer is closed.
223 1 : func (w *RawColumnWriter) ComparePrev(k []byte) int {
224 1 : if w == nil || w.dataBlock.Rows() == 0 {
225 1 : return +1
226 1 : }
227 1 : return int(w.dataBlock.KeyWriter.ComparePrev(k).UserKeyComparison)
228 : }
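
A small sketch interpreting ComparePrev's return value (describeOrdering is a hypothetical helper, not part of the writer):

// describeOrdering reports how userKey sorts relative to the last point key
// written to w, per ComparePrev's contract: +1 also covers the case where no
// point key has been written yet.
func describeOrdering(w *RawColumnWriter, userKey []byte) string {
	switch c := w.ComparePrev(userKey); {
	case c > 0:
		return "sorts after the previous point key (or the writer is empty)"
	case c == 0:
		return "same user key as the previous point key"
	default:
		return "sorts before the previous point key; adding it would be an ordering error"
	}
}
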
229 :
230 : // SetSnapshotPinnedProperties sets the properties for pinned keys. Should only
231 : // be used internally by Pebble.
232 : func (w *RawColumnWriter) SetSnapshotPinnedProperties(
233 : pinnedKeyCount, pinnedKeySize, pinnedValueSize uint64,
234 1 : ) {
235 1 : w.props.SnapshotPinnedKeys = pinnedKeyCount
236 1 : w.props.SnapshotPinnedKeySize = pinnedKeySize
237 1 : w.props.SnapshotPinnedValueSize = pinnedValueSize
238 1 : }
239 :
240 : // Metadata returns the metadata for the finished sstable. Only valid to call
241 : // after the sstable has been finished.
242 1 : func (w *RawColumnWriter) Metadata() (*WriterMetadata, error) {
243 1 : if !w.layout.IsFinished() {
244 0 : return nil, errors.New("pebble: writer is not closed")
245 0 : }
246 1 : return &w.meta, nil
247 : }
248 :
249 : // EncodeSpan encodes the keys in the given span. The span can contain either
250 : // only RANGEDEL keys or only range keys.
251 1 : func (w *RawColumnWriter) EncodeSpan(span keyspan.Span) error {
252 1 : if span.Empty() {
253 1 : return nil
254 1 : }
255 1 : for _, k := range span.Keys {
256 1 : w.meta.updateSeqNum(k.SeqNum())
257 1 : }
258 :
259 1 : blockWriter := &w.rangeKeyBlock
260 1 : if span.Keys[0].Kind() == base.InternalKeyKindRangeDelete {
261 1 : blockWriter = &w.rangeDelBlock
262 1 : // Update range delete properties.
263 1 : // NB: These properties are computed differently than the rowblk sstable
264 1 : // writer because this writer does not flatten them into row key-value
265 1 : // pairs.
266 1 : w.props.RawKeySize += uint64(len(span.Start) + len(span.End))
267 1 : count := uint64(len(span.Keys))
268 1 : w.props.NumEntries += count
269 1 : w.props.NumDeletions += count
270 1 : w.props.NumRangeDeletions += count
271 1 : } else {
272 1 : // Update range key properties.
273 1 : // NB: These properties are computed differently than the rowblk sstable
274 1 : // writer because this writer does not flatten them into row key-value
275 1 : // pairs.
276 1 : w.props.RawRangeKeyKeySize += uint64(len(span.Start) + len(span.End))
277 1 : for _, k := range span.Keys {
278 1 : w.props.RawRangeKeyValueSize += uint64(len(k.Value))
279 1 : switch k.Kind() {
280 1 : case base.InternalKeyKindRangeKeyDelete:
281 1 : w.props.NumRangeKeyDels++
282 1 : case base.InternalKeyKindRangeKeySet:
283 1 : w.props.NumRangeKeySets++
284 1 : case base.InternalKeyKindRangeKeyUnset:
285 1 : w.props.NumRangeKeyUnsets++
286 0 : default:
287 0 : panic(errors.Errorf("pebble: invalid range key type: %s", k.Kind()))
288 : }
289 : }
290 1 : for i := range w.blockPropCollectors {
291 1 : if err := w.blockPropCollectors[i].AddRangeKeys(span); err != nil {
292 0 : return err
293 0 : }
294 : }
295 : }
296 1 : if !w.disableKeyOrderChecks && blockWriter.KeyCount() > 0 {
297 1 : // Check that spans are being added in fragmented order. If the two
298 1 : // tombstones overlap, their start and end keys must be identical.
299 1 : prevStart, prevEnd, prevTrailer := blockWriter.UnsafeLastSpan()
300 1 : if w.opts.Comparer.Equal(prevStart, span.Start) && w.opts.Comparer.Equal(prevEnd, span.End) {
301 1 : if prevTrailer < span.Keys[0].Trailer {
302 1 : w.err = errors.Errorf("pebble: keys must be added in order: %s-%s:{(#%s)}, %s",
303 1 : w.opts.Comparer.FormatKey(prevStart),
304 1 : w.opts.Comparer.FormatKey(prevEnd),
305 1 : prevTrailer, span.Pretty(w.opts.Comparer.FormatKey))
306 1 : }
307 1 : } else if c := w.opts.Comparer.Compare(prevEnd, span.Start); c > 0 {
308 1 : w.err = errors.Errorf("pebble: keys must be added in order: %s-%s:{(#%s)}, %s",
309 1 : w.opts.Comparer.FormatKey(prevStart),
310 1 : w.opts.Comparer.FormatKey(prevEnd),
311 1 : prevTrailer, span.Pretty(w.opts.Comparer.FormatKey))
312 1 : return w.err
313 1 : }
314 : }
315 1 : blockWriter.AddSpan(span)
316 1 : return nil
317 : }
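
A hedged sketch of adding a single, already-fragmented range-key span (the keys, sequence number, and value are illustrative only):

// encodeOneRangeKeySet adds a RANGEKEYSET span covering [a, c). Spans must be
// fragmented and added in order of their start keys, and a span's keys must
// all be range keys or all be RANGEDELs.
func encodeOneRangeKeySet(w *RawColumnWriter) error {
	return w.EncodeSpan(keyspan.Span{
		Start: []byte("a"),
		End:   []byte("c"),
		Keys: []keyspan.Key{{
			Trailer: base.MakeTrailer(7, base.InternalKeyKindRangeKeySet),
			Value:   []byte("range-key-value"),
		}},
	})
}
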
318 :
319 : // AddWithForceObsolete adds a point key/value pair when writing a
320 : // strict-obsolete sstable. For a given Writer, the keys passed to Add must be
321 : // in increasing order. Span keys (range deletions, range keys) must be added
322 : // through EncodeSpan.
323 : //
324 : // forceObsolete indicates whether the caller has determined that this key is
325 : // obsolete even though it may be the latest point key for this userkey. This
326 : // should be set to true for keys obsoleted by RANGEDELs, and is required for
327 : // strict-obsolete sstables.
328 : //
329 : // Note that there are two properties, S1 and S2 (see comment in format.go)
330 : // that strict-obsolete ssts must satisfy. S2, due to RANGEDELs, is solely the
331 : // responsibility of the caller. S1 is solely the responsibility of the
332 : // callee.
333 : func (w *RawColumnWriter) AddWithForceObsolete(
334 : key InternalKey, value []byte, forceObsolete bool,
335 1 : ) error {
336 1 : switch key.Kind() {
337 : case base.InternalKeyKindRangeDelete, base.InternalKeyKindRangeKeySet,
338 1 : base.InternalKeyKindRangeKeyUnset, base.InternalKeyKindRangeKeyDelete:
339 1 : return errors.Newf("%s must be added through EncodeSpan", key.Kind())
340 1 : case base.InternalKeyKindMerge:
341 1 : if w.opts.IsStrictObsolete {
342 0 : return errors.Errorf("MERGE not supported in a strict-obsolete sstable")
343 0 : }
344 : }
345 :
346 1 : eval, err := w.evaluatePoint(key, len(value))
347 1 : if err != nil {
348 1 : return err
349 1 : }
350 1 : eval.isObsolete = eval.isObsolete || forceObsolete
351 1 : w.prevPointKey.trailer = key.Trailer
352 1 : w.prevPointKey.isObsolete = eval.isObsolete
353 1 :
354 1 : var valuePrefix block.ValuePrefix
355 1 : var valueStoredWithKey []byte
356 1 : if eval.writeToValueBlock {
357 1 : vh, err := w.valueBlock.AddValue(value)
358 1 : if err != nil {
359 0 : return err
360 0 : }
361 1 : n := valblk.EncodeHandle(w.tmp[:], vh)
362 1 : valueStoredWithKey = w.tmp[:n]
363 1 : var attribute base.ShortAttribute
364 1 : if w.opts.ShortAttributeExtractor != nil {
365 1 : // TODO(sumeer): for compactions, it is possible that the input sstable
366 1 : // already has this value in the value section and so we have already
367 1 : // extracted the ShortAttribute. Avoid extracting it again. This will
368 1 : // require changing the RawWriter.Add interface.
369 1 : if attribute, err = w.opts.ShortAttributeExtractor(
370 1 : key.UserKey, int(eval.kcmp.PrefixLen), value); err != nil {
371 0 : return err
372 0 : }
373 : }
374 1 : valuePrefix = block.ValueHandlePrefix(eval.kcmp.PrefixEqual(), attribute)
375 1 : } else {
376 1 : valueStoredWithKey = value
377 1 : if len(value) > 0 {
378 1 : valuePrefix = block.InPlaceValuePrefix(eval.kcmp.PrefixEqual())
379 1 : }
380 : }
381 :
382 : // Append the key to the data block. We have NOT yet committed to
383 : // including the key in the block. The data block writer permits us to
384 : // finish the block excluding the last-appended KV.
385 1 : entriesWithoutKV := w.dataBlock.Rows()
386 1 : w.dataBlock.Add(key, valueStoredWithKey, valuePrefix, eval.kcmp, eval.isObsolete)
387 1 :
388 1 : // Now that we've appended the KV pair, we can compute the exact size of the
389 1 : // block with this key-value pair included. Check to see if we should flush
390 1 : // the current block, either with or without the added key-value pair.
391 1 : size := w.dataBlock.Size()
392 1 : if shouldFlushWithoutLatestKV(size, w.pendingDataBlockSize, entriesWithoutKV, &w.dataFlush) {
393 1 : // Flush the data block excluding the key we just added.
394 1 : if err := w.flushDataBlockWithoutNextKey(key.UserKey); err != nil {
395 0 : w.err = err
396 0 : return err
397 0 : }
398 : // flushDataBlockWithoutNextKey reset the data block builder, and we can
399 : // add the key to this next block now.
400 1 : w.dataBlock.Add(key, valueStoredWithKey, valuePrefix, eval.kcmp, eval.isObsolete)
401 1 : w.pendingDataBlockSize = w.dataBlock.Size()
402 1 : } else {
403 1 : // We're not flushing the data block, and we're committing to including
404 1 : // the current KV in the block. Remember the new size of the data block
405 1 : // with the current KV.
406 1 : w.pendingDataBlockSize = size
407 1 : }
408 :
409 1 : for i := range w.blockPropCollectors {
410 1 : v := value
411 1 : if key.Kind() == base.InternalKeyKindSet {
412 1 : // Values for SET are not required to be in-place, and in the future
413 1 : // may not even be read by the compaction, so pass nil values. Block
414 1 : // property collectors in such Pebble DB's must not look at the
415 1 : // value.
416 1 : v = nil
417 1 : }
418 1 : if err := w.blockPropCollectors[i].AddPointKey(key, v); err != nil {
419 0 : w.err = err
420 0 : return err
421 0 : }
422 : }
423 1 : w.obsoleteCollector.AddPoint(eval.isObsolete)
424 1 : if w.filterBlock != nil {
425 1 : w.filterBlock.addKey(key.UserKey[:eval.kcmp.PrefixLen])
426 1 : }
427 1 : w.meta.updateSeqNum(key.SeqNum())
428 1 : if !w.meta.HasPointKeys {
429 1 : w.meta.SetSmallestPointKey(key.Clone())
430 1 : }
431 :
432 1 : w.props.NumEntries++
433 1 : switch key.Kind() {
434 1 : case InternalKeyKindDelete, InternalKeyKindSingleDelete:
435 1 : w.props.NumDeletions++
436 1 : w.props.RawPointTombstoneKeySize += uint64(len(key.UserKey))
437 1 : w.dataBlock.numDeletions++
438 1 : w.dataBlock.deletionSize += len(key.UserKey)
439 1 : case InternalKeyKindDeleteSized:
440 1 : var size uint64
441 1 : if len(value) > 0 {
442 1 : var n int
443 1 : size, n = binary.Uvarint(value)
444 1 : if n <= 0 {
445 0 : return errors.Newf("%s key's value (%x) does not parse as uvarint",
446 0 : errors.Safe(key.Kind().String()), value)
447 0 : }
448 : }
449 1 : w.props.NumDeletions++
450 1 : w.props.NumSizedDeletions++
451 1 : w.props.RawPointTombstoneKeySize += uint64(len(key.UserKey))
452 1 : w.props.RawPointTombstoneValueSize += size
453 1 : w.dataBlock.numDeletions++
454 1 : w.dataBlock.deletionSize += len(key.UserKey)
455 1 : case InternalKeyKindMerge:
456 1 : w.props.NumMergeOperands++
457 : }
458 1 : w.props.RawKeySize += uint64(key.Size())
459 1 : w.props.RawValueSize += uint64(len(value))
460 1 : return nil
461 : }
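
A sketch of the forceObsolete contract described above: when the caller (e.g. a compaction) knows a point key is shadowed by a RANGEDEL, it passes forceObsolete=true even if the key is the newest version of its user key. addShadowedPoint is a hypothetical helper.

// addShadowedPoint adds a SET that the caller has already determined is
// deleted by a range tombstone, so it is marked obsolete regardless of
// whether it is the latest point key for its user key.
func addShadowedPoint(w *RawColumnWriter, userKey []byte, seqNum base.SeqNum, value []byte) error {
	key := base.MakeInternalKey(userKey, seqNum, base.InternalKeyKindSet)
	return w.AddWithForceObsolete(key, value, true /* forceObsolete */)
}
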
462 :
463 : type pointKeyEvaluation struct {
464 : kcmp colblk.KeyComparison
465 : isObsolete bool
466 : writeToValueBlock bool
467 : }
468 :
469 : // evaluatePoint takes information about a point key being written to the
470 : // sstable and decides how the point should be represented, where its value
471 : // should be stored, etc.
472 : func (w *RawColumnWriter) evaluatePoint(
473 : key base.InternalKey, valueLen int,
474 1 : ) (eval pointKeyEvaluation, err error) {
475 1 : eval.kcmp = w.dataBlock.KeyWriter.ComparePrev(key.UserKey)
476 1 :
477 1 : // When invariants are enabled, validate kcmp.
478 1 : if invariants.Enabled {
479 1 : colblk.AssertKeyCompare(w.comparer, key.UserKey, w.previousUserKey.Get(), eval.kcmp)
480 1 : w.previousUserKey.Set(append(w.previousUserKey.Get()[:0], key.UserKey...))
481 1 : }
482 :
483 1 : if !w.meta.HasPointKeys {
484 1 : return eval, nil
485 1 : }
486 1 : keyKind := key.Kind()
487 1 : // Ensure that no one adds a point key kind without considering the obsolete
488 1 : // handling for that kind.
489 1 : switch keyKind {
490 : case InternalKeyKindSet, InternalKeyKindSetWithDelete, InternalKeyKindMerge,
491 1 : InternalKeyKindDelete, InternalKeyKindSingleDelete, InternalKeyKindDeleteSized:
492 0 : default:
493 0 : panic(errors.AssertionFailedf("unexpected key kind %s", keyKind.String()))
494 : }
495 1 : prevKeyKind := w.prevPointKey.trailer.Kind()
496 1 : // If same user key, then the current key is obsolete if any of the
497 1 : // following is true:
498 1 : // C1 The prev key was obsolete.
499 1 : // C2 The prev key was not a MERGE. When the previous key is a MERGE we must
500 1 : // preserve SET* and MERGE since their values will be merged into the
501 1 : // previous key. We also must preserve DEL* since there may be an older
502 1 : // SET*/MERGE in a lower level that must not be merged with the MERGE --
503 1 : // if we omit the DEL* that lower SET*/MERGE will become visible.
504 1 : //
505 1 : // Regardless of whether it is the same user key or not
506 1 : // C3 The current key is some kind of point delete, and we are writing to
507 1 : // the lowest level, then it is also obsolete. The correctness of this
508 1 : // relies on the same user key not spanning multiple sstables in a level.
509 1 : //
510 1 : // C1 ensures that for a user key there is at most one transition from
511 1 : // !obsolete to obsolete. Consider a user key k, for which the first n keys
512 1 : // are not obsolete. We consider the various value of n:
513 1 : //
514 1 : // n = 0: This happens due to forceObsolete being set by the caller, or due
515 1 : // to C3. forceObsolete must only be set due to a RANGEDEL, and that RANGEDEL
516 1 : // must also delete all the lower seqnums for the same user key. C3 triggers
517 1 : // due to a point delete and that deletes all the lower seqnums for the same
518 1 : // user key.
519 1 : //
520 1 : // n = 1: This is the common case. It happens when the first key is not a
521 1 : // MERGE, or the current key is some kind of point delete.
522 1 : //
523 1 : // n > 1: This is due to a sequence of MERGE keys, potentially followed by a
524 1 : // single non-MERGE key.
525 1 : isObsoleteC1AndC2 := eval.kcmp.UserKeyComparison == 0 &&
526 1 : (w.prevPointKey.isObsolete || prevKeyKind != InternalKeyKindMerge)
527 1 : isObsoleteC3 := w.opts.WritingToLowestLevel &&
528 1 : (keyKind == InternalKeyKindDelete || keyKind == InternalKeyKindSingleDelete ||
529 1 : keyKind == InternalKeyKindDeleteSized)
530 1 : eval.isObsolete = isObsoleteC1AndC2 || isObsoleteC3
531 1 : // TODO(sumeer): storing isObsolete SET and SETWITHDEL in value blocks is
532 1 : // possible, but requires some care in documenting and checking invariants.
533 1 : // There is code that assumes nothing in value blocks because of single MVCC
534 1 : // version (those should be ok). We have to ensure setHasSamePrefix is
535 1 : // correctly initialized here etc.
536 1 :
537 1 : if !w.disableKeyOrderChecks && (eval.kcmp.UserKeyComparison < 0 ||
538 1 : (eval.kcmp.UserKeyComparison == 0 && w.prevPointKey.trailer <= key.Trailer)) {
539 1 : previousKey := base.InternalKey{
540 1 : UserKey: w.dataBlock.MaterializeLastUserKey(nil),
541 1 : Trailer: w.prevPointKey.trailer,
542 1 : }
543 1 : return eval, errors.Errorf(
544 1 : "pebble: keys must be added in strictly increasing order: %s, %s",
545 1 : previousKey.Pretty(w.comparer.FormatKey),
546 1 : key.Pretty(w.comparer.FormatKey))
547 1 : }
548 :
549 : // We might want to write this key's value to a value block if it has the
550 : // same prefix.
551 : //
552 : // We require:
553 : // . Value blocks to be enabled.
554 : // . The current key to have the same prefix as the previous key.
555 : // . The previous key to be a SET.
556 : // . The current key to be a SET.
557 : // . If there are bounds requiring some keys' values to be in-place, the
558 : // key must not fall within those bounds.
559 : // . The value to be sufficiently large. (Currently we simply require a
560 : // non-zero length, so all non-empty values are eligible for storage
561 : // out-of-band in a value block.)
562 : //
563 : // Use of 0 here is somewhat arbitrary. Given the minimum 3 byte encoding of
564 : // valueHandle, this should be > 3. But tiny values are common in test and
565 : // unlikely in production, so we use 0 here for better test coverage.
566 1 : const tinyValueThreshold = 0
567 1 : useValueBlock := !w.opts.DisableValueBlocks &&
568 1 : eval.kcmp.PrefixEqual() &&
569 1 : prevKeyKind == InternalKeyKindSet &&
570 1 : keyKind == InternalKeyKindSet &&
571 1 : valueLen > tinyValueThreshold &&
572 1 : w.valueBlock != nil
573 1 : if !useValueBlock {
574 1 : return eval, nil
575 1 : }
576 : // NB: it is possible that eval.kcmp.UserKeyComparison == 0, i.e., these two
577 : // SETs have identical user keys (because of an open snapshot). This should
578 : // be the rare case.
579 :
580 : // If there are bounds requiring some keys' values to be in-place, compare
581 : // the prefix against the bounds.
582 1 : if !w.opts.RequiredInPlaceValueBound.IsEmpty() {
583 1 : if w.comparer.Compare(w.opts.RequiredInPlaceValueBound.Upper, key.UserKey[:eval.kcmp.PrefixLen]) <= 0 {
584 1 : // Common case for CockroachDB. Make it empty since all future keys
585 1 : // will be >= this key.
586 1 : w.opts.RequiredInPlaceValueBound = UserKeyPrefixBound{}
587 1 : } else if w.comparer.Compare(key.UserKey[:eval.kcmp.PrefixLen], w.opts.RequiredInPlaceValueBound.Lower) >= 0 {
588 1 : // Don't write to value block if the key is within the bounds.
589 1 : return eval, nil
590 1 : }
591 : }
592 1 : eval.writeToValueBlock = true
593 1 : return eval, nil
594 : }
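
The obsolescence rules C1-C3 above can be restated as a small standalone predicate. The sketch below paraphrases the logic in evaluatePoint (forceObsolete from the caller is OR-ed in separately by AddWithForceObsolete); it is illustrative, not an exported API.

// isObsoletePoint mirrors conditions C1-C3: sameUserKey, prevObsolete and
// prevKind describe the previously written point key; kind is the key being
// added; writingToLowestLevel corresponds to WriterOptions.WritingToLowestLevel.
func isObsoletePoint(
	sameUserKey, prevObsolete bool,
	prevKind, kind base.InternalKeyKind,
	writingToLowestLevel bool,
) bool {
	// C1 and C2: same user key, and the previous key was already obsolete or
	// was not a MERGE.
	c1c2 := sameUserKey && (prevObsolete || prevKind != base.InternalKeyKindMerge)
	// C3: a point deletion written to the lowest level is always obsolete.
	c3 := writingToLowestLevel &&
		(kind == base.InternalKeyKindDelete ||
			kind == base.InternalKeyKindSingleDelete ||
			kind == base.InternalKeyKindDeleteSized)
	return c1c2 || c3
}
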
595 :
596 : var compressedBlockPool = sync.Pool{
597 1 : New: func() interface{} {
598 1 : return new(compressedBlock)
599 1 : },
600 : }
601 :
602 : type compressedBlock struct {
603 : physical block.PhysicalBlock
604 : blockBuf blockBuf
605 : }
606 :
607 1 : func (w *RawColumnWriter) flushDataBlockWithoutNextKey(nextKey []byte) error {
608 1 : serializedBlock, lastKey := w.dataBlock.Finish(w.dataBlock.Rows()-1, w.pendingDataBlockSize)
609 1 : w.maybeIncrementTombstoneDenseBlocks(len(serializedBlock))
610 1 : // Compute the separator that will be written to the index block alongside
611 1 : // this data block's end offset. It is the separator between the last key in
612 1 : // the finished block and the [nextKey] that was excluded from the block.
613 1 : w.separatorBuf = w.comparer.Separator(w.separatorBuf[:0], lastKey.UserKey, nextKey)
614 1 : if err := w.enqueueDataBlock(serializedBlock, lastKey, w.separatorBuf); err != nil {
615 0 : return err
616 0 : }
617 1 : w.dataBlock.Reset()
618 1 : w.pendingDataBlockSize = 0
619 1 : return nil
620 : }
621 :
622 : // maybeIncrementTombstoneDenseBlocks increments the number of tombstone dense
623 : // blocks if the number of deletions in the data block exceeds a threshold or
624 : // the deletion size exceeds a threshold. It should be called after the
625 : // data block has been finished.
626 : // Invariant: the data block must already have been finished; uncompressedLen is its serialized (uncompressed) length.
627 1 : func (w *RawColumnWriter) maybeIncrementTombstoneDenseBlocks(uncompressedLen int) {
628 1 : minSize := w.opts.DeletionSizeRatioThreshold * float32(uncompressedLen)
629 1 : if w.dataBlock.numDeletions > w.opts.NumDeletionsThreshold || float32(w.dataBlock.deletionSize) > minSize {
630 1 : w.props.NumTombstoneDenseBlocks++
631 1 : }
632 1 : w.dataBlock.numDeletions = 0
633 1 : w.dataBlock.deletionSize = 0
634 : }
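
For a concrete feel of the threshold (numbers are made up): with DeletionSizeRatioThreshold = 0.5 and a finished block of 4096 uncompressed bytes, the block counts as tombstone-dense if it holds more than NumDeletionsThreshold point tombstones, or if its tombstone key bytes exceed 0.5 * 4096 = 2048. A standalone restatement of the predicate:

// isTombstoneDense restates the check above: a block is tombstone-dense when
// its tombstone count exceeds numDeletionsThreshold, or when the bytes
// occupied by tombstone keys exceed deletionSizeRatioThreshold of the
// uncompressed block size.
func isTombstoneDense(
	numDeletions, deletionSize, uncompressedLen, numDeletionsThreshold int,
	deletionSizeRatioThreshold float32,
) bool {
	return numDeletions > numDeletionsThreshold ||
		float32(deletionSize) > deletionSizeRatioThreshold*float32(uncompressedLen)
}
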
635 :
636 : // enqueueDataBlock compresses and checksums the provided data block and sends
637 : // it to the write queue to be asynchronously written to the underlying storage.
638 : // It also adds the block's index block separator to the pending index block,
639 : // possibly triggering the index block to be finished and buffered.
640 : func (w *RawColumnWriter) enqueueDataBlock(
641 : serializedBlock []byte, lastKey base.InternalKey, separator []byte,
642 1 : ) error {
643 1 : w.lastKeyBuf = append(w.lastKeyBuf[:0], lastKey.UserKey...)
644 1 : w.meta.SetLargestPointKey(base.InternalKey{
645 1 : UserKey: w.lastKeyBuf,
646 1 : Trailer: lastKey.Trailer,
647 1 : })
648 1 :
649 1 : if invariants.Enabled {
650 1 : var dec colblk.DataBlockDecoder
651 1 : dec.Init(w.opts.KeySchema, serializedBlock)
652 1 : if err := dec.Validate(w.comparer, w.opts.KeySchema); err != nil {
653 0 : panic(err)
654 : }
655 : }
656 :
657 : // Serialize the data block, compress it and send it to the write queue.
658 1 : cb := compressedBlockPool.Get().(*compressedBlock)
659 1 : cb.blockBuf.checksummer.Type = w.opts.Checksum
660 1 : cb.physical = block.CompressAndChecksum(
661 1 : &cb.blockBuf.dataBuf,
662 1 : serializedBlock,
663 1 : w.opts.Compression,
664 1 : &cb.blockBuf.checksummer,
665 1 : )
666 1 : if !cb.physical.IsCompressed() {
667 1 : // If the block isn't compressed, cb.physical's underlying data points
668 1 : // directly into a buffer owned by w.dataBlock. Clone it before passing
669 1 : // it to the write queue to be asynchronously written to disk.
670 1 : // TODO(jackson): Should we try to avoid this clone by tracking the
671 1 : // lifetime of the DataBlockWriters?
672 1 : cb.physical, cb.blockBuf.dataBuf = cb.physical.CloneUsingBuf(cb.blockBuf.dataBuf)
673 1 : }
674 1 : return w.enqueuePhysicalBlock(cb, separator)
675 : }
676 :
677 1 : func (w *RawColumnWriter) enqueuePhysicalBlock(cb *compressedBlock, separator []byte) error {
678 1 : dataBlockHandle := block.Handle{
679 1 : Offset: w.queuedDataSize,
680 1 : Length: uint64(cb.physical.LengthWithoutTrailer()),
681 1 : }
682 1 : w.queuedDataSize += dataBlockHandle.Length + block.TrailerLen
683 1 : w.writeQueue.ch <- cb
684 1 :
685 1 : var err error
686 1 : w.blockPropsEncoder.resetProps()
687 1 : for i := range w.blockPropCollectors {
688 1 : scratch := w.blockPropsEncoder.getScratchForProp()
689 1 : if scratch, err = w.blockPropCollectors[i].FinishDataBlock(scratch); err != nil {
690 0 : return err
691 0 : }
692 1 : w.blockPropsEncoder.addProp(shortID(i), scratch)
693 : }
694 1 : dataBlockProps := w.blockPropsEncoder.unsafeProps()
695 1 :
696 1 : // Add the separator to the index block. This might trigger a flush of the
697 1 : // index block too.
698 1 : i := w.indexBlock.AddBlockHandle(separator, dataBlockHandle, dataBlockProps)
699 1 : sizeWithEntry := w.indexBlock.Size()
700 1 : if shouldFlushWithoutLatestKV(sizeWithEntry, w.indexBlockSize, i, &w.indexFlush) {
701 1 : // NB: finishIndexBlock will use blockPropsEncoder, so we must clone the
702 1 : // data block's props first.
703 1 : dataBlockProps = slices.Clone(dataBlockProps)
704 1 :
705 1 : if err = w.finishIndexBlock(w.indexBlock.Rows() - 1); err != nil {
706 0 : return err
707 0 : }
708 : // finishIndexBlock reset the index block builder, and we can
709 : // add the block handle to this new index block.
710 1 : _ = w.indexBlock.AddBlockHandle(separator, dataBlockHandle, dataBlockProps)
711 1 : w.indexBlockSize = w.indexBlock.Size()
712 1 : } else {
713 1 : w.indexBlockSize = sizeWithEntry
714 1 : }
715 : // Incorporate the finished data block's properties into the index block, now
716 : // that we've flushed the index block without the new separator if
717 : // necessary.
718 1 : for i := range w.blockPropCollectors {
719 1 : w.blockPropCollectors[i].AddPrevDataBlockToIndexBlock()
720 1 : }
721 1 : return nil
722 : }
723 :
724 : // finishIndexBlock finishes the currently pending index block with the first
725 : // [rows] rows. In practice, [rows] is always w.indexBlock.Rows() or
726 : // w.indexBlock.Rows()-1.
727 : //
728 : // The finished index block is buffered until the writer is closed.
729 1 : func (w *RawColumnWriter) finishIndexBlock(rows int) error {
730 1 : defer w.indexBlock.Reset()
731 1 : w.blockPropsEncoder.resetProps()
732 1 : for i := range w.blockPropCollectors {
733 1 : scratch := w.blockPropsEncoder.getScratchForProp()
734 1 : var err error
735 1 : if scratch, err = w.blockPropCollectors[i].FinishIndexBlock(scratch); err != nil {
736 0 : return err
737 0 : }
738 1 : w.blockPropsEncoder.addProp(shortID(i), scratch)
739 : }
740 1 : indexProps := w.blockPropsEncoder.props()
741 1 : bib := bufferedIndexBlock{nEntries: rows, properties: indexProps}
742 1 :
743 1 : // Copy the last (greatest) separator key in the index block into bib.sep.
744 1 : // It'll be the separator on the entry in the top-level index block.
745 1 : //
746 1 : // TODO(jackson): bib.sep.Trailer is unused within the columnar-block
747 1 : // sstable writer. Its existence is a code artifact of reuse of the
748 1 : // bufferedIndexBlock type between colblk and rowblk writers. This can be
749 1 : // cleaned up.
750 1 : bib.sep.Trailer = base.MakeTrailer(base.SeqNumMax, base.InternalKeyKindSeparator)
751 1 : w.indexBuffering.sepAlloc, bib.sep.UserKey = w.indexBuffering.sepAlloc.Copy(
752 1 : w.indexBlock.UnsafeSeparator(rows - 1))
753 1 :
754 1 : // Finish the index block and copy it so that w.indexBlock may be reused.
755 1 : block := w.indexBlock.Finish(rows)
756 1 : if len(w.indexBuffering.blockAlloc) < len(block) {
757 1 : // Allocate enough bytes for approximately 16 index blocks.
758 1 : w.indexBuffering.blockAlloc = make([]byte, len(block)*16)
759 1 : }
760 1 : n := copy(w.indexBuffering.blockAlloc, block)
761 1 : bib.block = w.indexBuffering.blockAlloc[:n:n]
762 1 : w.indexBuffering.blockAlloc = w.indexBuffering.blockAlloc[n:]
763 1 :
764 1 : w.indexBuffering.partitions = append(w.indexBuffering.partitions, bib)
765 1 : return nil
766 : }
767 :
768 : // flushBufferedIndexBlocks writes all index blocks, including the top-level
769 : // index block if necessary, to the underlying writable. It returns the block
770 : // handle of the top index (either the only index block or the top-level index
771 : // if two-level).
772 1 : func (w *RawColumnWriter) flushBufferedIndexBlocks() (rootIndex block.Handle, err error) {
773 1 : // If there's a currently-pending index block, finish it.
774 1 : if w.indexBlock.Rows() > 0 || len(w.indexBuffering.partitions) == 0 {
775 1 : w.finishIndexBlock(w.indexBlock.Rows())
776 1 : }
777 : // We've buffered all the index blocks. Typically there's just one index
778 : // block, in which case we're writing a "single-level" index. If we're
779 : // writing a large file or the index separators happen to be excessively
780 : // long, we may have several index blocks and need to construct a
781 : // "two-level" index structure.
782 1 : switch len(w.indexBuffering.partitions) {
783 0 : case 0:
784 0 : // This is impossible because we'll flush the index block immediately
785 0 : // above this switch statement if there are no buffered partitions
786 0 : // (regardless of whether there are data block handles in the index
787 0 : // block).
788 0 : panic("unreachable")
789 1 : case 1:
790 1 : // Single-level index.
791 1 : rootIndex, err = w.layout.WriteIndexBlock(w.indexBuffering.partitions[0].block)
792 1 : if err != nil {
793 0 : return rootIndex, err
794 0 : }
795 1 : w.props.IndexSize = rootIndex.Length + block.TrailerLen
796 1 : w.props.NumDataBlocks = uint64(w.indexBuffering.partitions[0].nEntries)
797 1 : w.props.IndexType = binarySearchIndex
798 1 : default:
799 1 : // Two-level index.
800 1 : for _, part := range w.indexBuffering.partitions {
801 1 : bh, err := w.layout.WriteIndexBlock(part.block)
802 1 : if err != nil {
803 0 : return block.Handle{}, err
804 0 : }
805 1 : w.props.IndexSize += bh.Length + block.TrailerLen
806 1 : w.props.NumDataBlocks += uint64(part.nEntries)
807 1 : w.topLevelIndexBlock.AddBlockHandle(part.sep.UserKey, bh, part.properties)
808 : }
809 1 : rootIndex, err = w.layout.WriteIndexBlock(w.topLevelIndexBlock.Finish(w.topLevelIndexBlock.Rows()))
810 1 : if err != nil {
811 0 : return block.Handle{}, err
812 0 : }
813 1 : w.props.IndexSize += rootIndex.Length + block.TrailerLen
814 1 : w.props.IndexType = twoLevelIndex
815 1 : w.props.IndexPartitions = uint64(len(w.indexBuffering.partitions))
816 : }
817 1 : return rootIndex, nil
818 : }
819 :
820 : // drainWriteQueue runs in its own goroutine and is responsible for writing
821 : // finished, compressed data blocks to the writable. It reads from w.writeQueue
822 : // until the channel is closed. All data blocks are written by this goroutine.
823 : // Other blocks are written directly by the client goroutine. See Close.
824 1 : func (w *RawColumnWriter) drainWriteQueue() {
825 1 : defer w.writeQueue.wg.Done()
826 1 : for cb := range w.writeQueue.ch {
827 1 : if _, err := w.layout.WritePrecompressedDataBlock(cb.physical); err != nil {
828 0 : w.writeQueue.err = err
829 0 : }
830 1 : cb.blockBuf.clear()
831 1 : cb.physical = block.PhysicalBlock{}
832 1 : compressedBlockPool.Put(cb)
833 : }
834 : }
835 :
836 1 : func (w *RawColumnWriter) Close() (err error) {
837 1 : defer func() {
838 1 : if w.valueBlock != nil {
839 1 : w.valueBlock.Release()
840 1 : // Defensive code in case Close gets called again. We don't want to put
841 1 : // the same object to a sync.Pool.
842 1 : w.valueBlock = nil
843 1 : }
844 1 : w.layout.Abort()
845 1 : // Record any error in the writer (so we can exit early if Close is called
846 1 : // again).
847 1 : if err != nil {
848 1 : w.err = err
849 1 : }
850 : }()
851 1 : if w.layout.writable == nil {
852 1 : return w.err
853 1 : }
854 :
855 : // Finish the last data block and send it to the write queue if it contains
856 : // any pending KVs.
857 1 : if rows := w.dataBlock.Rows(); rows > 0 {
858 1 : serializedBlock, lastKey := w.dataBlock.Finish(rows, w.pendingDataBlockSize)
859 1 : w.separatorBuf = w.comparer.Successor(w.separatorBuf[:0], lastKey.UserKey)
860 1 : w.err = errors.CombineErrors(w.err, w.enqueueDataBlock(serializedBlock, lastKey, w.separatorBuf))
861 1 : w.maybeIncrementTombstoneDenseBlocks(len(serializedBlock))
862 1 : }
863 : // Close the write queue channel so that the goroutine responsible for
864 : // writing data blocks to disk knows to exit. Any subsequent blocks (eg,
865 : // index, metadata, range key, etc) will be written by the goroutine that
866 : // called Close.
867 1 : close(w.writeQueue.ch)
868 1 : w.writeQueue.wg.Wait()
869 1 : // If the write queue encountered any errors while writing out data blocks,
870 1 : // it's stored in w.writeQueue.err.
871 1 : w.err = firstError(w.err, w.writeQueue.err)
872 1 : if w.err != nil {
873 1 : return w.err
874 1 : }
875 :
876 : // INVARIANT: w.queuedDataSize == w.layout.offset.
877 : // All data blocks have been written to disk. The queuedDataSize is the
878 : // cumulative size of all the data blocks we've sent to the write queue. Now
879 : // that they've all been flushed, queuedDataSize should match w.layout's
880 : // offset.
881 1 : if w.queuedDataSize != w.layout.offset {
882 0 : panic(errors.AssertionFailedf("pebble: %d of queued data blocks but layout offset is %d",
883 0 : w.queuedDataSize, w.layout.offset))
884 : }
885 1 : w.props.DataSize = w.layout.offset
886 1 : if _, err = w.flushBufferedIndexBlocks(); err != nil {
887 0 : return err
888 0 : }
889 :
890 : // Write the filter block.
891 1 : if w.filterBlock != nil {
892 1 : bh, err := w.layout.WriteFilterBlock(w.filterBlock)
893 1 : if err != nil {
894 0 : return err
895 0 : }
896 1 : w.props.FilterPolicyName = w.filterBlock.policyName()
897 1 : w.props.FilterSize = bh.Length
898 : }
899 :
900 : // Write the range deletion block if non-empty.
901 1 : if w.rangeDelBlock.KeyCount() > 0 {
902 1 : w.props.NumRangeDeletions = uint64(w.rangeDelBlock.KeyCount())
903 1 : sm, la := w.rangeDelBlock.UnsafeBoundaryKeys()
904 1 : w.meta.SetSmallestRangeDelKey(sm)
905 1 : w.meta.SetLargestRangeDelKey(la)
906 1 : if _, err := w.layout.WriteRangeDeletionBlock(w.rangeDelBlock.Finish()); err != nil {
907 0 : return err
908 0 : }
909 : }
910 :
911 : // Write the range key block if non-empty.
912 1 : if w.rangeKeyBlock.KeyCount() > 0 {
913 1 : sm, la := w.rangeKeyBlock.UnsafeBoundaryKeys()
914 1 : w.meta.SetSmallestRangeKey(sm)
915 1 : w.meta.SetLargestRangeKey(la)
916 1 : if _, err := w.layout.WriteRangeKeyBlock(w.rangeKeyBlock.Finish()); err != nil {
917 0 : return err
918 0 : }
919 : }
920 :
921 : // Write out the value block.
922 1 : if w.valueBlock != nil {
923 1 : _, vbStats, err := w.valueBlock.Finish(&w.layout, w.layout.offset)
924 1 : if err != nil {
925 0 : return err
926 0 : }
927 1 : w.props.NumValueBlocks = vbStats.NumValueBlocks
928 1 : w.props.NumValuesInValueBlocks = vbStats.NumValuesInValueBlocks
929 1 : w.props.ValueBlocksSize = vbStats.ValueBlocksAndIndexSize
930 : }
931 :
932 : // Write the properties block.
933 1 : {
934 1 : // Finish and record the prop collectors if props are not yet recorded.
935 1 : // Pre-computed props might have been copied by specialized sst creators
936 1 : // like suffix replacer.
937 1 : if len(w.props.UserProperties) == 0 {
938 1 : userProps := make(map[string]string)
939 1 : for i := range w.blockPropCollectors {
940 1 : scratch := w.blockPropsEncoder.getScratchForProp()
941 1 : // Place the shortID in the first byte.
942 1 : scratch = append(scratch, byte(i))
943 1 : buf, err := w.blockPropCollectors[i].FinishTable(scratch)
944 1 : if err != nil {
945 0 : return err
946 0 : }
947 1 : var prop string
948 1 : if len(buf) > 0 {
949 1 : prop = string(buf)
950 1 : }
951 : // NB: The property is populated in the map even if it is the
952 : // empty string, since the presence in the map is what indicates
953 : // that the block property collector was used when writing.
954 1 : userProps[w.blockPropCollectors[i].Name()] = prop
955 : }
956 1 : if len(userProps) > 0 {
957 1 : w.props.UserProperties = userProps
958 1 : }
959 : }
960 :
961 1 : var raw rowblk.Writer
962 1 : // The restart interval is set to infinity because the properties block
963 1 : // is always read sequentially and cached in a heap located object. This
964 1 : // reduces table size without a significant impact on performance.
965 1 : raw.RestartInterval = propertiesBlockRestartInterval
966 1 : w.props.CompressionOptions = rocksDBCompressionOptions
967 1 : w.props.save(w.opts.TableFormat, &raw)
968 1 : if _, err := w.layout.WritePropertiesBlock(raw.Finish()); err != nil {
969 0 : return err
970 0 : }
971 : }
972 :
973 : // Write the table footer.
974 1 : w.meta.Size, err = w.layout.Finish()
975 1 : if err != nil {
976 0 : return err
977 0 : }
978 1 : w.meta.Properties = w.props
979 1 : // Release any held memory and make any future calls error.
980 1 : *w = RawColumnWriter{meta: w.meta, err: errWriterClosed}
981 1 : return nil
982 : }
983 :
984 : // rewriteSuffixes implements RawWriter.
985 : func (w *RawColumnWriter) rewriteSuffixes(
986 : r *Reader, sstBytes []byte, wo WriterOptions, from, to []byte, concurrency int,
987 1 : ) error {
988 1 : for _, c := range w.blockPropCollectors {
989 1 : if !c.SupportsSuffixReplacement() {
990 0 : return errors.Errorf("block property collector %s does not support suffix replacement", c.Name())
991 0 : }
992 : }
993 1 : l, err := r.Layout()
994 1 : if err != nil {
995 0 : return errors.Wrap(err, "reading layout")
996 0 : }
997 : // Copy data blocks in parallel, rewriting suffixes as we go.
998 1 : blocks, err := rewriteDataBlocksInParallel(r, sstBytes, wo, l.Data, from, to, concurrency, func() blockRewriter {
999 1 : return colblk.NewDataBlockRewriter(wo.KeySchema, w.comparer)
1000 1 : })
1001 1 : if err != nil {
1002 1 : return errors.Wrap(err, "rewriting data blocks")
1003 1 : }
1004 :
1005 : // oldShortIDs maps the shortID for the block property collector in the old
1006 : // blocks to the shortID in the new blocks. Initialized once for the sstable.
1007 1 : oldShortIDs, n, err := getShortIDs(r, w.blockPropCollectors)
1008 1 : if err != nil {
1009 0 : return errors.Wrap(err, "getting short IDs")
1010 0 : }
1011 1 : oldProps := make([][]byte, len(w.blockPropCollectors))
1012 1 : for i := range blocks {
1013 1 : cb := compressedBlockPool.Get().(*compressedBlock)
1014 1 : cb.physical = blocks[i].physical
1015 1 :
1016 1 : // Load any previous values for our prop collectors into oldProps.
1017 1 : for i := range oldProps {
1018 1 : oldProps[i] = nil
1019 1 : }
1020 1 : decoder := makeBlockPropertiesDecoder(n, l.Data[i].Props)
1021 1 : for !decoder.Done() {
1022 1 : id, val, err := decoder.Next()
1023 1 : if err != nil {
1024 0 : return err
1025 0 : }
1026 1 : if oldShortIDs[id].IsValid() {
1027 1 : oldProps[oldShortIDs[id]] = val
1028 1 : }
1029 : }
1030 1 : for i, p := range w.blockPropCollectors {
1031 1 : if err := p.AddCollectedWithSuffixReplacement(oldProps[i], from, to); err != nil {
1032 0 : return err
1033 0 : }
1034 : }
1035 1 : var separator []byte
1036 1 : if i+1 < len(blocks) {
1037 1 : w.separatorBuf = w.comparer.Separator(w.separatorBuf[:0], blocks[i].end.UserKey, blocks[i+1].start.UserKey)
1038 1 : separator = w.separatorBuf
1039 1 : } else {
1040 1 : w.separatorBuf = w.comparer.Successor(w.separatorBuf[:0], blocks[i].end.UserKey)
1041 1 : separator = w.separatorBuf
1042 1 : }
1043 1 : w.enqueuePhysicalBlock(cb, separator)
1044 : }
1045 :
1046 1 : if len(blocks) > 0 {
1047 1 : w.meta.updateSeqNum(blocks[0].start.SeqNum())
1048 1 : w.props.NumEntries = r.Properties.NumEntries
1049 1 : w.props.RawKeySize = r.Properties.RawKeySize
1050 1 : w.props.RawValueSize = r.Properties.RawValueSize
1051 1 : w.meta.SetSmallestPointKey(blocks[0].start)
1052 1 : w.meta.SetLargestPointKey(blocks[len(blocks)-1].end)
1053 1 : }
1054 :
1055 : // Copy range key block, replacing suffixes if it exists.
1056 1 : if err := rewriteRangeKeyBlockToWriter(r, w, from, to); err != nil {
1057 0 : return errors.Wrap(err, "rewriting range key blocks")
1058 0 : }
1059 : // Copy over the filter block if it exists.
1060 1 : if w.filterBlock != nil {
1061 1 : if filterBlockBH, ok := l.FilterByName(w.filterBlock.metaName()); ok {
1062 1 : filterBlock, _, err := readBlockBuf(sstBytes, filterBlockBH, r.checksumType, nil)
1063 1 : if err != nil {
1064 0 : return errors.Wrap(err, "reading filter")
1065 0 : }
1066 1 : w.filterBlock = copyFilterWriter{
1067 1 : origPolicyName: w.filterBlock.policyName(),
1068 1 : origMetaName: w.filterBlock.metaName(),
1069 1 : data: filterBlock,
1070 1 : }
1071 : }
1072 : }
1073 1 : return nil
1074 : }
1075 :
1076 : func shouldFlushWithoutLatestKV(
1077 : sizeWithKV int, sizeWithoutKV int, entryCountWithoutKV int, flushGovernor *block.FlushGovernor,
1078 1 : ) bool {
1079 1 : if entryCountWithoutKV == 0 {
1080 1 : return false
1081 1 : }
1082 1 : if sizeWithoutKV < flushGovernor.LowWatermark() {
1083 1 : // Fast path when the block is too small to flush.
1084 1 : return false
1085 1 : }
1086 1 : return flushGovernor.ShouldFlush(sizeWithoutKV, sizeWithKV)
1087 : }
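
A sketch of how this composes with a FlushGovernor, using made-up parameters (the governor arguments mirror those passed in newColumnarWriter; the decision ultimately depends on the governor's internals):

// wouldFlushExample builds a governor targeting 4 KiB blocks and reports
// whether a block currently holding entryCountWithoutKV entries at
// sizeWithoutKV bytes should be flushed before admitting a KV that would grow
// it to sizeWithKV bytes.
func wouldFlushExample(sizeWithKV, sizeWithoutKV, entryCountWithoutKV int) bool {
	fg := block.MakeFlushGovernor(4096, 90, 60, nil)
	return shouldFlushWithoutLatestKV(sizeWithKV, sizeWithoutKV, entryCountWithoutKV, &fg)
}
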
1088 :
1089 : // copyDataBlocks adds a range of blocks to the table as-is. These blocks could be
1090 : // compressed. It's specifically used by the sstable copier that can copy parts
1091 : // of an sstable to a new sstable, using CopySpan().
1092 : func (w *RawColumnWriter) copyDataBlocks(
1093 : ctx context.Context, blocks []indexEntry, rh objstorage.ReadHandle,
1094 1 : ) error {
1095 1 : const readSizeTarget = 256 << 10
1096 1 : readAndFlushBlocks := func(firstBlockIdx, lastBlockIdx int) error {
1097 1 : if firstBlockIdx > lastBlockIdx {
1098 0 : panic("pebble: readAndFlushBlocks called with invalid block range")
1099 : }
1100 : // We need to flush blocks[firstBlockIdx:lastBlockIdx+1] into the write queue.
1101 : // We do this by issuing one big read from the read handle into the buffer, and
1102 : // then enqueueing the writing of those blocks one-by-one.
1103 : //
1104 : // TODO(bilal): Consider refactoring the write queue to support writing multiple
1105 : // blocks in one request.
1106 1 : lastBH := blocks[lastBlockIdx].bh
1107 1 : blocksToReadLen := lastBH.Offset + lastBH.Length + block.TrailerLen - blocks[firstBlockIdx].bh.Offset
1108 1 : // We need to create a new buffer for each read, as w.enqueuePhysicalBlock passes
1109 1 : // a pointer to the buffer to the write queue.
1110 1 : buf := make([]byte, 0, blocksToReadLen)
1111 1 : if err := rh.ReadAt(ctx, buf[:blocksToReadLen], int64(blocks[firstBlockIdx].bh.Offset)); err != nil {
1112 0 : return err
1113 0 : }
1114 1 : for i := firstBlockIdx; i <= lastBlockIdx; i++ {
1115 1 : offsetDiff := blocks[i].bh.Offset - blocks[firstBlockIdx].bh.Offset
1116 1 : blockBuf := buf[offsetDiff : offsetDiff+blocks[i].bh.Length+block.TrailerLen]
1117 1 : cb := compressedBlockPool.Get().(*compressedBlock)
1118 1 : cb.physical = block.NewPhysicalBlock(blockBuf)
1119 1 : if err := w.enqueuePhysicalBlock(cb, blocks[i].sep); err != nil {
1120 0 : return err
1121 0 : }
1122 : }
1123 1 : return nil
1124 : }
1125 : // Iterate through blocks until we have enough to fill readSizeTarget. When we have more than
1126 : // one block in blocksToRead and adding the next block would exceed the target buffer capacity,
1127 : // we read and flush existing blocks in blocksToRead. This allows us to read as many
1128 : // blocks in one IO request as possible, while still utilizing the write queue in this
1129 : // writer.
1130 1 : lastBlockOffset := uint64(0)
1131 1 : for i := 0; i < len(blocks); {
1132 1 : if blocks[i].bh.Offset < lastBlockOffset {
1133 0 : panic("pebble: copyDataBlocks called with blocks out of order")
1134 : }
1135 1 : start := i
1136 1 : // Note the i++ in the initializing condition; this means we will always flush at least
1137 1 : // one block.
1138 1 : for i++; i < len(blocks) && (blocks[i].bh.Length+blocks[i].bh.Offset+block.TrailerLen-blocks[start].bh.Offset) <= uint64(readSizeTarget); i++ {
1139 1 : }
1140 : // i points to one index past the last block we want to read.
1141 1 : if err := readAndFlushBlocks(start, i-1); err != nil {
1142 0 : return err
1143 0 : }
1144 : }
1145 1 : return nil
1146 : }
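
As a worked example of the batching above (sizes are illustrative): with readSizeTarget = 256 KiB and consecutive data blocks of roughly 100 KiB each, the inner loop admits two blocks per batch (about 200 KiB plus trailers), issues one ReadAt for the pair, and enqueues each block to the write queue separately; a single block larger than 256 KiB is still read on its own, because the loop's init statement always admits at least one block.
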
1147 :
1148 : // addDataBlock adds a raw uncompressed data block to the table as-is. It's specifically used
1149 : // by the sstable copier that can copy parts of an sstable to a new sstable,
1150 : // using CopySpan().
1151 1 : func (w *RawColumnWriter) addDataBlock(b, sep []byte, bhp block.HandleWithProperties) error {
1152 1 : // Serialize the data block, compress it and send it to the write queue.
1153 1 : cb := compressedBlockPool.Get().(*compressedBlock)
1154 1 : cb.blockBuf.checksummer.Type = w.opts.Checksum
1155 1 : cb.physical = block.CompressAndChecksum(
1156 1 : &cb.blockBuf.dataBuf,
1157 1 : b,
1158 1 : w.opts.Compression,
1159 1 : &cb.blockBuf.checksummer,
1160 1 : )
1161 1 : if !cb.physical.IsCompressed() {
1162 1 : // If the block isn't compressed, cb.physical's underlying data points
1163 1 : // directly into a buffer owned by w.dataBlock. Clone it before passing
1164 1 : // it to the write queue to be asynchronously written to disk.
1165 1 : // TODO(jackson): Should we try to avoid this clone by tracking the
1166 1 : // lifetime of the DataBlockWriters?
1167 1 : cb.physical, cb.blockBuf.dataBuf = cb.physical.CloneUsingBuf(cb.blockBuf.dataBuf)
1168 1 : }
1169 1 : if err := w.enqueuePhysicalBlock(cb, sep); err != nil {
1170 0 : return err
1171 0 : }
1172 1 : return nil
1173 : }
1174 :
1175 : // copyFilter copies the specified filter to the table. It's specifically used
1176 : // by the sstable copier that can copy parts of an sstable to a new sstable,
1177 : // using CopySpan().
1178 0 : func (w *RawColumnWriter) copyFilter(filter []byte, filterName string) error {
1179 0 : if w.filterBlock != nil && filterName != w.filterBlock.policyName() {
1180 0 : return errors.New("mismatched filters")
1181 0 : }
1182 0 : w.filterBlock = copyFilterWriter{
1183 0 : origPolicyName: w.filterBlock.policyName(), origMetaName: w.filterBlock.metaName(), data: filter,
1184 0 : }
1185 0 : return nil
1186 : }
1187 :
1188 : // copyProperties copies properties from the specified props, and resets others
1189 : // to prepare for copying data blocks from another sstable, using the copy/addDataBlock(s)
1190 : // methods above. It's specifically used by the sstable copier that can copy parts of an
1191 : // sstable to a new sstable, using CopySpan().
1192 1 : func (w *RawColumnWriter) copyProperties(props Properties) {
1193 1 : w.props = props
1194 1 : // Remove all user properties to disable block properties, which we do not
1195 1 : // calculate for CopySpan.
1196 1 : w.props.UserProperties = nil
1197 1 : // Reset props that we'll re-derive as we build our own index.
1198 1 : w.props.IndexPartitions = 0
1199 1 : w.props.TopLevelIndexSize = 0
1200 1 : w.props.IndexSize = 0
1201 1 : w.props.IndexType = 0
1202 1 : }
|