Line data Source code
1 : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package sstable
6 :
7 : import (
8 : "bytes"
9 : "context"
10 : "encoding/binary"
11 : "fmt"
12 : "math"
13 : "slices"
14 : "sync"
15 :
16 : "github.com/cockroachdb/errors"
17 : "github.com/cockroachdb/pebble/internal/base"
18 : "github.com/cockroachdb/pebble/internal/bytealloc"
19 : "github.com/cockroachdb/pebble/internal/invariants"
20 : "github.com/cockroachdb/pebble/internal/keyspan"
21 : "github.com/cockroachdb/pebble/objstorage"
22 : "github.com/cockroachdb/pebble/sstable/block"
23 : "github.com/cockroachdb/pebble/sstable/colblk"
24 : "github.com/cockroachdb/pebble/sstable/rowblk"
25 : "github.com/cockroachdb/pebble/sstable/valblk"
26 : )
27 :
28 : // RawColumnWriter is an sstable RawWriter that writes sstables with
29 : // column-oriented blocks. All table formats TableFormatPebblev5 and later write
30 : // column-oriented blocks and use RawColumnWriter.
31 : type RawColumnWriter struct {
32 : comparer *base.Comparer
33 : meta WriterMetadata
34 : opts WriterOptions
35 : err error
36 :
37 : dataFlush block.FlushGovernor
38 : indexFlush block.FlushGovernor
39 : blockPropCollectors []BlockPropertyCollector
40 : blockPropsEncoder blockPropertiesEncoder
41 : obsoleteCollector obsoleteKeyBlockPropertyCollector
42 : props Properties
43 : // block writers buffering unflushed data.
44 : dataBlock struct {
45 : colblk.DataBlockEncoder
46 : // numDeletions stores the count of point tombstones in this data block.
47 : // It's used to determine if this data block is considered
48 : // tombstone-dense for the purposes of compaction.
49 : numDeletions int
50 : // deletionSize stores the raw size of point tombstones in this data
51 : // block. It's used to determine if this data block is considered
52 : // tombstone-dense for the purposes of compaction.
53 : deletionSize int
54 : }
55 : indexBlock colblk.IndexBlockWriter
56 : topLevelIndexBlock colblk.IndexBlockWriter
57 : rangeDelBlock colblk.KeyspanBlockWriter
58 : rangeKeyBlock colblk.KeyspanBlockWriter
59 : valueBlock *valblk.Writer // nil iff WriterOptions.DisableValueBlocks=true
60 : // filter accumulates the filter block. If populated, the filter ingests
61 : // prefixes of the user keys, as produced by the comparer's Split function
62 : // (the full user key when the comparer does not split keys).
63 : filterBlock filterWriter
64 : prevPointKey struct {
65 : trailer base.InternalKeyTrailer
66 : isObsolete bool
67 : }
68 : pendingDataBlockSize int
69 : indexBlockSize int
70 : queuedDataSize uint64
71 :
72 : // indexBuffering holds finished index blocks as they're completed while
73 : // building the sstable. If an index block grows sufficiently large
74 : // (IndexBlockSize) while an sstable is still being constructed, the sstable
75 : // writer will create a two-level index structure. As index blocks are
76 : // completed, they're finished and buffered in-memory until the table is
77 : // finished. When the table is finished, the buffered index blocks are
78 : // flushed in order after all the data blocks, and the top-level index block
79 : // is constructed to point to all the individual index blocks.
80 : indexBuffering struct {
81 : // partitions holds all the completed index blocks.
82 : partitions []bufferedIndexBlock
83 : // blockAlloc is used to bulk-allocate byte slices used to store index
84 : // blocks in partitions. These live until the sstable is finished.
85 : blockAlloc []byte
86 : // sepAlloc is used to bulk-allocate index block separator slices stored
87 : // in partitions. These live until the sstable is finished.
88 : sepAlloc bytealloc.A
89 : }
90 :
91 : writeQueue struct {
92 : wg sync.WaitGroup
93 : ch chan *compressedBlock
94 : err error
95 : }
96 : layout layoutWriter
97 :
98 : separatorBuf []byte
99 : tmp [blockHandleLikelyMaxLen]byte
100 : previousUserKey invariants.Value[[]byte]
101 : disableKeyOrderChecks bool
102 : }
103 :
104 : // Assert that *RawColumnWriter implements RawWriter.
105 : var _ RawWriter = (*RawColumnWriter)(nil)
106 :
107 1 : func newColumnarWriter(writable objstorage.Writable, o WriterOptions) *RawColumnWriter {
108 1 : if writable == nil {
109 0 : panic("pebble: nil writable")
110 : }
111 1 : if !o.TableFormat.BlockColumnar() {
112 0 : panic(errors.AssertionFailedf("newColumnarWriter cannot create sstables with %s format", o.TableFormat))
113 : }
114 1 : o = o.ensureDefaults()
115 1 : w := &RawColumnWriter{
116 1 : comparer: o.Comparer,
117 1 : meta: WriterMetadata{
118 1 : SmallestSeqNum: math.MaxUint64,
119 1 : },
120 1 : opts: o,
121 1 : layout: makeLayoutWriter(writable, o),
122 1 : disableKeyOrderChecks: o.internal.DisableKeyOrderChecks,
123 1 : }
124 1 : w.dataFlush = block.MakeFlushGovernor(o.BlockSize, o.BlockSizeThreshold, o.SizeClassAwareThreshold, o.AllocatorSizeClasses)
125 1 : w.indexFlush = block.MakeFlushGovernor(o.IndexBlockSize, o.BlockSizeThreshold, o.SizeClassAwareThreshold, o.AllocatorSizeClasses)
126 1 : w.dataBlock.Init(o.KeySchema)
127 1 : w.indexBlock.Init()
128 1 : w.topLevelIndexBlock.Init()
129 1 : w.rangeDelBlock.Init(w.comparer.Equal)
130 1 : w.rangeKeyBlock.Init(w.comparer.Equal)
131 1 : if !o.DisableValueBlocks {
132 1 : w.valueBlock = valblk.NewWriter(
133 1 : block.MakeFlushGovernor(o.BlockSize, o.BlockSizeThreshold, o.SizeClassAwareThreshold, o.AllocatorSizeClasses),
134 1 : w.opts.Compression, w.opts.Checksum, func(compressedSize int) {})
135 : }
136 1 : if o.FilterPolicy != nil {
137 1 : switch o.FilterType {
138 1 : case TableFilter:
139 1 : w.filterBlock = newTableFilterWriter(o.FilterPolicy)
140 0 : default:
141 0 : panic(fmt.Sprintf("unknown filter type: %v", o.FilterType))
142 : }
143 : }
144 :
145 1 : numBlockPropertyCollectors := len(o.BlockPropertyCollectors)
146 1 : if !o.disableObsoleteCollector {
147 1 : numBlockPropertyCollectors++
148 1 : }
149 1 : if numBlockPropertyCollectors > maxPropertyCollectors {
150 0 : panic(errors.New("pebble: too many block property collectors"))
151 : }
152 1 : w.blockPropCollectors = make([]BlockPropertyCollector, 0, numBlockPropertyCollectors)
153 1 : for _, constructFn := range o.BlockPropertyCollectors {
154 1 : w.blockPropCollectors = append(w.blockPropCollectors, constructFn())
155 1 : }
156 1 : if !o.disableObsoleteCollector {
157 1 : w.blockPropCollectors = append(w.blockPropCollectors, &w.obsoleteCollector)
158 1 : }
159 1 : var buf bytes.Buffer
160 1 : buf.WriteString("[")
161 1 : for i := range w.blockPropCollectors {
162 1 : if i > 0 {
163 1 : buf.WriteString(",")
164 1 : }
165 1 : buf.WriteString(w.blockPropCollectors[i].Name())
166 : }
167 1 : buf.WriteString("]")
168 1 : w.props.PropertyCollectorNames = buf.String()
169 1 :
170 1 : w.props.ComparerName = o.Comparer.Name
171 1 : w.props.CompressionName = o.Compression.String()
172 1 : w.props.KeySchemaName = o.KeySchema.Name
173 1 : w.props.MergerName = o.MergerName
174 1 :
175 1 : w.writeQueue.ch = make(chan *compressedBlock)
176 1 : w.writeQueue.wg.Add(1)
177 1 : go w.drainWriteQueue()
178 1 : return w
179 : }
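// Editor's note: the following is an illustrative sketch, not part of the
// original source. It shows one hypothetical lifecycle of the columnar
// writer: construct it, add pre-sorted point keys, close the table, and read
// back the finished metadata. The keys/values parameters and their ordering
// are assumptions made for the example.
func exampleBuildTable(
	writable objstorage.Writable, opts WriterOptions, keys []InternalKey, values [][]byte,
) (*WriterMetadata, error) {
	w := newColumnarWriter(writable, opts)
	for i := range keys {
		// Keys must be supplied in strictly increasing internal key order.
		if err := w.AddWithForceObsolete(keys[i], values[i], false /* forceObsolete */); err != nil {
			return nil, err
		}
	}
	if err := w.Close(); err != nil {
		return nil, err
	}
	// Metadata is only valid to call once the sstable has been finished.
	return w.Metadata()
}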
180 :
181 : // Error returns the current accumulated error if any.
182 1 : func (w *RawColumnWriter) Error() error {
183 1 : return w.err
184 1 : }
185 :
186 : // EstimatedSize returns the estimated size of the sstable that would be
187 : // produced if Close() were called now, without adding any additional keys.
188 1 : func (w *RawColumnWriter) EstimatedSize() uint64 {
189 1 : sz := rocksDBFooterLen + w.queuedDataSize
190 1 : // TODO(jackson): Avoid iterating over partitions by incrementally
191 1 : // maintaining the size contribution of all buffered partitions.
192 1 : for _, bib := range w.indexBuffering.partitions {
193 1 : // We include the separator user key to account for its bytes in the
194 1 : // top-level index block.
195 1 : //
196 1 : // TODO(jackson): We could incrementally build the top-level index block
197 1 : // and produce an exact calculation of the current top-level index
198 1 : // block's size.
199 1 : sz += uint64(len(bib.block) + block.TrailerLen + len(bib.sep.UserKey))
200 1 : }
201 1 : if w.rangeDelBlock.KeyCount() > 0 {
202 1 : sz += uint64(w.rangeDelBlock.Size())
203 1 : }
204 1 : if w.rangeKeyBlock.KeyCount() > 0 {
205 1 : sz += uint64(w.rangeKeyBlock.Size())
206 1 : }
207 1 : if w.valueBlock != nil {
208 1 : sz += w.valueBlock.Size()
209 1 : }
210 : // TODO(jackson): Include an estimate of the properties, filter and meta
211 : // index blocks sizes.
212 1 : return sz
213 : }
214 :
215 : // ComparePrev compares the provided user key to the last point key written to the
216 : // writer. The returned value is equivalent to Compare(key, prevKey) where
217 : // prevKey is the last point key written to the writer.
218 : //
219 : // If no key has been written yet, ComparePrev returns +1.
220 : //
221 : // Must not be called after Writer is closed.
222 1 : func (w *RawColumnWriter) ComparePrev(k []byte) int {
223 1 : if w == nil || w.dataBlock.Rows() == 0 {
224 1 : return +1
225 1 : }
226 1 : return int(w.dataBlock.KeyWriter.ComparePrev(k).UserKeyComparison)
227 : }
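// Editor's note: an illustrative usage sketch, not part of the original
// source. A caller batching point keys might use ComparePrev to detect
// out-of-order user keys before calling AddWithForceObsolete (user keys must
// be non-decreasing, so a negative comparison indicates a misordered key):
//
//	if w.ComparePrev(key.UserKey) < 0 {
//		return errors.Errorf("pebble: out-of-order user key %q", key.UserKey)
//	}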
228 :
229 : // SetSnapshotPinnedProperties sets the properties for pinned keys. Should only
230 : // be used internally by Pebble.
231 : func (w *RawColumnWriter) SetSnapshotPinnedProperties(
232 : pinnedKeyCount, pinnedKeySize, pinnedValueSize uint64,
233 1 : ) {
234 1 : w.props.SnapshotPinnedKeys = pinnedKeyCount
235 1 : w.props.SnapshotPinnedKeySize = pinnedKeySize
236 1 : w.props.SnapshotPinnedValueSize = pinnedValueSize
237 1 : }
238 :
239 : // Metadata returns the metadata for the finished sstable. Only valid to call
240 : // after the sstable has been finished.
241 1 : func (w *RawColumnWriter) Metadata() (*WriterMetadata, error) {
242 1 : if !w.layout.IsFinished() {
243 0 : return nil, errors.New("pebble: writer is not closed")
244 0 : }
245 1 : return &w.meta, nil
246 : }
247 :
248 : // EncodeSpan encodes the keys in the given span. The span can contain either
249 : // only RANGEDEL keys or only range keys.
250 1 : func (w *RawColumnWriter) EncodeSpan(span keyspan.Span) error {
251 1 : if span.Empty() {
252 1 : return nil
253 1 : }
254 1 : for _, k := range span.Keys {
255 1 : w.meta.updateSeqNum(k.SeqNum())
256 1 : }
257 :
258 1 : blockWriter := &w.rangeKeyBlock
259 1 : if span.Keys[0].Kind() == base.InternalKeyKindRangeDelete {
260 1 : blockWriter = &w.rangeDelBlock
261 1 : // Update range delete properties.
262 1 : // NB: These properties are computed differently than the rowblk sstable
263 1 : // writer because this writer does not flatten them into row key-value
264 1 : // pairs.
265 1 : w.props.RawKeySize += uint64(len(span.Start) + len(span.End))
266 1 : count := uint64(len(span.Keys))
267 1 : w.props.NumEntries += count
268 1 : w.props.NumDeletions += count
269 1 : w.props.NumRangeDeletions += count
270 1 : } else {
271 1 : // Update range key properties.
272 1 : // NB: These properties are computed differently than the rowblk sstable
273 1 : // writer because this writer does not flatten them into row key-value
274 1 : // pairs.
275 1 : w.props.RawRangeKeyKeySize += uint64(len(span.Start) + len(span.End))
276 1 : for _, k := range span.Keys {
277 1 : w.props.RawRangeKeyValueSize += uint64(len(k.Value))
278 1 : switch k.Kind() {
279 1 : case base.InternalKeyKindRangeKeyDelete:
280 1 : w.props.NumRangeKeyDels++
281 1 : case base.InternalKeyKindRangeKeySet:
282 1 : w.props.NumRangeKeySets++
283 1 : case base.InternalKeyKindRangeKeyUnset:
284 1 : w.props.NumRangeKeyUnsets++
285 0 : default:
286 0 : panic(errors.Errorf("pebble: invalid range key type: %s", k.Kind()))
287 : }
288 : }
289 1 : for i := range w.blockPropCollectors {
290 1 : if err := w.blockPropCollectors[i].AddRangeKeys(span); err != nil {
291 0 : return err
292 0 : }
293 : }
294 : }
295 1 : if !w.disableKeyOrderChecks && blockWriter.KeyCount() > 0 {
296 1 : // Check that spans are being added in fragmented order. If the two
297 1 : // tombstones overlap, their start and end keys must be identical.
298 1 : prevStart, prevEnd, prevTrailer := blockWriter.UnsafeLastSpan()
299 1 : if w.opts.Comparer.Equal(prevStart, span.Start) && w.opts.Comparer.Equal(prevEnd, span.End) {
300 0 : if prevTrailer < span.Keys[0].Trailer {
301 0 : w.err = errors.Errorf("pebble: keys must be added in order: %s-%s:{(#%s)}, %s",
302 0 : w.opts.Comparer.FormatKey(prevStart),
303 0 : w.opts.Comparer.FormatKey(prevEnd),
304 0 : prevTrailer, span.Pretty(w.opts.Comparer.FormatKey))
305 0 : }
306 1 : } else if c := w.opts.Comparer.Compare(prevEnd, span.Start); c > 0 {
307 0 : w.err = errors.Errorf("pebble: keys must be added in order: %s-%s:{(#%s)}, %s",
308 0 : w.opts.Comparer.FormatKey(prevStart),
309 0 : w.opts.Comparer.FormatKey(prevEnd),
310 0 : prevTrailer, span.Pretty(w.opts.Comparer.FormatKey))
311 0 : return w.err
312 0 : }
313 : }
314 1 : blockWriter.AddSpan(span)
315 1 : return nil
316 : }
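// Editor's note: an illustrative sketch, not part of the original source. It
// shows one hypothetical way a caller could hand a single-key RANGEDEL span
// to EncodeSpan; the bounds and sequence number are assumptions, and spans
// must already be fragmented and supplied in order.
func exampleEncodeRangeDel(w *RawColumnWriter, start, end []byte, seqNum base.SeqNum) error {
	return w.EncodeSpan(keyspan.Span{
		Start: start,
		End:   end,
		Keys: []keyspan.Key{{
			Trailer: base.MakeTrailer(seqNum, base.InternalKeyKindRangeDelete),
		}},
	})
}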
317 :
318 : // AddWithForceObsolete adds a point key/value pair when writing a
319 : // strict-obsolete sstable. For a given Writer, the keys passed to Add must be
320 : // in increasing order. Span keys (range deletions, range keys) must be added
321 : // through EncodeSpan.
322 : //
323 : // forceObsolete indicates whether the caller has determined that this key is
324 : // obsolete even though it may be the latest point key for this userkey. This
325 : // should be set to true for keys obsoleted by RANGEDELs, and is required for
326 : // strict-obsolete sstables.
327 : //
328 : // Note that there are two properties, S1 and S2 (see comment in format.go)
329 : // that strict-obsolete ssts must satisfy. S2, due to RANGEDELs, is solely the
330 : // responsibility of the caller. S1 is solely the responsibility of the
331 : // callee.
332 : func (w *RawColumnWriter) AddWithForceObsolete(
333 : key InternalKey, value []byte, forceObsolete bool,
334 1 : ) error {
335 1 : switch key.Kind() {
336 : case base.InternalKeyKindRangeDelete, base.InternalKeyKindRangeKeySet,
337 0 : base.InternalKeyKindRangeKeyUnset, base.InternalKeyKindRangeKeyDelete:
338 0 : return errors.Newf("%s must be added through EncodeSpan", key.Kind())
339 1 : case base.InternalKeyKindMerge:
340 1 : if w.opts.IsStrictObsolete {
341 0 : return errors.Errorf("MERGE not supported in a strict-obsolete sstable")
342 0 : }
343 : }
344 :
345 1 : eval, err := w.evaluatePoint(key, len(value))
346 1 : if err != nil {
347 0 : return err
348 0 : }
349 1 : eval.isObsolete = eval.isObsolete || forceObsolete
350 1 : w.prevPointKey.trailer = key.Trailer
351 1 : w.prevPointKey.isObsolete = eval.isObsolete
352 1 :
353 1 : var valuePrefix block.ValuePrefix
354 1 : var valueStoredWithKey []byte
355 1 : if eval.writeToValueBlock {
356 1 : vh, err := w.valueBlock.AddValue(value)
357 1 : if err != nil {
358 0 : return err
359 0 : }
360 1 : n := valblk.EncodeHandle(w.tmp[:], vh)
361 1 : valueStoredWithKey = w.tmp[:n]
362 1 : var attribute base.ShortAttribute
363 1 : if w.opts.ShortAttributeExtractor != nil {
364 0 : // TODO(sumeer): for compactions, it is possible that the input sstable
365 0 : // already has this value in the value section and so we have already
366 0 : // extracted the ShortAttribute. Avoid extracting it again. This will
367 0 : // require changing the RawWriter.Add interface.
368 0 : if attribute, err = w.opts.ShortAttributeExtractor(
369 0 : key.UserKey, int(eval.kcmp.PrefixLen), value); err != nil {
370 0 : return err
371 0 : }
372 : }
373 1 : valuePrefix = block.ValueHandlePrefix(eval.kcmp.PrefixEqual(), attribute)
374 1 : } else {
375 1 : valueStoredWithKey = value
376 1 : if len(value) > 0 {
377 1 : valuePrefix = block.InPlaceValuePrefix(eval.kcmp.PrefixEqual())
378 1 : }
379 : }
380 :
381 : // Append the key to the data block. We have NOT yet committed to
382 : // including the key in the block. The data block writer permits us to
383 : // finish the block excluding the last-appended KV.
384 1 : entriesWithoutKV := w.dataBlock.Rows()
385 1 : w.dataBlock.Add(key, valueStoredWithKey, valuePrefix, eval.kcmp, eval.isObsolete)
386 1 :
387 1 : // Now that we've appended the KV pair, we can compute the exact size of the
388 1 : // block with this key-value pair included. Check to see if we should flush
389 1 : // the current block, either with or without the added key-value pair.
390 1 : size := w.dataBlock.Size()
391 1 : if shouldFlushWithoutLatestKV(size, w.pendingDataBlockSize, entriesWithoutKV, &w.dataFlush) {
392 1 : // Flush the data block excluding the key we just added.
393 1 : w.flushDataBlockWithoutNextKey(key.UserKey)
394 1 : // flushDataBlockWithoutNextKey reset the data block builder, and we can
395 1 : // add the key to this next block now.
396 1 : w.dataBlock.Add(key, valueStoredWithKey, valuePrefix, eval.kcmp, eval.isObsolete)
397 1 : w.pendingDataBlockSize = w.dataBlock.Size()
398 1 : } else {
399 1 : // We're not flushing the data block, and we're committing to including
400 1 : // the current KV in the block. Remember the new size of the data block
401 1 : // with the current KV.
402 1 : w.pendingDataBlockSize = size
403 1 : }
404 :
405 1 : for i := range w.blockPropCollectors {
406 1 : v := value
407 1 : if key.Kind() == base.InternalKeyKindSet {
408 1 : // Values for SET are not required to be in-place, and in the future
409 1 : // may not even be read by the compaction, so pass nil values. Block
410 1 : // property collectors in such Pebble DBs must not look at the
411 1 : // value.
412 1 : v = nil
413 1 : }
414 1 : if err := w.blockPropCollectors[i].AddPointKey(key, v); err != nil {
415 0 : w.err = err
416 0 : return err
417 0 : }
418 : }
419 1 : w.obsoleteCollector.AddPoint(eval.isObsolete)
420 1 : if w.filterBlock != nil {
421 1 : w.filterBlock.addKey(key.UserKey[:eval.kcmp.PrefixLen])
422 1 : }
423 1 : w.meta.updateSeqNum(key.SeqNum())
424 1 : if !w.meta.HasPointKeys {
425 1 : w.meta.SetSmallestPointKey(key.Clone())
426 1 : }
427 :
428 1 : w.props.NumEntries++
429 1 : switch key.Kind() {
430 1 : case InternalKeyKindDelete, InternalKeyKindSingleDelete:
431 1 : w.props.NumDeletions++
432 1 : w.props.RawPointTombstoneKeySize += uint64(len(key.UserKey))
433 1 : w.dataBlock.numDeletions++
434 1 : w.dataBlock.deletionSize += len(key.UserKey)
435 1 : case InternalKeyKindDeleteSized:
436 1 : var size uint64
437 1 : if len(value) > 0 {
438 1 : var n int
439 1 : size, n = binary.Uvarint(value)
440 1 : if n <= 0 {
441 0 : return errors.Newf("%s key's value (%x) does not parse as uvarint",
442 0 : errors.Safe(key.Kind().String()), value)
443 0 : }
444 : }
445 1 : w.props.NumDeletions++
446 1 : w.props.NumSizedDeletions++
447 1 : w.props.RawPointTombstoneKeySize += uint64(len(key.UserKey))
448 1 : w.props.RawPointTombstoneValueSize += size
449 1 : w.dataBlock.numDeletions++
450 1 : w.dataBlock.deletionSize += len(key.UserKey)
451 1 : case InternalKeyKindMerge:
452 1 : w.props.NumMergeOperands++
453 : }
454 1 : w.props.RawKeySize += uint64(key.Size())
455 1 : w.props.RawValueSize += uint64(len(value))
456 1 : return nil
457 : }
458 :
459 : type pointKeyEvaluation struct {
460 : kcmp colblk.KeyComparison
461 : isObsolete bool
462 : writeToValueBlock bool
463 : }
464 :
465 : // evaluatePoint takes information about a point key being written to the
466 : // sstable and decides how the point should be represented, where its value
467 : // should be stored, etc.
468 : func (w *RawColumnWriter) evaluatePoint(
469 : key base.InternalKey, valueLen int,
470 1 : ) (eval pointKeyEvaluation, err error) {
471 1 : eval.kcmp = w.dataBlock.KeyWriter.ComparePrev(key.UserKey)
472 1 :
473 1 : // When invariants are enabled, validate kcmp.
474 1 : if invariants.Enabled {
475 1 : colblk.AssertKeyCompare(w.comparer, key.UserKey, w.previousUserKey.Get(), eval.kcmp)
476 1 : w.previousUserKey.Set(append(w.previousUserKey.Get()[:0], key.UserKey...))
477 1 : }
478 :
479 1 : if !w.meta.HasPointKeys {
480 1 : return eval, nil
481 1 : }
482 1 : keyKind := key.Kind()
483 1 : // Ensure that no one adds a point key kind without considering the obsolete
484 1 : // handling for that kind.
485 1 : switch keyKind {
486 : case InternalKeyKindSet, InternalKeyKindSetWithDelete, InternalKeyKindMerge,
487 1 : InternalKeyKindDelete, InternalKeyKindSingleDelete, InternalKeyKindDeleteSized:
488 0 : default:
489 0 : panic(errors.AssertionFailedf("unexpected key kind %s", keyKind.String()))
490 : }
491 1 : prevKeyKind := w.prevPointKey.trailer.Kind()
492 1 : // If same user key, then the current key is obsolete if any of the
493 1 : // following is true:
494 1 : // C1 The prev key was obsolete.
495 1 : // C2 The prev key was not a MERGE. When the previous key is a MERGE we must
496 1 : // preserve SET* and MERGE since their values will be merged into the
497 1 : // previous key. We also must preserve DEL* since there may be an older
498 1 : // SET*/MERGE in a lower level that must not be merged with the MERGE --
499 1 : // if we omit the DEL* that lower SET*/MERGE will become visible.
500 1 : //
501 1 : // Regardless of whether it is the same user key or not
502 1 : // C3 The current key is some kind of point delete, and we are writing to
503 1 : // the lowest level, then it is also obsolete. The correctness of this
504 1 : // relies on the same user key not spanning multiple sstables in a level.
505 1 : //
506 1 : // C1 ensures that for a user key there is at most one transition from
507 1 : // !obsolete to obsolete. Consider a user key k, for which the first n keys
508 1 : // are not obsolete. We consider the various value of n:
509 1 : //
510 1 : // n = 0: This happens due to forceObsolete being set by the caller, or due
511 1 : // to C3. forceObsolete must only be set due to a RANGEDEL, and that RANGEDEL
512 1 : // must also delete all the lower seqnums for the same user key. C3 triggers
513 1 : // due to a point delete and that deletes all the lower seqnums for the same
514 1 : // user key.
515 1 : //
516 1 : // n = 1: This is the common case. It happens when the first key is not a
517 1 : // MERGE, or the current key is some kind of point delete.
518 1 : //
519 1 : // n > 1: This is due to a sequence of MERGE keys, potentially followed by a
520 1 : // single non-MERGE key.
521 1 : isObsoleteC1AndC2 := eval.kcmp.UserKeyComparison == 0 &&
522 1 : (w.prevPointKey.isObsolete || prevKeyKind != InternalKeyKindMerge)
523 1 : isObsoleteC3 := w.opts.WritingToLowestLevel &&
524 1 : (keyKind == InternalKeyKindDelete || keyKind == InternalKeyKindSingleDelete ||
525 1 : keyKind == InternalKeyKindDeleteSized)
526 1 : eval.isObsolete = isObsoleteC1AndC2 || isObsoleteC3
527 1 : // TODO(sumeer): storing isObsolete SET and SETWITHDEL in value blocks is
528 1 : // possible, but requires some care in documenting and checking invariants.
529 1 : // There is code that assumes nothing in value blocks because of single MVCC
530 1 : // version (those should be ok). We have to ensure setHasSamePrefix is
531 1 : // correctly initialized here etc.
532 1 :
533 1 : if !w.disableKeyOrderChecks && (eval.kcmp.UserKeyComparison < 0 ||
534 1 : (eval.kcmp.UserKeyComparison == 0 && w.prevPointKey.trailer <= key.Trailer)) {
535 0 : previousKey := base.InternalKey{
536 0 : UserKey: w.dataBlock.MaterializeLastUserKey(nil),
537 0 : Trailer: w.prevPointKey.trailer,
538 0 : }
539 0 : return eval, errors.Errorf(
540 0 : "pebble: keys must be added in strictly increasing order: %s, %s",
541 0 : previousKey.Pretty(w.comparer.FormatKey),
542 0 : key.Pretty(w.comparer.FormatKey))
543 0 : }
544 :
545 : // We might want to write this key's value to a value block if it has the
546 : // same prefix.
547 : //
548 : // We require:
549 : // . Value blocks to be enabled.
550 : // . The current key to have the same prefix as the previous key.
551 : // . The previous key to be a SET.
552 : // . The current key to be a SET.
553 : // . If there are bounds requiring some keys' values to be in-place, the
554 : // key must not fall within those bounds.
555 : // . The value to be sufficiently large. (Currently we simply require a
556 : // non-zero length, so all non-empty values are eligible for storage
557 : // out-of-band in a value block.)
558 : //
559 : // Use of 0 here is somewhat arbitrary. Given the minimum 3 byte encoding of
560 : // valueHandle, this should be > 3. But tiny values are common in test and
561 : // unlikely in production, so we use 0 here for better test coverage.
562 1 : const tinyValueThreshold = 0
563 1 : useValueBlock := !w.opts.DisableValueBlocks &&
564 1 : eval.kcmp.PrefixEqual() &&
565 1 : prevKeyKind == InternalKeyKindSet &&
566 1 : keyKind == InternalKeyKindSet &&
567 1 : valueLen > tinyValueThreshold &&
568 1 : w.valueBlock != nil
569 1 : if !useValueBlock {
570 1 : return eval, nil
571 1 : }
572 : // NB: it is possible that eval.kcmp.UserKeyComparison == 0, i.e., these two
573 : // SETs have identical user keys (because of an open snapshot). This should
574 : // be the rare case.
575 :
576 : // If there are bounds requiring some keys' values to be in-place, compare
577 : // the prefix against the bounds.
578 1 : if !w.opts.RequiredInPlaceValueBound.IsEmpty() {
579 0 : if w.comparer.Compare(w.opts.RequiredInPlaceValueBound.Upper, key.UserKey[:eval.kcmp.PrefixLen]) <= 0 {
580 0 : // Common case for CockroachDB. Make it empty since all future keys
581 0 : // will be >= this key.
582 0 : w.opts.RequiredInPlaceValueBound = UserKeyPrefixBound{}
583 0 : } else if w.comparer.Compare(key.UserKey[:eval.kcmp.PrefixLen], w.opts.RequiredInPlaceValueBound.Lower) >= 0 {
584 0 : // Don't write to value block if the key is within the bounds.
585 0 : return eval, nil
586 0 : }
587 : }
588 1 : eval.writeToValueBlock = true
589 1 : return eval, nil
590 : }
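// Editor's note: an illustrative worked example of rules C1-C3 above, not
// part of the original source. For a single user key k written newest-first,
// assuming forceObsolete is never set and WritingToLowestLevel is false:
//
//	k#5,MERGE -> not obsolete (first key seen for k)
//	k#4,SET   -> not obsolete (previous key was a MERGE, so C2 does not apply)
//	k#3,SET   -> obsolete     (previous key was a non-MERGE, C2)
//	k#2,DEL   -> obsolete     (previous key was already obsolete, C1)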
591 :
592 : var compressedBlockPool = sync.Pool{
593 1 : New: func() interface{} {
594 1 : return new(compressedBlock)
595 1 : },
596 : }
597 :
598 : type compressedBlock struct {
599 : physical block.PhysicalBlock
600 : blockBuf blockBuf
601 : }
602 :
603 1 : func (w *RawColumnWriter) flushDataBlockWithoutNextKey(nextKey []byte) {
604 1 : serializedBlock, lastKey := w.dataBlock.Finish(w.dataBlock.Rows()-1, w.pendingDataBlockSize)
605 1 : w.maybeIncrementTombstoneDenseBlocks(len(serializedBlock))
606 1 : // Compute the separator that will be written to the index block alongside
607 1 : // this data block's end offset. It is the separator between the last key in
608 1 : // the finished block and the [nextKey] that was excluded from the block.
609 1 : w.separatorBuf = w.comparer.Separator(w.separatorBuf[:0], lastKey.UserKey, nextKey)
610 1 : w.enqueueDataBlock(serializedBlock, lastKey, w.separatorBuf)
611 1 : w.dataBlock.Reset()
612 1 : w.pendingDataBlockSize = 0
613 1 : }
614 :
615 : // maybeIncrementTombstoneDenseBlocks increments the number of tombstone dense
616 : // blocks if the number of deletions in the data block exceeds a threshold or
617 : // the deletion size exceeds a threshold. It should be called after the
618 : // data block has been finished.
619 : // Invariant: uncompressedLen must be the serialized length of the just-finished data block.
620 1 : func (w *RawColumnWriter) maybeIncrementTombstoneDenseBlocks(uncompressedLen int) {
621 1 : minSize := w.opts.DeletionSizeRatioThreshold * float32(uncompressedLen)
622 1 : if w.dataBlock.numDeletions > w.opts.NumDeletionsThreshold || float32(w.dataBlock.deletionSize) > minSize {
623 0 : w.props.NumTombstoneDenseBlocks++
624 0 : }
625 1 : w.dataBlock.numDeletions = 0
626 1 : w.dataBlock.deletionSize = 0
627 : }
628 :
629 : // enqueueDataBlock compresses and checksums the provided data block and sends
630 : // it to the write queue to be asynchronously written to the underlying storage.
631 : // It also adds the block's index block separator to the pending index block,
632 : // possibly triggering the index block to be finished and buffered.
633 : func (w *RawColumnWriter) enqueueDataBlock(
634 : serializedBlock []byte, lastKey base.InternalKey, separator []byte,
635 1 : ) error {
636 1 : // TODO(jackson): Avoid allocating the largest point user key every time we
637 1 : // set the largest point key. This is what the rowblk writer does too, but
638 1 : // it's unnecessary.
639 1 : w.meta.SetLargestPointKey(lastKey.Clone())
640 1 :
641 1 : if invariants.Enabled {
642 1 : var dec colblk.DataBlockDecoder
643 1 : dec.Init(w.opts.KeySchema, serializedBlock)
644 1 : if err := dec.Validate(w.comparer, w.opts.KeySchema); err != nil {
645 0 : panic(err)
646 : }
647 : }
648 :
649 : // Compress and checksum the serialized data block and send it to the write queue.
650 1 : cb := compressedBlockPool.Get().(*compressedBlock)
651 1 : cb.blockBuf.checksummer.Type = w.opts.Checksum
652 1 : cb.physical = block.CompressAndChecksum(
653 1 : &cb.blockBuf.compressedBuf,
654 1 : serializedBlock,
655 1 : w.opts.Compression,
656 1 : &cb.blockBuf.checksummer,
657 1 : )
658 1 : if !cb.physical.IsCompressed() {
659 1 : // If the block isn't compressed, cb.physical's underlying data points
660 1 : // directly into a buffer owned by w.dataBlock. Clone it before passing
661 1 : // it to the write queue to be asynchronously written to disk.
662 1 : // TODO(jackson): Should we try to avoid this clone by tracking the
663 1 : // lifetime of the DataBlockWriters?
664 1 : cb.physical = cb.physical.Clone()
665 1 : }
666 1 : return w.enqueuePhysicalBlock(cb, separator)
667 : }
668 :
669 1 : func (w *RawColumnWriter) enqueuePhysicalBlock(cb *compressedBlock, separator []byte) error {
670 1 : dataBlockHandle := block.Handle{
671 1 : Offset: w.queuedDataSize,
672 1 : Length: uint64(cb.physical.LengthWithoutTrailer()),
673 1 : }
674 1 : w.queuedDataSize += dataBlockHandle.Length + block.TrailerLen
675 1 : w.writeQueue.ch <- cb
676 1 :
677 1 : var err error
678 1 : w.blockPropsEncoder.resetProps()
679 1 : for i := range w.blockPropCollectors {
680 1 : scratch := w.blockPropsEncoder.getScratchForProp()
681 1 : if scratch, err = w.blockPropCollectors[i].FinishDataBlock(scratch); err != nil {
682 0 : return err
683 0 : }
684 1 : w.blockPropsEncoder.addProp(shortID(i), scratch)
685 : }
686 1 : dataBlockProps := w.blockPropsEncoder.unsafeProps()
687 1 :
688 1 : // Add the separator to the index block. This might trigger a flush of the
689 1 : // index block too.
690 1 : i := w.indexBlock.AddBlockHandle(separator, dataBlockHandle, dataBlockProps)
691 1 : sizeWithEntry := w.indexBlock.Size()
692 1 : if shouldFlushWithoutLatestKV(sizeWithEntry, w.indexBlockSize, i, &w.indexFlush) {
693 1 : // NB: finishIndexBlock will use blockPropsEncoder, so we must clone the
694 1 : // data block's props first.
695 1 : dataBlockProps = slices.Clone(dataBlockProps)
696 1 :
697 1 : if err = w.finishIndexBlock(w.indexBlock.Rows() - 1); err != nil {
698 0 : return err
699 0 : }
700 : // finishIndexBlock reset the index block builder, and we can
701 : // add the block handle to this new index block.
702 1 : _ = w.indexBlock.AddBlockHandle(separator, dataBlockHandle, dataBlockProps)
703 1 : w.indexBlockSize = w.indexBlock.Size()
704 1 : } else {
705 1 : w.indexBlockSize = sizeWithEntry
706 1 : }
707 : // Incorporate the finished data block's property into the index block, now
708 : // that we've flushed the index block without the new separator if
709 : // necessary.
710 1 : for i := range w.blockPropCollectors {
711 1 : w.blockPropCollectors[i].AddPrevDataBlockToIndexBlock()
712 1 : }
713 1 : return nil
714 : }
715 :
716 : // finishIndexBlock finishes the currently pending index block with the first
717 : // [rows] rows. In practice, [rows] is always w.indexBlock.Rows() or
718 : // w.indexBlock.Rows()-1.
719 : //
720 : // The finished index block is buffered until the writer is closed.
721 1 : func (w *RawColumnWriter) finishIndexBlock(rows int) error {
722 1 : defer w.indexBlock.Reset()
723 1 : w.blockPropsEncoder.resetProps()
724 1 : for i := range w.blockPropCollectors {
725 1 : scratch := w.blockPropsEncoder.getScratchForProp()
726 1 : var err error
727 1 : if scratch, err = w.blockPropCollectors[i].FinishIndexBlock(scratch); err != nil {
728 0 : return err
729 0 : }
730 1 : w.blockPropsEncoder.addProp(shortID(i), scratch)
731 : }
732 1 : indexProps := w.blockPropsEncoder.props()
733 1 : bib := bufferedIndexBlock{nEntries: rows, properties: indexProps}
734 1 :
735 1 : // Copy the last (greatest) separator key in the index block into bib.sep.
736 1 : // It'll be the separator on the entry in the top-level index block.
737 1 : //
738 1 : // TODO(jackson): bib.sep.Trailer is unused within the columnar-block
739 1 : // sstable writer. Its existence is a code artifact of reuse of the
740 1 : // bufferedIndexBlock type between colblk and rowblk writers. This can be
741 1 : // cleaned up.
742 1 : bib.sep.Trailer = base.MakeTrailer(base.SeqNumMax, base.InternalKeyKindSeparator)
743 1 : w.indexBuffering.sepAlloc, bib.sep.UserKey = w.indexBuffering.sepAlloc.Copy(
744 1 : w.indexBlock.UnsafeSeparator(rows - 1))
745 1 :
746 1 : // Finish the index block and copy it so that w.indexBlock may be reused.
747 1 : block := w.indexBlock.Finish(rows)
748 1 : if len(w.indexBuffering.blockAlloc) < len(block) {
749 1 : // Allocate enough bytes for approximately 16 index blocks.
750 1 : w.indexBuffering.blockAlloc = make([]byte, len(block)*16)
751 1 : }
752 1 : n := copy(w.indexBuffering.blockAlloc, block)
753 1 : bib.block = w.indexBuffering.blockAlloc[:n:n]
754 1 : w.indexBuffering.blockAlloc = w.indexBuffering.blockAlloc[n:]
755 1 :
756 1 : w.indexBuffering.partitions = append(w.indexBuffering.partitions, bib)
757 1 : return nil
758 : }
759 :
760 : // flushBufferedIndexBlocks writes all index blocks, including the top-level
761 : // index block if necessary, to the underlying writable. It returns the block
762 : // handle of the top index (either the only index block or the top-level index
763 : // if two-level).
764 1 : func (w *RawColumnWriter) flushBufferedIndexBlocks() (rootIndex block.Handle, err error) {
765 1 : // If there's a currently-pending index block, finish it.
766 1 : if w.indexBlock.Rows() > 0 || len(w.indexBuffering.partitions) == 0 {
767 1 : w.finishIndexBlock(w.indexBlock.Rows())
768 1 : }
769 : // We've buffered all the index blocks. Typically there's just one index
770 : // block, in which case we're writing a "single-level" index. If we're
771 : // writing a large file or the index separators happen to be excessively
772 : // long, we may have several index blocks and need to construct a
773 : // "two-level" index structure.
774 1 : switch len(w.indexBuffering.partitions) {
775 0 : case 0:
776 0 : // This is impossible because we'll flush the index block immediately
777 0 : // above this switch statement if there are no buffered partitions
778 0 : // (regardless of whether there are data block handles in the index
779 0 : // block).
780 0 : panic("unreachable")
781 1 : case 1:
782 1 : // Single-level index.
783 1 : rootIndex, err = w.layout.WriteIndexBlock(w.indexBuffering.partitions[0].block)
784 1 : if err != nil {
785 0 : return rootIndex, err
786 0 : }
787 1 : w.props.IndexSize = rootIndex.Length + block.TrailerLen
788 1 : w.props.NumDataBlocks = uint64(w.indexBuffering.partitions[0].nEntries)
789 1 : w.props.IndexType = binarySearchIndex
790 1 : default:
791 1 : // Two-level index.
792 1 : for _, part := range w.indexBuffering.partitions {
793 1 : bh, err := w.layout.WriteIndexBlock(part.block)
794 1 : if err != nil {
795 0 : return block.Handle{}, err
796 0 : }
797 1 : w.props.IndexSize += bh.Length + block.TrailerLen
798 1 : w.props.NumDataBlocks += uint64(part.nEntries)
799 1 : w.topLevelIndexBlock.AddBlockHandle(part.sep.UserKey, bh, part.properties)
800 : }
801 1 : rootIndex, err = w.layout.WriteIndexBlock(w.topLevelIndexBlock.Finish(w.topLevelIndexBlock.Rows()))
802 1 : if err != nil {
803 0 : return block.Handle{}, err
804 0 : }
805 1 : w.props.IndexSize += rootIndex.Length + block.TrailerLen
806 1 : w.props.IndexType = twoLevelIndex
807 1 : w.props.IndexPartitions = uint64(len(w.indexBuffering.partitions))
808 : }
809 1 : return rootIndex, nil
810 : }
811 :
812 : // drainWriteQueue runs in its own goroutine and is responsible for writing
813 : // finished, compressed data blocks to the writable. It reads from w.writeQueue
814 : // until the channel is closed. All data blocks are written by this goroutine.
815 : // Other blocks are written directly by the client goroutine. See Close.
816 1 : func (w *RawColumnWriter) drainWriteQueue() {
817 1 : defer w.writeQueue.wg.Done()
818 1 : for cb := range w.writeQueue.ch {
819 1 : if _, err := w.layout.WritePrecompressedDataBlock(cb.physical); err != nil {
820 0 : w.writeQueue.err = err
821 0 : }
822 1 : cb.blockBuf.clear()
823 1 : cb.physical = block.PhysicalBlock{}
824 1 : compressedBlockPool.Put(cb)
825 : }
826 : }
827 :
828 1 : func (w *RawColumnWriter) Close() (err error) {
829 1 : defer func() {
830 1 : if w.valueBlock != nil {
831 0 : w.valueBlock.Release()
832 0 : // Defensive code in case Close gets called again. We don't want to put
833 0 : // the same object to a sync.Pool.
834 0 : w.valueBlock = nil
835 0 : }
836 1 : w.layout.Abort()
837 1 : // Record any error in the writer (so we can exit early if Close is called
838 1 : // again).
839 1 : if err != nil {
840 0 : w.err = err
841 0 : }
842 : }()
843 1 : if w.layout.writable == nil {
844 0 : return w.err
845 0 : }
846 :
847 : // Finish the last data block and send it to the write queue if it contains
848 : // any pending KVs.
849 1 : if rows := w.dataBlock.Rows(); rows > 0 {
850 1 : serializedBlock, lastKey := w.dataBlock.Finish(rows, w.pendingDataBlockSize)
851 1 : w.separatorBuf = w.comparer.Successor(w.separatorBuf[:0], lastKey.UserKey)
852 1 : w.err = errors.CombineErrors(w.err, w.enqueueDataBlock(serializedBlock, lastKey, w.separatorBuf))
853 1 : w.maybeIncrementTombstoneDenseBlocks(len(serializedBlock))
854 1 : }
855 : // Close the write queue channel so that the goroutine responsible for
856 : // writing data blocks to disk knows to exit. Any subsequent blocks (eg,
857 : // index, metadata, range key, etc) will be written by the goroutine that
858 : // called Close.
859 1 : close(w.writeQueue.ch)
860 1 : w.writeQueue.wg.Wait()
861 1 : // If the write queue encountered any errors while writing out data blocks,
862 1 : // it's stored in w.writeQueue.err.
863 1 : w.err = firstError(w.err, w.writeQueue.err)
864 1 : if w.err != nil {
865 0 : return w.err
866 0 : }
867 :
868 : // INVARIANT: w.queuedDataSize == w.layout.offset.
869 : // All data blocks have been written to disk. The queuedDataSize is the
870 : // cumulative size of all the data blocks we've sent to the write queue. Now
871 : // that they've all been flushed, queuedDataSize should match w.layout's
872 : // offset.
873 1 : if w.queuedDataSize != w.layout.offset {
874 0 : panic(errors.AssertionFailedf("pebble: %d bytes of queued data blocks but layout offset is %d",
875 0 : w.queuedDataSize, w.layout.offset))
876 : }
877 1 : w.props.DataSize = w.layout.offset
878 1 : if _, err = w.flushBufferedIndexBlocks(); err != nil {
879 0 : return err
880 0 : }
881 :
882 : // Write the filter block.
883 1 : if w.filterBlock != nil {
884 1 : bh, err := w.layout.WriteFilterBlock(w.filterBlock)
885 1 : if err != nil {
886 0 : return err
887 0 : }
888 1 : w.props.FilterPolicyName = w.filterBlock.policyName()
889 1 : w.props.FilterSize = bh.Length
890 : }
891 :
892 : // Write the range deletion block if non-empty.
893 1 : if w.rangeDelBlock.KeyCount() > 0 {
894 1 : w.props.NumRangeDeletions = uint64(w.rangeDelBlock.KeyCount())
895 1 : sm, la := w.rangeDelBlock.UnsafeBoundaryKeys()
896 1 : w.meta.SetSmallestRangeDelKey(sm)
897 1 : w.meta.SetLargestRangeDelKey(la)
898 1 : if _, err := w.layout.WriteRangeDeletionBlock(w.rangeDelBlock.Finish()); err != nil {
899 0 : return err
900 0 : }
901 : }
902 :
903 : // Write the range key block if non-empty.
904 1 : if w.rangeKeyBlock.KeyCount() > 0 {
905 1 : sm, la := w.rangeKeyBlock.UnsafeBoundaryKeys()
906 1 : w.meta.SetSmallestRangeKey(sm)
907 1 : w.meta.SetLargestRangeKey(la)
908 1 : if _, err := w.layout.WriteRangeKeyBlock(w.rangeKeyBlock.Finish()); err != nil {
909 0 : return err
910 0 : }
911 : }
912 :
913 : // Write out the value block.
914 1 : if w.valueBlock != nil {
915 1 : _, vbStats, err := w.valueBlock.Finish(&w.layout, w.layout.offset)
916 1 : if err != nil {
917 0 : return err
918 0 : }
919 1 : w.props.NumValueBlocks = vbStats.NumValueBlocks
920 1 : w.props.NumValuesInValueBlocks = vbStats.NumValuesInValueBlocks
921 1 : w.props.ValueBlocksSize = vbStats.ValueBlocksAndIndexSize
922 : }
923 :
924 : // Write the properties block.
925 1 : {
926 1 : // Finish and record the prop collectors if props are not yet recorded.
927 1 : // Pre-computed props might have been copied by specialized sst creators
928 1 : // like suffix replacer.
929 1 : if len(w.props.UserProperties) == 0 {
930 1 : userProps := make(map[string]string)
931 1 : for i := range w.blockPropCollectors {
932 1 : scratch := w.blockPropsEncoder.getScratchForProp()
933 1 : // Place the shortID in the first byte.
934 1 : scratch = append(scratch, byte(i))
935 1 : buf, err := w.blockPropCollectors[i].FinishTable(scratch)
936 1 : if err != nil {
937 0 : return err
938 0 : }
939 1 : var prop string
940 1 : if len(buf) > 0 {
941 1 : prop = string(buf)
942 1 : }
943 : // NB: The property is populated in the map even if it is the
944 : // empty string, since the presence in the map is what indicates
945 : // that the block property collector was used when writing.
946 1 : userProps[w.blockPropCollectors[i].Name()] = prop
947 : }
948 1 : if len(userProps) > 0 {
949 1 : w.props.UserProperties = userProps
950 1 : }
951 : }
952 :
953 1 : var raw rowblk.Writer
954 1 : // The restart interval is set to infinity because the properties block
955 1 : // is always read sequentially and cached in a heap located object. This
956 1 : // reduces table size without a significant impact on performance.
957 1 : raw.RestartInterval = propertiesBlockRestartInterval
958 1 : w.props.CompressionOptions = rocksDBCompressionOptions
959 1 : w.props.save(w.opts.TableFormat, &raw)
960 1 : if _, err := w.layout.WritePropertiesBlock(raw.Finish()); err != nil {
961 0 : return err
962 0 : }
963 : }
964 :
965 : // Write the table footer.
966 1 : w.meta.Size, err = w.layout.Finish()
967 1 : if err != nil {
968 0 : return err
969 0 : }
970 1 : w.meta.Properties = w.props
971 1 : // Release any held memory and make any future calls error.
972 1 : // TODO(jackson): Ensure other calls error appropriately if the writer is
973 1 : // cleared.
974 1 : *w = RawColumnWriter{meta: w.meta}
975 1 : return nil
976 : }
977 :
978 : // rewriteSuffixes implements RawWriter.
979 : func (w *RawColumnWriter) rewriteSuffixes(
980 : r *Reader, wo WriterOptions, from, to []byte, concurrency int,
981 0 : ) error {
982 0 : for _, c := range w.blockPropCollectors {
983 0 : if !c.SupportsSuffixReplacement() {
984 0 : return errors.Errorf("block property collector %s does not support suffix replacement", c.Name())
985 0 : }
986 : }
987 0 : l, err := r.Layout()
988 0 : if err != nil {
989 0 : return errors.Wrap(err, "reading layout")
990 0 : }
991 : // Copy data blocks in parallel, rewriting suffixes as we go.
992 0 : blocks, err := rewriteDataBlocksInParallel(r, wo, l.Data, from, to, concurrency, func() blockRewriter {
993 0 : return colblk.NewDataBlockRewriter(wo.KeySchema, w.comparer.Compare, w.comparer.Split)
994 0 : })
995 0 : if err != nil {
996 0 : return errors.Wrap(err, "rewriting data blocks")
997 0 : }
998 :
999 : // oldShortIDs maps the shortID for the block property collector in the old
1000 : // blocks to the shortID in the new blocks. Initialized once for the sstable.
1001 0 : oldShortIDs, n, err := getShortIDs(r, w.blockPropCollectors)
1002 0 : if err != nil {
1003 0 : return errors.Wrap(err, "getting short IDs")
1004 0 : }
1005 0 : oldProps := make([][]byte, len(w.blockPropCollectors))
1006 0 : for i := range blocks {
1007 0 : cb := compressedBlockPool.Get().(*compressedBlock)
1008 0 : cb.physical = blocks[i].physical
1009 0 :
1010 0 : // Load any previous values for our prop collectors into oldProps.
1011 0 : for i := range oldProps {
1012 0 : oldProps[i] = nil
1013 0 : }
1014 0 : decoder := makeBlockPropertiesDecoder(n, l.Data[i].Props)
1015 0 : for !decoder.Done() {
1016 0 : id, val, err := decoder.Next()
1017 0 : if err != nil {
1018 0 : return err
1019 0 : }
1020 0 : if oldShortIDs[id].IsValid() {
1021 0 : oldProps[oldShortIDs[id]] = val
1022 0 : }
1023 : }
1024 0 : for i, p := range w.blockPropCollectors {
1025 0 : if err := p.AddCollectedWithSuffixReplacement(oldProps[i], from, to); err != nil {
1026 0 : return err
1027 0 : }
1028 : }
1029 0 : var separator []byte
1030 0 : if i+1 < len(blocks) {
1031 0 : w.separatorBuf = w.comparer.Separator(w.separatorBuf[:0], blocks[i].end.UserKey, blocks[i+1].start.UserKey)
1032 0 : separator = w.separatorBuf
1033 0 : } else {
1034 0 : w.separatorBuf = w.comparer.Successor(w.separatorBuf[:0], blocks[i].end.UserKey)
1035 0 : separator = w.separatorBuf
1036 0 : }
1037 0 : w.enqueuePhysicalBlock(cb, separator)
1038 : }
1039 :
1040 0 : if len(blocks) > 0 {
1041 0 : w.meta.updateSeqNum(blocks[0].start.SeqNum())
1042 0 : w.props.NumEntries = r.Properties.NumEntries
1043 0 : w.props.RawKeySize = r.Properties.RawKeySize
1044 0 : w.props.RawValueSize = r.Properties.RawValueSize
1045 0 : w.meta.SetSmallestPointKey(blocks[0].start)
1046 0 : w.meta.SetLargestPointKey(blocks[len(blocks)-1].end)
1047 0 : }
1048 :
1049 : // Copy range key block, replacing suffixes if it exists.
1050 0 : if err := rewriteRangeKeyBlockToWriter(r, w, from, to); err != nil {
1051 0 : return errors.Wrap(err, "rewriting range key blocks")
1052 0 : }
1053 : // Copy over the filter block if it exists.
1054 0 : if w.filterBlock != nil {
1055 0 : if filterBlockBH, ok := l.FilterByName(w.filterBlock.metaName()); ok {
1056 0 : filterBlock, _, err := readBlockBuf(r, filterBlockBH, nil)
1057 0 : if err != nil {
1058 0 : return errors.Wrap(err, "reading filter")
1059 0 : }
1060 0 : w.filterBlock = copyFilterWriter{
1061 0 : origPolicyName: w.filterBlock.policyName(),
1062 0 : origMetaName: w.filterBlock.metaName(),
1063 0 : data: filterBlock,
1064 0 : }
1065 : }
1066 : }
1067 0 : return nil
1068 : }
1069 :
1070 : func shouldFlushWithoutLatestKV(
1071 : sizeWithKV int, sizeWithoutKV int, entryCountWithoutKV int, flushGovernor *block.FlushGovernor,
1072 1 : ) bool {
1073 1 : if entryCountWithoutKV == 0 {
1074 1 : return false
1075 1 : }
1076 1 : if sizeWithoutKV < flushGovernor.LowWatermark() {
1077 1 : // Fast path when the block is too small to flush.
1078 1 : return false
1079 1 : }
1080 1 : return flushGovernor.ShouldFlush(sizeWithoutKV, sizeWithKV)
1081 : }
1082 :
1083 : // copyDataBlocks adds a range of blocks to the table as-is. These blocks could be
1084 : // compressed. It's specifically used by the sstable copier that can copy parts
1085 : // of an sstable to a new sstable, using CopySpan().
1086 : func (w *RawColumnWriter) copyDataBlocks(
1087 : ctx context.Context, blocks []indexEntry, rh objstorage.ReadHandle,
1088 1 : ) error {
1089 1 : buf := make([]byte, 0, 256<<10)
1090 1 : readAndFlushBlocks := func(firstBlockIdx, lastBlockIdx int) error {
1091 1 : if firstBlockIdx > lastBlockIdx {
1092 0 : panic("pebble: readAndFlushBlocks called with invalid block range")
1093 : }
1094 : // We need to flush blocks[firstBlockIdx:lastBlockIdx+1] into the write queue.
1095 : // We do this by issuing one big read from the read handle into the buffer, and
1096 : // then enqueueing the writing of those blocks one-by-one.
1097 : //
1098 : // TODO(bilal): Consider refactoring the write queue to support writing multiple
1099 : // blocks in one request.
1100 1 : lastBH := blocks[lastBlockIdx].bh
1101 1 : blocksToReadLen := lastBH.Offset + lastBH.Length + block.TrailerLen - blocks[firstBlockIdx].bh.Offset
1102 1 : if blocksToReadLen > uint64(cap(buf)) {
1103 0 : buf = make([]byte, 0, blocksToReadLen)
1104 0 : }
1105 1 : if err := rh.ReadAt(ctx, buf[:blocksToReadLen], int64(blocks[firstBlockIdx].bh.Offset)); err != nil {
1106 0 : return err
1107 0 : }
1108 1 : for i := firstBlockIdx; i <= lastBlockIdx; i++ {
1109 1 : offsetDiff := blocks[i].bh.Offset - blocks[firstBlockIdx].bh.Offset
1110 1 : blockBuf := buf[offsetDiff : offsetDiff+blocks[i].bh.Length+block.TrailerLen]
1111 1 : cb := compressedBlockPool.Get().(*compressedBlock)
1112 1 : cb.physical = block.NewPhysicalBlock(blockBuf)
1113 1 : if err := w.enqueuePhysicalBlock(cb, blocks[i].sep); err != nil {
1114 0 : return err
1115 0 : }
1116 : }
1117 1 : return nil
1118 : }
1119 : // Iterate through blocks, accumulating a contiguous run until we have enough to
1120 : // fill cap(buf). When the run already contains at least one block and adding the
1121 : // next block would exceed the buffer capacity, we read and flush the accumulated
1122 : // run. This allows us to read as many blocks in one IO request as possible, while
1123 : // still utilizing the write queue in this writer.
1124 1 : lastBlockOffset := uint64(0)
1125 1 : for i := 0; i < len(blocks); {
1126 1 : if blocks[i].bh.Offset < lastBlockOffset {
1127 0 : panic("pebble: copyDataBlocks called with blocks out of order")
1128 : }
1129 1 : start := i
1130 1 : // Note the i++ in the initializing condition; this means we will always flush at least
1131 1 : // one block.
1132 1 : for i++; i < len(blocks) && (blocks[i].bh.Length+blocks[i].bh.Offset+block.TrailerLen-blocks[start].bh.Offset) <= uint64(cap(buf)); i++ {
1133 1 : }
1134 : // i points to one index past the last block we want to read.
1135 1 : if err := readAndFlushBlocks(start, i-1); err != nil {
1136 0 : return err
1137 0 : }
1138 : }
1139 1 : return nil
1140 : }
1141 :
1142 : // addDataBlock adds a raw uncompressed data block to the table as-is. It's specifically used
1143 : // by the sstable copier that can copy parts of an sstable to a new sstable,
1144 : // using CopySpan().
1145 1 : func (w *RawColumnWriter) addDataBlock(b, sep []byte, bhp block.HandleWithProperties) error {
1146 1 : // Compress and checksum the raw data block and send it to the write queue.
1147 1 : cb := compressedBlockPool.Get().(*compressedBlock)
1148 1 : cb.blockBuf.checksummer.Type = w.opts.Checksum
1149 1 : cb.physical = block.CompressAndChecksum(
1150 1 : &cb.blockBuf.compressedBuf,
1151 1 : b,
1152 1 : w.opts.Compression,
1153 1 : &cb.blockBuf.checksummer,
1154 1 : )
1155 1 : if !cb.physical.IsCompressed() {
1156 1 : // If the block isn't compressed, cb.physical's underlying data points
1157 1 : // directly into the caller-provided buffer b. Clone it before passing
1158 1 : // it to the write queue to be asynchronously written to disk.
1159 1 : // TODO(jackson): Should we try to avoid this clone by tracking the
1160 1 : // lifetime of the DataBlockWriters?
1161 1 : cb.physical = cb.physical.Clone()
1162 1 : }
1163 1 : if err := w.enqueuePhysicalBlock(cb, sep); err != nil {
1164 0 : return err
1165 0 : }
1166 1 : return nil
1167 : }
1168 :
1169 : // copyFilter copies the specified filter to the table. It's specifically used
1170 : // by the sstable copier that can copy parts of an sstable to a new sstable,
1171 : // using CopySpan().
1172 1 : func (w *RawColumnWriter) copyFilter(filter []byte, filterName string) error {
1173 1 : if w.filterBlock != nil && filterName != w.filterBlock.policyName() {
1174 0 : return errors.New("mismatched filters")
1175 0 : }
1176 1 : w.filterBlock = copyFilterWriter{
1177 1 : origPolicyName: w.filterBlock.policyName(), origMetaName: w.filterBlock.metaName(), data: filter,
1178 1 : }
1179 1 : return nil
1180 : }
1181 :
1182 : // copyProperties copies properties from the specified props, and resets others
1183 : // to prepare for copying data blocks from another sstable, using the copy/addDataBlock(s)
1184 : // methods above. It's specifically used by the sstable copier that can copy parts of an
1185 : // sstable to a new sstable, using CopySpan().
1186 1 : func (w *RawColumnWriter) copyProperties(props Properties) {
1187 1 : w.props = props
1188 1 : // Remove all user properties to disable block properties, which we do not
1189 1 : // calculate for CopySpan.
1190 1 : w.props.UserProperties = nil
1191 1 : // Reset props that we'll re-derive as we build our own index.
1192 1 : w.props.IndexPartitions = 0
1193 1 : w.props.TopLevelIndexSize = 0
1194 1 : w.props.IndexSize = 0
1195 1 : w.props.IndexType = 0
1196 1 : }
|