Line data Source code
1 : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package sstable
6 :
7 : import (
8 : "bytes"
9 : "context"
10 : "encoding/binary"
11 : "fmt"
12 : "math"
13 : "slices"
14 : "sync"
15 :
16 : "github.com/cockroachdb/errors"
17 : "github.com/cockroachdb/pebble/internal/base"
18 : "github.com/cockroachdb/pebble/internal/bytealloc"
19 : "github.com/cockroachdb/pebble/internal/invariants"
20 : "github.com/cockroachdb/pebble/internal/keyspan"
21 : "github.com/cockroachdb/pebble/objstorage"
22 : "github.com/cockroachdb/pebble/sstable/block"
23 : "github.com/cockroachdb/pebble/sstable/colblk"
24 : "github.com/cockroachdb/pebble/sstable/rowblk"
25 : )
26 :
27 : // RawColumnWriter is a sstable RawWriter that writes sstables with
28 : // column-oriented blocks. All table formats TableFormatPebblev5 and later write
29 : // column-oriented blocks and use RawColumnWriter.
30 : type RawColumnWriter struct {
31 : comparer *base.Comparer
32 : meta WriterMetadata
33 : opts WriterOptions
34 : err error
35 :
36 : dataFlush block.FlushGovernor
37 : indexFlush block.FlushGovernor
38 : blockPropCollectors []BlockPropertyCollector
39 : blockPropsEncoder blockPropertiesEncoder
40 : obsoleteCollector obsoleteKeyBlockPropertyCollector
41 : props Properties
42 : // block writers buffering unflushed data.
43 : dataBlock struct {
44 : colblk.DataBlockEncoder
45 : // numDeletions stores the count of point tombstones in this data block.
46 : // It's used to determine if this data block is considered
47 : // tombstone-dense for the purposes of compaction.
48 : numDeletions int
49 : // deletionSize stores the raw size of point tombstones in this data
50 : // block. It's used to determine if this data block is considered
51 : // tombstone-dense for the purposes of compaction.
52 : deletionSize int
53 : }
54 : indexBlock colblk.IndexBlockWriter
55 : topLevelIndexBlock colblk.IndexBlockWriter
56 : rangeDelBlock colblk.KeyspanBlockWriter
57 : rangeKeyBlock colblk.KeyspanBlockWriter
58 : valueBlock *valueBlockWriter // nil iff WriterOptions.DisableValueBlocks=true
59 : // filter accumulates the filter block. If populated, the filter ingests
60 : // either the output of w.split (i.e. a prefix extractor) if w.split is not
61 : // nil, or the full keys otherwise.
62 : filterBlock filterWriter
63 : prevPointKey struct {
64 : trailer base.InternalKeyTrailer
65 : isObsolete bool
66 : }
67 : pendingDataBlockSize int
68 : indexBlockSize int
69 : queuedDataSize uint64
70 :
71 : // indexBuffering holds finished index blocks as they're completed while
72 : // building the sstable. If an index block grows sufficiently large
73 : // (IndexBlockSize) while an sstable is still being constructed, the sstable
74 : // writer will create a two-level index structure. As index blocks are
75 : // completed, they're finished and buffered in-memory until the table is
76 : // finished. When the table is finished, the buffered index blocks are
77 : // flushed in order after all the data blocks, and the top-level index block
78 : // is constructed to point to all the individual index blocks.
79 : indexBuffering struct {
80 : // partitions holds all the completed index blocks.
81 : partitions []bufferedIndexBlock
82 : // blockAlloc is used to bulk-allocate byte slices used to store index
83 : // blocks in partitions. These live until the sstable is finished.
84 : blockAlloc []byte
85 : // sepAlloc is used to bulk-allocate index block separator slices stored
86 : // in partitions. These live until the sstable is finished.
87 : sepAlloc bytealloc.A
88 : }
89 :
90 : writeQueue struct {
91 : wg sync.WaitGroup
92 : ch chan *compressedBlock
93 : err error
94 : }
95 : layout layoutWriter
96 :
97 : separatorBuf []byte
98 : tmp [blockHandleLikelyMaxLen]byte
99 : previousUserKey invariants.Value[[]byte]
100 : disableKeyOrderChecks bool
101 : }
102 :
103 : // Assert that *RawColumnWriter implements RawWriter.
104 : var _ RawWriter = (*RawColumnWriter)(nil)
105 :
106 1 : func newColumnarWriter(writable objstorage.Writable, o WriterOptions) *RawColumnWriter {
107 1 : if writable == nil {
108 0 : panic("pebble: nil writable")
109 : }
110 1 : if !o.TableFormat.BlockColumnar() {
111 0 : panic(errors.AssertionFailedf("newColumnarWriter cannot create sstables with %s format", o.TableFormat))
112 : }
113 1 : o = o.ensureDefaults()
114 1 : w := &RawColumnWriter{
115 1 : comparer: o.Comparer,
116 1 : meta: WriterMetadata{
117 1 : SmallestSeqNum: math.MaxUint64,
118 1 : },
119 1 : opts: o,
120 1 : layout: makeLayoutWriter(writable, o),
121 1 : disableKeyOrderChecks: o.internal.DisableKeyOrderChecks,
122 1 : }
123 1 : w.dataFlush = block.MakeFlushGovernor(o.BlockSize, o.BlockSizeThreshold, o.SizeClassAwareThreshold, o.AllocatorSizeClasses)
124 1 : w.indexFlush = block.MakeFlushGovernor(o.IndexBlockSize, o.BlockSizeThreshold, o.SizeClassAwareThreshold, o.AllocatorSizeClasses)
125 1 : w.dataBlock.Init(o.KeySchema)
126 1 : w.indexBlock.Init()
127 1 : w.topLevelIndexBlock.Init()
128 1 : w.rangeDelBlock.Init(w.comparer.Equal)
129 1 : w.rangeKeyBlock.Init(w.comparer.Equal)
130 1 : if !o.DisableValueBlocks {
131 1 : w.valueBlock = newValueBlockWriter(
132 1 : block.MakeFlushGovernor(o.BlockSize, o.BlockSizeThreshold, o.SizeClassAwareThreshold, o.AllocatorSizeClasses),
133 1 : w.opts.Compression, w.opts.Checksum, func(compressedSize int) {})
134 : }
135 1 : if o.FilterPolicy != nil {
136 1 : switch o.FilterType {
137 1 : case TableFilter:
138 1 : w.filterBlock = newTableFilterWriter(o.FilterPolicy)
139 0 : default:
140 0 : panic(fmt.Sprintf("unknown filter type: %v", o.FilterType))
141 : }
142 : }
143 :
144 1 : numBlockPropertyCollectors := len(o.BlockPropertyCollectors)
145 1 : if !o.disableObsoleteCollector {
146 1 : numBlockPropertyCollectors++
147 1 : }
148 1 : if numBlockPropertyCollectors > maxPropertyCollectors {
149 0 : panic(errors.New("pebble: too many block property collectors"))
150 : }
151 1 : w.blockPropCollectors = make([]BlockPropertyCollector, 0, numBlockPropertyCollectors)
152 1 : for _, constructFn := range o.BlockPropertyCollectors {
153 1 : w.blockPropCollectors = append(w.blockPropCollectors, constructFn())
154 1 : }
155 1 : if !o.disableObsoleteCollector {
156 1 : w.blockPropCollectors = append(w.blockPropCollectors, &w.obsoleteCollector)
157 1 : }
158 1 : var buf bytes.Buffer
159 1 : buf.WriteString("[")
160 1 : for i := range w.blockPropCollectors {
161 1 : if i > 0 {
162 1 : buf.WriteString(",")
163 1 : }
164 1 : buf.WriteString(w.blockPropCollectors[i].Name())
165 : }
166 1 : buf.WriteString("]")
167 1 : w.props.PropertyCollectorNames = buf.String()
168 1 :
169 1 : w.props.ComparerName = o.Comparer.Name
170 1 : w.props.CompressionName = o.Compression.String()
171 1 : w.props.KeySchemaName = o.KeySchema.Name
172 1 : w.props.MergerName = o.MergerName
173 1 :
174 1 : w.writeQueue.ch = make(chan *compressedBlock)
175 1 : w.writeQueue.wg.Add(1)
176 1 : go w.drainWriteQueue()
177 1 : return w
178 : }
179 :
180 : // Error returns the current accumulated error if any.
181 1 : func (w *RawColumnWriter) Error() error {
182 1 : return w.err
183 1 : }
184 :
185 : // EstimatedSize returns the estimated size of the sstable being written if
186 : // a call to Close() was made without adding additional keys.
187 1 : func (w *RawColumnWriter) EstimatedSize() uint64 {
188 1 : sz := rocksDBFooterLen + w.queuedDataSize
189 1 : // TODO(jackson): Avoid iterating over partitions by incrementally
190 1 : // maintaining the size contribution of all buffered partitions.
191 1 : for _, bib := range w.indexBuffering.partitions {
192 0 : // We include the separator user key to account for its bytes in the
193 0 : // top-level index block.
194 0 : //
195 0 : // TODO(jackson): We could incrementally build the top-level index block
196 0 : // and produce an exact calculation of the current top-level index
197 0 : // block's size.
198 0 : sz += uint64(len(bib.block) + block.TrailerLen + len(bib.sep.UserKey))
199 0 : }
200 1 : if w.rangeDelBlock.KeyCount() > 0 {
201 1 : sz += uint64(w.rangeDelBlock.Size())
202 1 : }
203 1 : if w.rangeKeyBlock.KeyCount() > 0 {
204 1 : sz += uint64(w.rangeKeyBlock.Size())
205 1 : }
206 1 : for _, blk := range w.valueBlock.blocks {
207 0 : sz += uint64(blk.block.LengthWithTrailer())
208 0 : }
209 1 : if w.valueBlock.buf != nil {
210 1 : sz += uint64(len(w.valueBlock.buf.b))
211 1 : }
212 : // TODO(jackson): Include an estimate of the properties, filter and meta
213 : // index blocks sizes.
214 1 : return sz
215 : }
216 :
217 : // ComparePrev compares the provided user key to the last point key written to the
218 : // writer. The returned value is equivalent to Compare(key, prevKey) where
219 : // prevKey is the last point key written to the writer.
220 : //
221 : // If no key has been written yet, ComparePrev returns +1.
222 : //
223 : // Must not be called after Writer is closed.
224 1 : func (w *RawColumnWriter) ComparePrev(k []byte) int {
225 1 : if w == nil || w.dataBlock.Rows() == 0 {
226 1 : return +1
227 1 : }
228 1 : return int(w.dataBlock.KeyWriter.ComparePrev(k).UserKeyComparison)
229 : }
230 :
231 : // SetSnapshotPinnedProperties sets the properties for pinned keys. Should only
232 : // be used internally by Pebble.
233 : func (w *RawColumnWriter) SetSnapshotPinnedProperties(
234 : pinnedKeyCount, pinnedKeySize, pinnedValueSize uint64,
235 1 : ) {
236 1 : w.props.SnapshotPinnedKeys = pinnedKeyCount
237 1 : w.props.SnapshotPinnedKeySize = pinnedKeySize
238 1 : w.props.SnapshotPinnedValueSize = pinnedValueSize
239 1 : }
240 :
241 : // Metadata returns the metadata for the finished sstable. Only valid to call
242 : // after the sstable has been finished.
243 1 : func (w *RawColumnWriter) Metadata() (*WriterMetadata, error) {
244 1 : if !w.layout.IsFinished() {
245 0 : return nil, errors.New("pebble: writer is not closed")
246 0 : }
247 1 : return &w.meta, nil
248 : }
249 :
250 : // EncodeSpan encodes the keys in the given span. The span can contain either
251 : // only RANGEDEL keys or only range keys.
252 1 : func (w *RawColumnWriter) EncodeSpan(span keyspan.Span) error {
253 1 : if span.Empty() {
254 1 : return nil
255 1 : }
256 1 : for _, k := range span.Keys {
257 1 : w.meta.updateSeqNum(k.SeqNum())
258 1 : }
259 :
260 1 : blockWriter := &w.rangeKeyBlock
261 1 : if span.Keys[0].Kind() == base.InternalKeyKindRangeDelete {
262 1 : blockWriter = &w.rangeDelBlock
263 1 : // Update range delete properties.
264 1 : // NB: These properties are computed differently than the rowblk sstable
265 1 : // writer because this writer does not flatten them into row key-value
266 1 : // pairs.
267 1 : w.props.RawKeySize += uint64(len(span.Start) + len(span.End))
268 1 : count := uint64(len(span.Keys))
269 1 : w.props.NumEntries += count
270 1 : w.props.NumDeletions += count
271 1 : w.props.NumRangeDeletions += count
272 1 : } else {
273 1 : // Update range key properties.
274 1 : // NB: These properties are computed differently than the rowblk sstable
275 1 : // writer because this writer does not flatten them into row key-value
276 1 : // pairs.
277 1 : w.props.RawRangeKeyKeySize += uint64(len(span.Start) + len(span.End))
278 1 : for _, k := range span.Keys {
279 1 : w.props.RawRangeKeyValueSize += uint64(len(k.Value))
280 1 : switch k.Kind() {
281 1 : case base.InternalKeyKindRangeKeyDelete:
282 1 : w.props.NumRangeKeyDels++
283 1 : case base.InternalKeyKindRangeKeySet:
284 1 : w.props.NumRangeKeySets++
285 1 : case base.InternalKeyKindRangeKeyUnset:
286 1 : w.props.NumRangeKeyUnsets++
287 0 : default:
288 0 : panic(errors.Errorf("pebble: invalid range key type: %s", k.Kind()))
289 : }
290 : }
291 1 : for i := range w.blockPropCollectors {
292 1 : if err := w.blockPropCollectors[i].AddRangeKeys(span); err != nil {
293 0 : return err
294 0 : }
295 : }
296 : }
297 1 : if !w.disableKeyOrderChecks && blockWriter.KeyCount() > 0 {
298 1 : // Check that spans are being added in fragmented order. If the two
299 1 : // tombstones overlap, their start and end keys must be identical.
300 1 : prevStart, prevEnd, prevTrailer := blockWriter.UnsafeLastSpan()
301 1 : if w.opts.Comparer.Equal(prevStart, span.Start) && w.opts.Comparer.Equal(prevEnd, span.End) {
302 1 : if prevTrailer <= span.Keys[0].Trailer {
303 1 : w.err = errors.Errorf("pebble: keys must be added in order: %s-%s:{(#%s)}, %s",
304 1 : w.opts.Comparer.FormatKey(prevStart),
305 1 : w.opts.Comparer.FormatKey(prevEnd),
306 1 : prevTrailer, span.Pretty(w.opts.Comparer.FormatKey))
307 1 : }
308 1 : } else if c := w.opts.Comparer.Compare(prevEnd, span.Start); c > 0 {
309 1 : w.err = errors.Errorf("pebble: keys must be added in order: %s-%s:{(#%s)}, %s",
310 1 : w.opts.Comparer.FormatKey(prevStart),
311 1 : w.opts.Comparer.FormatKey(prevEnd),
312 1 : prevTrailer, span.Pretty(w.opts.Comparer.FormatKey))
313 1 : return w.err
314 1 : }
315 : }
316 1 : blockWriter.AddSpan(span)
317 1 : return nil
318 : }
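// The ordering check above enforces that spans arrive fragmented and sorted:
// spans with identical bounds must carry strictly decreasing trailers, while
// spans with different bounds must not overlap the previous span. The sketch
// below restates that invariant over a plain slice; exampleSpan and its single
// trailer are illustrative stand-ins for keyspan.Span, not Pebble types.
type exampleSpan struct {
	start, end []byte
	trailer    uint64 // larger means newer
}

// exampleCheckFragmentedOrder reports an error for the first span that violates
// the ordering rule enforced by EncodeSpan.
func exampleCheckFragmentedOrder(spans []exampleSpan) error {
	for i := 1; i < len(spans); i++ {
		prev, cur := spans[i-1], spans[i]
		if bytes.Equal(prev.start, cur.start) && bytes.Equal(prev.end, cur.end) {
			// Identical bounds: trailers must strictly decrease (newest first).
			if prev.trailer <= cur.trailer {
				return fmt.Errorf("span %d has equal bounds but is not older than its predecessor", i)
			}
		} else if bytes.Compare(prev.end, cur.start) > 0 {
			// Different bounds: the new span must start at or after the
			// previous span's end.
			return fmt.Errorf("span %d overlaps the previous span", i)
		}
	}
	return nil
}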
319 :
320 : // AddWithForceObsolete adds a point key/value pair when writing a
321 : // strict-obsolete sstable. For a given Writer, the keys passed to Add must be
322 : // in increasing order. Span keys (range deletions, range keys) must be added
323 : // through EncodeSpan.
324 : //
325 : // forceObsolete indicates whether the caller has determined that this key is
326 : // obsolete even though it may be the latest point key for this userkey. This
327 : // should be set to true for keys obsoleted by RANGEDELs, and is required for
328 : // strict-obsolete sstables.
329 : //
330 : // Note that there are two properties, S1 and S2 (see comment in format.go)
331 : // that strict-obsolete ssts must satisfy. S2, due to RANGEDELs, is solely the
332 : // responsibility of the caller. S1 is solely the responsibility of the
333 : // callee.
334 : func (w *RawColumnWriter) AddWithForceObsolete(
335 : key InternalKey, value []byte, forceObsolete bool,
336 1 : ) error {
337 1 : switch key.Kind() {
338 : case base.InternalKeyKindRangeDelete, base.InternalKeyKindRangeKeySet,
339 1 : base.InternalKeyKindRangeKeyUnset, base.InternalKeyKindRangeKeyDelete:
340 1 : return errors.Newf("%s must be added through EncodeSpan", key.Kind())
341 1 : case base.InternalKeyKindMerge:
342 1 : if w.opts.IsStrictObsolete {
343 0 : return errors.Errorf("MERGE not supported in a strict-obsolete sstable")
344 0 : }
345 : }
346 :
347 1 : eval, err := w.evaluatePoint(key, len(value))
348 1 : if err != nil {
349 1 : return err
350 1 : }
351 1 : eval.isObsolete = eval.isObsolete || forceObsolete
352 1 : w.prevPointKey.trailer = key.Trailer
353 1 : w.prevPointKey.isObsolete = eval.isObsolete
354 1 :
355 1 : var valuePrefix block.ValuePrefix
356 1 : var valueStoredWithKey []byte
357 1 : if eval.writeToValueBlock {
358 1 : vh, err := w.valueBlock.addValue(value)
359 1 : if err != nil {
360 0 : return err
361 0 : }
362 1 : n := encodeValueHandle(w.tmp[:], vh)
363 1 : valueStoredWithKey = w.tmp[:n]
364 1 : var attribute base.ShortAttribute
365 1 : if w.opts.ShortAttributeExtractor != nil {
366 1 : // TODO(sumeer): for compactions, it is possible that the input sstable
367 1 : // already has this value in the value section and so we have already
368 1 : // extracted the ShortAttribute. Avoid extracting it again. This will
369 1 : // require changing the RawWriter.Add interface.
370 1 : if attribute, err = w.opts.ShortAttributeExtractor(
371 1 : key.UserKey, int(eval.kcmp.PrefixLen), value); err != nil {
372 0 : return err
373 0 : }
374 : }
375 1 : valuePrefix = block.ValueHandlePrefix(eval.kcmp.PrefixEqual(), attribute)
376 1 : } else {
377 1 : valueStoredWithKey = value
378 1 : if len(value) > 0 {
379 1 : valuePrefix = block.InPlaceValuePrefix(eval.kcmp.PrefixEqual())
380 1 : }
381 : }
382 :
383 : // Append the key to the data block. We have NOT yet committed to
384 : // including the key in the block. The data block writer permits us to
385 : // finish the block excluding the last-appended KV.
386 1 : entriesWithoutKV := w.dataBlock.Rows()
387 1 : w.dataBlock.Add(key, valueStoredWithKey, valuePrefix, eval.kcmp, eval.isObsolete)
388 1 :
389 1 : // Now that we've appended the KV pair, we can compute the exact size of the
390 1 : // block with this key-value pair included. Check to see if we should flush
391 1 : // the current block, either with or without the added key-value pair.
392 1 : size := w.dataBlock.Size()
393 1 : if shouldFlushWithoutLatestKV(size, w.pendingDataBlockSize, entriesWithoutKV, &w.dataFlush) {
394 1 : // Flush the data block excluding the key we just added.
395 1 : w.flushDataBlockWithoutNextKey(key.UserKey)
396 1 : // flushDataBlockWithoutNextKey reset the data block builder, and we can
397 1 : // add the key to this next block now.
398 1 : w.dataBlock.Add(key, valueStoredWithKey, valuePrefix, eval.kcmp, eval.isObsolete)
399 1 : w.pendingDataBlockSize = w.dataBlock.Size()
400 1 : } else {
401 1 : // We're not flushing the data block, and we're committing to including
402 1 : // the current KV in the block. Remember the new size of the data block
403 1 : // with the current KV.
404 1 : w.pendingDataBlockSize = size
405 1 : }
406 :
407 1 : for i := range w.blockPropCollectors {
408 1 : v := value
409 1 : if key.Kind() == base.InternalKeyKindSet {
410 1 : // Values for SET are not required to be in-place, and in the future
411 1 : // may not even be read by the compaction, so pass nil values. Block
412 1 : // property collectors in such Pebble DB's must not look at the
413 1 : // value.
414 1 : v = nil
415 1 : }
416 1 : if err := w.blockPropCollectors[i].AddPointKey(key, v); err != nil {
417 0 : w.err = err
418 0 : return err
419 0 : }
420 : }
421 1 : w.obsoleteCollector.AddPoint(eval.isObsolete)
422 1 : if w.filterBlock != nil {
423 1 : w.filterBlock.addKey(key.UserKey[:eval.kcmp.PrefixLen])
424 1 : }
425 1 : w.meta.updateSeqNum(key.SeqNum())
426 1 : if !w.meta.HasPointKeys {
427 1 : w.meta.SetSmallestPointKey(key.Clone())
428 1 : }
429 :
430 1 : w.props.NumEntries++
431 1 : switch key.Kind() {
432 1 : case InternalKeyKindDelete, InternalKeyKindSingleDelete:
433 1 : w.props.NumDeletions++
434 1 : w.props.RawPointTombstoneKeySize += uint64(len(key.UserKey))
435 1 : w.dataBlock.numDeletions++
436 1 : w.dataBlock.deletionSize += len(key.UserKey)
437 1 : case InternalKeyKindDeleteSized:
438 1 : var size uint64
439 1 : if len(value) > 0 {
440 1 : var n int
441 1 : size, n = binary.Uvarint(value)
442 1 : if n <= 0 {
443 0 : return errors.Newf("%s key's value (%x) does not parse as uvarint",
444 0 : errors.Safe(key.Kind().String()), value)
445 0 : }
446 : }
447 1 : w.props.NumDeletions++
448 1 : w.props.NumSizedDeletions++
449 1 : w.props.RawPointTombstoneKeySize += uint64(len(key.UserKey))
450 1 : w.props.RawPointTombstoneValueSize += size
451 1 : w.dataBlock.numDeletions++
452 1 : w.dataBlock.deletionSize += len(key.UserKey)
453 1 : case InternalKeyKindMerge:
454 1 : w.props.NumMergeOperands++
455 : }
456 1 : w.props.RawKeySize += uint64(key.Size())
457 1 : w.props.RawValueSize += uint64(len(value))
458 1 : return nil
459 : }
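// The InternalKeyKindDeleteSized case above decodes the tombstone's value as a
// uvarint holding the size of the value the tombstone is expected to delete.
// A minimal round trip with encoding/binary, using an arbitrary size of 1234,
// is sketched below; it is illustrative only and not part of Pebble.
func exampleDeleteSizedValue() {
	var buf [binary.MaxVarintLen64]byte
	n := binary.PutUvarint(buf[:], 1234) // encode the expected deleted-value size
	size, m := binary.Uvarint(buf[:n])   // decode it, as AddWithForceObsolete does
	fmt.Println(size, m > 0)             // 1234 true
}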
460 :
461 : type pointKeyEvaluation struct {
462 : kcmp colblk.KeyComparison
463 : isObsolete bool
464 : writeToValueBlock bool
465 : }
466 :
467 : // evaluatePoint takes information about a point key being written to the
468 : // sstable and decides how the point should be represented, where its value
469 : // should be stored, etc.
470 : func (w *RawColumnWriter) evaluatePoint(
471 : key base.InternalKey, valueLen int,
472 1 : ) (eval pointKeyEvaluation, err error) {
473 1 : eval.kcmp = w.dataBlock.KeyWriter.ComparePrev(key.UserKey)
474 1 :
475 1 : // When invariants are enabled, validate kcmp.
476 1 : if invariants.Enabled {
477 1 : colblk.AssertKeyCompare(w.comparer, key.UserKey, w.previousUserKey.Get(), eval.kcmp)
478 1 : w.previousUserKey.Store(append(w.previousUserKey.Get()[:0], key.UserKey...))
479 1 : }
480 :
481 1 : if !w.meta.HasPointKeys {
482 1 : return eval, nil
483 1 : }
484 1 : keyKind := key.Kind()
485 1 : // Ensure that no one adds a point key kind without considering the obsolete
486 1 : // handling for that kind.
487 1 : switch keyKind {
488 : case InternalKeyKindSet, InternalKeyKindSetWithDelete, InternalKeyKindMerge,
489 1 : InternalKeyKindDelete, InternalKeyKindSingleDelete, InternalKeyKindDeleteSized:
490 0 : default:
491 0 : panic(errors.AssertionFailedf("unexpected key kind %s", keyKind.String()))
492 : }
493 1 : prevKeyKind := w.prevPointKey.trailer.Kind()
494 1 : // If same user key, then the current key is obsolete if any of the
495 1 : // following is true:
496 1 : // C1 The prev key was obsolete.
497 1 : // C2 The prev key was not a MERGE. When the previous key is a MERGE we must
498 1 : // preserve SET* and MERGE since their values will be merged into the
499 1 : // previous key. We also must preserve DEL* since there may be an older
500 1 : // SET*/MERGE in a lower level that must not be merged with the MERGE --
501 1 : // if we omit the DEL* that lower SET*/MERGE will become visible.
502 1 : //
503 1 : // Regardless of whether it is the same user key or not
504 1 : // C3 The current key is some kind of point delete, and we are writing to
505 1 : // the lowest level, then it is also obsolete. The correctness of this
506 1 : // relies on the same user key not spanning multiple sstables in a level.
507 1 : //
508 1 : // C1 ensures that for a user key there is at most one transition from
509 1 : // !obsolete to obsolete. Consider a user key k, for which the first n keys
510 1 : // are not obsolete. We consider the various value of n:
511 1 : //
512 1 : // n = 0: This happens due to forceObsolete being set by the caller, or due
513 1 : // to C3. forceObsolete must only be set due to a RANGEDEL, and that RANGEDEL
514 1 : // must also delete all the lower seqnums for the same user key. C3 triggers
515 1 : // due to a point delete and that deletes all the lower seqnums for the same
516 1 : // user key.
517 1 : //
518 1 : // n = 1: This is the common case. It happens when the first key is not a
519 1 : // MERGE, or the current key is some kind of point delete.
520 1 : //
521 1 : // n > 1: This is due to a sequence of MERGE keys, potentially followed by a
522 1 : // single non-MERGE key.
523 1 : isObsoleteC1AndC2 := eval.kcmp.UserKeyComparison == 0 &&
524 1 : (w.prevPointKey.isObsolete || prevKeyKind != InternalKeyKindMerge)
525 1 : isObsoleteC3 := w.opts.WritingToLowestLevel &&
526 1 : (keyKind == InternalKeyKindDelete || keyKind == InternalKeyKindSingleDelete ||
527 1 : keyKind == InternalKeyKindDeleteSized)
528 1 : eval.isObsolete = isObsoleteC1AndC2 || isObsoleteC3
529 1 : // TODO(sumeer): storing isObsolete SET and SETWITHDEL in value blocks is
530 1 : // possible, but requires some care in documenting and checking invariants.
531 1 : // There is code that assumes nothing in value blocks because of single MVCC
532 1 : // version (those should be ok). We have to ensure setHasSamePrefix is
533 1 : // correctly initialized here etc.
534 1 :
535 1 : if !w.disableKeyOrderChecks && (eval.kcmp.UserKeyComparison < 0 ||
536 1 : (eval.kcmp.UserKeyComparison == 0 && w.prevPointKey.trailer <= key.Trailer)) {
537 1 : return eval, errors.Errorf(
538 1 : "pebble: keys must be added in strictly increasing order: %s",
539 1 : key.Pretty(w.comparer.FormatKey))
540 1 : }
541 :
542 : // We might want to write this key's value to a value block if it has the
543 : // same prefix.
544 : //
545 : // We require:
546 : // . Value blocks to be enabled.
547 : // . The current key to have the same prefix as the previous key.
548 : // . The previous key to be a SET.
549 : // . The current key to be a SET.
550 : // . If there are bounds requiring some keys' values to be in-place, the
551 : // key must not fall within those bounds.
552 : // . The value to be sufficiently large. (Currently we simply require a
553 : // non-zero length, so all non-empty values are eligible for storage
554 : // out-of-band in a value block.)
555 : //
556 : // Use of 0 here is somewhat arbitrary. Given the minimum 3 byte encoding of
557 : // valueHandle, this should be > 3. But tiny values are common in test and
558 : // unlikely in production, so we use 0 here for better test coverage.
559 1 : const tinyValueThreshold = 0
560 1 : useValueBlock := !w.opts.DisableValueBlocks &&
561 1 : eval.kcmp.PrefixEqual() &&
562 1 : prevKeyKind == InternalKeyKindSet &&
563 1 : keyKind == InternalKeyKindSet &&
564 1 : valueLen > tinyValueThreshold &&
565 1 : w.valueBlock != nil
566 1 : if !useValueBlock {
567 1 : return eval, nil
568 1 : }
569 : // NB: it is possible that eval.kcmp.UserKeyComparison == 0, i.e., these two
570 : // SETs have identical user keys (because of an open snapshot). This should
571 : // be the rare case.
572 :
573 : // If there are bounds requiring some keys' values to be in-place, compare
574 : // the prefix against the bounds.
575 1 : if !w.opts.RequiredInPlaceValueBound.IsEmpty() {
576 1 : if w.comparer.Compare(w.opts.RequiredInPlaceValueBound.Upper, key.UserKey[:eval.kcmp.PrefixLen]) <= 0 {
577 1 : // Common case for CockroachDB. Make it empty since all future keys
578 1 : // will be >= this key.
579 1 : w.opts.RequiredInPlaceValueBound = UserKeyPrefixBound{}
580 1 : } else if w.comparer.Compare(key.UserKey[:eval.kcmp.PrefixLen], w.opts.RequiredInPlaceValueBound.Lower) >= 0 {
581 1 : // Don't write to value block if the key is within the bounds.
582 1 : return eval, nil
583 1 : }
584 : }
585 1 : eval.writeToValueBlock = true
586 1 : return eval, nil
587 : }
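// Conditions C1-C3 documented above reduce to a small boolean function. The
// sketch below restates them outside the writer; the parameter names are
// illustrative and the function is not part of Pebble.
func exampleIsObsolete(sameUserKey, prevObsolete, prevIsMerge, curIsPointDelete, writingToLowestLevel bool) bool {
	// C1/C2: for the same user key the point is obsolete if the previous key
	// was already obsolete, or if the previous key was anything other than a
	// MERGE (a MERGE still needs this key's value or deletion to be preserved).
	c1c2 := sameUserKey && (prevObsolete || !prevIsMerge)
	// C3: any point deletion written to the lowest level cannot be observed.
	c3 := writingToLowestLevel && curIsPointDelete
	return c1c2 || c3
}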
588 :
589 : var compressedBlockPool = sync.Pool{
590 1 : New: func() interface{} {
591 1 : return new(compressedBlock)
592 1 : },
593 : }
594 :
595 : type compressedBlock struct {
596 : physical block.PhysicalBlock
597 : blockBuf blockBuf
598 : }
599 :
600 1 : func (w *RawColumnWriter) flushDataBlockWithoutNextKey(nextKey []byte) {
601 1 : serializedBlock, lastKey := w.dataBlock.Finish(w.dataBlock.Rows()-1, w.pendingDataBlockSize)
602 1 : w.maybeIncrementTombstoneDenseBlocks(len(serializedBlock))
603 1 : // Compute the separator that will be written to the index block alongside
604 1 : // this data block's end offset. It is the separator between the last key in
605 1 : // the finished block and the [nextKey] that was excluded from the block.
606 1 : w.separatorBuf = w.comparer.Separator(w.separatorBuf[:0], lastKey.UserKey, nextKey)
607 1 : w.enqueueDataBlock(serializedBlock, lastKey, w.separatorBuf)
608 1 : w.dataBlock.Reset()
609 1 : w.pendingDataBlockSize = 0
610 1 : }
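// Separator above produces an index key k with lastKey.UserKey <= k < nextKey,
// so the index can store a short key that still cleanly divides adjacent data
// blocks. The sketch below is a simplified bytewise separator for illustration;
// it assumes a < b in byte order and is not Pebble's Comparer implementation.
func exampleShortSeparator(a, b []byte) []byte {
	for i := 0; i < min(len(a), len(b)); i++ {
		if a[i] != b[i] {
			if a[i]+1 < b[i] {
				// Keep the shared prefix and bump one byte: the result exceeds
				// a at byte i, and stays below b at byte i.
				sep := append([]byte(nil), a[:i+1]...)
				sep[i]++
				return sep
			}
			break
		}
	}
	// Fall back to a itself, which trivially satisfies a <= a < b.
	return append([]byte(nil), a...)
}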
611 :
612 : // maybeIncrementTombstoneDenseBlocks increments the number of tombstone dense
613 : // blocks if the number of deletions in the data block exceeds a threshold or
614 : // the deletion size exceeds a threshold. It should be called after the
615 : // data block has been finished.
616 : // Invariant: uncompressedLen is the uncompressed length of the just-finished data block.
617 1 : func (w *RawColumnWriter) maybeIncrementTombstoneDenseBlocks(uncompressedLen int) {
618 1 : minSize := w.opts.DeletionSizeRatioThreshold * float32(uncompressedLen)
619 1 : if w.dataBlock.numDeletions > w.opts.NumDeletionsThreshold || float32(w.dataBlock.deletionSize) > minSize {
620 1 : w.props.NumTombstoneDenseBlocks++
621 1 : }
622 1 : w.dataBlock.numDeletions = 0
623 1 : w.dataBlock.deletionSize = 0
624 : }
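// A short worked example of the heuristic above, using illustrative thresholds
// rather than Pebble's configured defaults: with NumDeletionsThreshold = 100
// and DeletionSizeRatioThreshold = 0.5, a 4096-byte block holding 30 tombstones
// whose keys total 2560 bytes counts as tombstone-dense, because
// 2560 > 0.5 * 4096 even though 30 <= 100.
func exampleTombstoneDense(numDeletions, deletionSize, uncompressedLen int) bool {
	const numDeletionsThreshold = 100      // illustrative, not a Pebble default
	const deletionSizeRatioThreshold = 0.5 // illustrative, not a Pebble default
	return numDeletions > numDeletionsThreshold ||
		float32(deletionSize) > deletionSizeRatioThreshold*float32(uncompressedLen)
}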
625 :
626 : // enqueueDataBlock compresses and checksums the provided data block and sends
627 : // it to the write queue to be asynchronously written to the underlying storage.
628 : // It also adds the block's index block separator to the pending index block,
629 : // possibly triggering the index block to be finished and buffered.
630 : func (w *RawColumnWriter) enqueueDataBlock(
631 : serializedBlock []byte, lastKey base.InternalKey, separator []byte,
632 1 : ) error {
633 1 : // TODO(jackson): Avoid allocating the largest point user key every time we
634 1 : // set the largest point key. This is what the rowblk writer does too, but
635 1 : // it's unnecessary.
636 1 : w.meta.SetLargestPointKey(lastKey.Clone())
637 1 :
638 1 : if invariants.Enabled {
639 1 : var dec colblk.DataBlockDecoder
640 1 : dec.Init(w.opts.KeySchema, serializedBlock)
641 1 : if err := dec.Validate(w.comparer, w.opts.KeySchema); err != nil {
642 0 : panic(err)
643 : }
644 : }
645 :
646 : // Serialize the data block, compress it and send it to the write queue.
647 1 : cb := compressedBlockPool.Get().(*compressedBlock)
648 1 : cb.blockBuf.checksummer.Type = w.opts.Checksum
649 1 : cb.physical = block.CompressAndChecksum(
650 1 : &cb.blockBuf.compressedBuf,
651 1 : serializedBlock,
652 1 : w.opts.Compression,
653 1 : &cb.blockBuf.checksummer,
654 1 : )
655 1 : if !cb.physical.IsCompressed() {
656 1 : // If the block isn't compressed, cb.physical's underlying data points
657 1 : // directly into a buffer owned by w.dataBlock. Clone it before passing
658 1 : // it to the write queue to be asynchronously written to disk.
659 1 : // TODO(jackson): Should we try to avoid this clone by tracking the
660 1 : // lifetime of the DataBlockWriters?
661 1 : cb.physical = cb.physical.Clone()
662 1 : }
663 1 : return w.enqueuePhysicalBlock(cb, separator)
664 : }
665 :
666 1 : func (w *RawColumnWriter) enqueuePhysicalBlock(cb *compressedBlock, separator []byte) error {
667 1 : dataBlockHandle := block.Handle{
668 1 : Offset: w.queuedDataSize,
669 1 : Length: uint64(cb.physical.LengthWithoutTrailer()),
670 1 : }
671 1 : w.queuedDataSize += dataBlockHandle.Length + block.TrailerLen
672 1 : w.writeQueue.ch <- cb
673 1 :
674 1 : var err error
675 1 : w.blockPropsEncoder.resetProps()
676 1 : for i := range w.blockPropCollectors {
677 1 : scratch := w.blockPropsEncoder.getScratchForProp()
678 1 : if scratch, err = w.blockPropCollectors[i].FinishDataBlock(scratch); err != nil {
679 0 : return err
680 0 : }
681 1 : w.blockPropsEncoder.addProp(shortID(i), scratch)
682 : }
683 1 : dataBlockProps := w.blockPropsEncoder.unsafeProps()
684 1 :
685 1 : // Add the separator to the index block. This might trigger a flush of the
686 1 : // index block too.
687 1 : i := w.indexBlock.AddBlockHandle(separator, dataBlockHandle, dataBlockProps)
688 1 : sizeWithEntry := w.indexBlock.Size()
689 1 : if shouldFlushWithoutLatestKV(sizeWithEntry, w.indexBlockSize, i, &w.indexFlush) {
690 1 : // NB: finishIndexBlock will use blockPropsEncoder, so we must clone the
691 1 : // data block's props first.
692 1 : dataBlockProps = slices.Clone(dataBlockProps)
693 1 :
694 1 : if err = w.finishIndexBlock(w.indexBlock.Rows() - 1); err != nil {
695 0 : return err
696 0 : }
697 : // finishIndexBlock reset the index block builder, and we can
698 : // add the block handle to this new index block.
699 1 : _ = w.indexBlock.AddBlockHandle(separator, dataBlockHandle, dataBlockProps)
700 1 : w.indexBlockSize = w.indexBlock.Size()
701 1 : } else {
702 1 : w.indexBlockSize = sizeWithEntry
703 1 : }
704 : // Incorporate the finished data block's properties into the index block, now
705 : // that we've flushed the index block without the new separator if
706 : // necessary.
707 1 : for i := range w.blockPropCollectors {
708 1 : w.blockPropCollectors[i].AddPrevDataBlockToIndexBlock()
709 1 : }
710 1 : return nil
711 : }
712 :
713 : // finishIndexBlock finishes the currently pending index block with the first
714 : // [rows] rows. In practice, [rows] is always w.indexBlock.Rows() or
715 : // w.indexBlock.Rows()-1.
716 : //
717 : // The finished index block is buffered until the writer is closed.
718 1 : func (w *RawColumnWriter) finishIndexBlock(rows int) error {
719 1 : defer w.indexBlock.Reset()
720 1 : w.blockPropsEncoder.resetProps()
721 1 : for i := range w.blockPropCollectors {
722 1 : scratch := w.blockPropsEncoder.getScratchForProp()
723 1 : var err error
724 1 : if scratch, err = w.blockPropCollectors[i].FinishIndexBlock(scratch); err != nil {
725 0 : return err
726 0 : }
727 1 : w.blockPropsEncoder.addProp(shortID(i), scratch)
728 : }
729 1 : indexProps := w.blockPropsEncoder.props()
730 1 : bib := bufferedIndexBlock{nEntries: rows, properties: indexProps}
731 1 :
732 1 : // Copy the last (greatest) separator key in the index block into bib.sep.
733 1 : // It'll be the separator on the entry in the top-level index block.
734 1 : //
735 1 : // TODO(jackson): bib.sep.Trailer is unused within the columnar-block
736 1 : // sstable writer. Its existence is a code artifact of reuse of the
737 1 : // bufferedIndexBlock type between colblk and rowblk writers. This can be
738 1 : // cleaned up.
739 1 : bib.sep.Trailer = base.MakeTrailer(base.SeqNumMax, base.InternalKeyKindSeparator)
740 1 : w.indexBuffering.sepAlloc, bib.sep.UserKey = w.indexBuffering.sepAlloc.Copy(
741 1 : w.indexBlock.UnsafeSeparator(rows - 1))
742 1 :
743 1 : // Finish the index block and copy it so that w.indexBlock may be reused.
744 1 : block := w.indexBlock.Finish(rows)
745 1 : if len(w.indexBuffering.blockAlloc) < len(block) {
746 1 : // Allocate enough bytes for approximately 16 index blocks.
747 1 : w.indexBuffering.blockAlloc = make([]byte, len(block)*16)
748 1 : }
749 1 : n := copy(w.indexBuffering.blockAlloc, block)
750 1 : bib.block = w.indexBuffering.blockAlloc[:n:n]
751 1 : w.indexBuffering.blockAlloc = w.indexBuffering.blockAlloc[n:]
752 1 :
753 1 : w.indexBuffering.partitions = append(w.indexBuffering.partitions, bib)
754 1 : return nil
755 : }
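// finishIndexBlock copies each finished index block into a bulk-allocated slab
// (blockAlloc) so that many small blocks share a few large allocations. The
// same amortized pattern in isolation is sketched below; the 16x growth factor
// mirrors the constant above, but the helper itself is illustrative rather than
// Pebble code.
func exampleAppendToSlab(slab, b []byte) (stored, rest []byte) {
	if len(slab) < len(b) {
		// Grow by roughly 16 blocks at a time so most copies don't allocate.
		slab = make([]byte, len(b)*16)
	}
	n := copy(slab, b)
	// Cap the stored slice so later appends cannot clobber neighboring blocks.
	return slab[:n:n], slab[n:]
}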
756 :
757 : // flushBufferedIndexBlocks writes all index blocks, including the top-level
758 : // index block if necessary, to the underlying writable. It returns the block
759 : // handle of the top index (either the only index block or the top-level index
760 : // if two-level).
761 1 : func (w *RawColumnWriter) flushBufferedIndexBlocks() (rootIndex block.Handle, err error) {
762 1 : // If there's a currently-pending index block, finish it.
763 1 : if w.indexBlock.Rows() > 0 || len(w.indexBuffering.partitions) == 0 {
764 1 : w.finishIndexBlock(w.indexBlock.Rows())
765 1 : }
766 : // We've buffered all the index blocks. Typically there's just one index
767 : // block, in which case we're writing a "single-level" index. If we're
768 : // writing a large file or the index separators happen to be excessively
769 : // long, we may have several index blocks and need to construct a
770 : // "two-level" index structure.
771 1 : switch len(w.indexBuffering.partitions) {
772 0 : case 0:
773 0 : // This is impossible because we'll flush the index block immediately
774 0 : // above this switch statement if there are no buffered partitions
775 0 : // (regardless of whether there are data block handles in the index
776 0 : // block).
777 0 : panic("unreachable")
778 1 : case 1:
779 1 : // Single-level index.
780 1 : rootIndex, err = w.layout.WriteIndexBlock(w.indexBuffering.partitions[0].block)
781 1 : if err != nil {
782 0 : return rootIndex, err
783 0 : }
784 1 : w.props.IndexSize = rootIndex.Length + block.TrailerLen
785 1 : w.props.NumDataBlocks = uint64(w.indexBuffering.partitions[0].nEntries)
786 1 : w.props.IndexType = binarySearchIndex
787 1 : default:
788 1 : // Two-level index.
789 1 : for _, part := range w.indexBuffering.partitions {
790 1 : bh, err := w.layout.WriteIndexBlock(part.block)
791 1 : if err != nil {
792 0 : return block.Handle{}, err
793 0 : }
794 1 : w.props.IndexSize += bh.Length + block.TrailerLen
795 1 : w.props.NumDataBlocks += uint64(part.nEntries)
796 1 : w.topLevelIndexBlock.AddBlockHandle(part.sep.UserKey, bh, part.properties)
797 : }
798 1 : rootIndex, err = w.layout.WriteIndexBlock(w.topLevelIndexBlock.Finish(w.topLevelIndexBlock.Rows()))
799 1 : if err != nil {
800 0 : return block.Handle{}, err
801 0 : }
802 1 : w.props.IndexSize += rootIndex.Length + block.TrailerLen
803 1 : w.props.IndexType = twoLevelIndex
804 1 : w.props.IndexPartitions = uint64(len(w.indexBuffering.partitions))
805 : }
806 1 : return rootIndex, nil
807 : }
808 :
809 : // drainWriteQueue runs in its own goroutine and is responsible for writing
810 : // finished, compressed data blocks to the writable. It reads from w.writeQueue
811 : // until the channel is closed. All data blocks are written by this goroutine.
812 : // Other blocks are written directly by the client goroutine. See Close.
813 1 : func (w *RawColumnWriter) drainWriteQueue() {
814 1 : defer w.writeQueue.wg.Done()
815 1 : for cb := range w.writeQueue.ch {
816 1 : if _, err := w.layout.WritePrecompressedDataBlock(cb.physical); err != nil {
817 0 : w.writeQueue.err = err
818 0 : }
819 1 : cb.blockBuf.clear()
820 1 : cb.physical = block.PhysicalBlock{}
821 1 : compressedBlockPool.Put(cb)
822 : }
823 : }
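// drainWriteQueue is the consumer half of a single-producer pipeline: blocks
// are compressed on the caller's goroutine, handed over a channel, written in
// order by this one background goroutine, and recycled through a sync.Pool. A
// stripped-down version of that shape is sketched below; the write parameter is
// a hypothetical stand-in for layoutWriter.WritePrecompressedDataBlock.
func exampleWriteQueue(write func([]byte) error) (ch chan<- []byte, wait func() error) {
	c := make(chan []byte)
	var wg sync.WaitGroup
	var firstErr error
	wg.Add(1)
	go func() {
		defer wg.Done()
		for b := range c {
			if err := write(b); err != nil && firstErr == nil {
				firstErr = err // retain the first error, like writeQueue.err
			}
		}
	}()
	// wait closes the channel, joins the consumer, and reports any write error.
	return c, func() error {
		close(c)
		wg.Wait()
		return firstErr
	}
}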
824 :
825 1 : func (w *RawColumnWriter) Close() (err error) {
826 1 : defer func() {
827 1 : if w.valueBlock != nil {
828 1 : releaseValueBlockWriter(w.valueBlock)
829 1 : // Defensive code in case Close gets called again. We don't want to put
830 1 : // the same object to a sync.Pool.
831 1 : w.valueBlock = nil
832 1 : }
833 1 : w.layout.Abort()
834 1 : // Record any error in the writer (so we can exit early if Close is called
835 1 : // again).
836 1 : if err != nil {
837 1 : w.err = err
838 1 : }
839 : }()
840 1 : if w.layout.writable == nil {
841 1 : return w.err
842 1 : }
843 :
844 : // Finish the last data block and send it to the write queue if it contains
845 : // any pending KVs.
846 1 : if rows := w.dataBlock.Rows(); rows > 0 {
847 1 : serializedBlock, lastKey := w.dataBlock.Finish(rows, w.pendingDataBlockSize)
848 1 : w.separatorBuf = w.comparer.Successor(w.separatorBuf[:0], lastKey.UserKey)
849 1 : w.err = errors.CombineErrors(w.err, w.enqueueDataBlock(serializedBlock, lastKey, w.separatorBuf))
850 1 : w.maybeIncrementTombstoneDenseBlocks(len(serializedBlock))
851 1 : }
852 : // Close the write queue channel so that the goroutine responsible for
853 : // writing data blocks to disk knows to exit. Any subsequent blocks (eg,
854 : // index, metadata, range key, etc) will be written by the goroutine that
855 : // called Close.
856 1 : close(w.writeQueue.ch)
857 1 : w.writeQueue.wg.Wait()
858 1 : // If the write queue encountered an error while writing out data blocks,
859 1 : // it's stored in w.writeQueue.err.
860 1 : w.err = firstError(w.err, w.writeQueue.err)
861 1 : if w.err != nil {
862 1 : return w.err
863 1 : }
864 :
865 : // INVARIANT: w.queuedDataSize == w.layout.offset.
866 : // All data blocks have been written to disk. The queuedDataSize is the
867 : // cumulative size of all the data blocks we've sent to the write queue. Now
868 : // that they've all been flushed, queuedDataSize should match w.layout's
869 : // offset.
870 1 : if w.queuedDataSize != w.layout.offset {
871 0 : panic(errors.AssertionFailedf("pebble: %d bytes of queued data blocks but layout offset is %d",
872 0 : w.queuedDataSize, w.layout.offset))
873 : }
874 1 : w.props.DataSize = w.layout.offset
875 1 : if _, err = w.flushBufferedIndexBlocks(); err != nil {
876 0 : return err
877 0 : }
878 :
879 : // Write the filter block.
880 1 : if w.filterBlock != nil {
881 1 : bh, err := w.layout.WriteFilterBlock(w.filterBlock)
882 1 : if err != nil {
883 0 : return err
884 0 : }
885 1 : w.props.FilterPolicyName = w.filterBlock.policyName()
886 1 : w.props.FilterSize = bh.Length
887 : }
888 :
889 : // Write the range deletion block if non-empty.
890 1 : if w.rangeDelBlock.KeyCount() > 0 {
891 1 : w.props.NumRangeDeletions = uint64(w.rangeDelBlock.KeyCount())
892 1 : sm, la := w.rangeDelBlock.UnsafeBoundaryKeys()
893 1 : w.meta.SetSmallestRangeDelKey(sm)
894 1 : w.meta.SetLargestRangeDelKey(la)
895 1 : if _, err := w.layout.WriteRangeDeletionBlock(w.rangeDelBlock.Finish()); err != nil {
896 0 : return err
897 0 : }
898 : }
899 :
900 : // Write the range key block if non-empty.
901 1 : if w.rangeKeyBlock.KeyCount() > 0 {
902 1 : sm, la := w.rangeKeyBlock.UnsafeBoundaryKeys()
903 1 : w.meta.SetSmallestRangeKey(sm)
904 1 : w.meta.SetLargestRangeKey(la)
905 1 : if _, err := w.layout.WriteRangeKeyBlock(w.rangeKeyBlock.Finish()); err != nil {
906 0 : return err
907 0 : }
908 : }
909 :
910 : // Write out the value block.
911 1 : if w.valueBlock != nil {
912 1 : _, vbStats, err := w.valueBlock.finish(&w.layout, w.layout.offset)
913 1 : if err != nil {
914 0 : return err
915 0 : }
916 1 : w.props.NumValueBlocks = vbStats.numValueBlocks
917 1 : w.props.NumValuesInValueBlocks = vbStats.numValuesInValueBlocks
918 1 : w.props.ValueBlocksSize = vbStats.valueBlocksAndIndexSize
919 : }
920 :
921 : // Write the properties block.
922 1 : {
923 1 : // Finish and record the prop collectors if props are not yet recorded.
924 1 : // Pre-computed props might have been copied by specialized sst creators
925 1 : // like suffix replacer.
926 1 : if len(w.props.UserProperties) == 0 {
927 1 : userProps := make(map[string]string)
928 1 : for i := range w.blockPropCollectors {
929 1 : scratch := w.blockPropsEncoder.getScratchForProp()
930 1 : // Place the shortID in the first byte.
931 1 : scratch = append(scratch, byte(i))
932 1 : buf, err := w.blockPropCollectors[i].FinishTable(scratch)
933 1 : if err != nil {
934 0 : return err
935 0 : }
936 1 : var prop string
937 1 : if len(buf) > 0 {
938 1 : prop = string(buf)
939 1 : }
940 : // NB: The property is populated in the map even if it is the
941 : // empty string, since the presence in the map is what indicates
942 : // that the block property collector was used when writing.
943 1 : userProps[w.blockPropCollectors[i].Name()] = prop
944 : }
945 1 : if len(userProps) > 0 {
946 1 : w.props.UserProperties = userProps
947 1 : }
948 : }
949 :
950 1 : var raw rowblk.Writer
951 1 : // The restart interval is set to infinity because the properties block
952 1 : // is always read sequentially and cached in a heap located object. This
953 1 : // reduces table size without a significant impact on performance.
954 1 : raw.RestartInterval = propertiesBlockRestartInterval
955 1 : w.props.CompressionOptions = rocksDBCompressionOptions
956 1 : w.props.save(w.opts.TableFormat, &raw)
957 1 : if _, err := w.layout.WritePropertiesBlock(raw.Finish()); err != nil {
958 0 : return err
959 0 : }
960 : }
961 :
962 : // Write the table footer.
963 1 : w.meta.Size, err = w.layout.Finish()
964 1 : if err != nil {
965 1 : return err
966 1 : }
967 1 : w.meta.Properties = w.props
968 1 : // Release any held memory and make any future calls error.
969 1 : // TODO(jackson): Ensure other calls error appropriately if the writer is
970 1 : // cleared.
971 1 : *w = RawColumnWriter{meta: w.meta}
972 1 : return nil
973 : }
974 :
975 : // rewriteSuffixes implements RawWriter.
976 : func (w *RawColumnWriter) rewriteSuffixes(
977 : r *Reader, wo WriterOptions, from, to []byte, concurrency int,
978 1 : ) error {
979 1 : for _, c := range w.blockPropCollectors {
980 1 : if !c.SupportsSuffixReplacement() {
981 0 : return errors.Errorf("block property collector %s does not support suffix replacement", c.Name())
982 0 : }
983 : }
984 1 : l, err := r.Layout()
985 1 : if err != nil {
986 0 : return errors.Wrap(err, "reading layout")
987 0 : }
988 : // Copy data blocks in parallel, rewriting suffixes as we go.
989 1 : blocks, err := rewriteDataBlocksInParallel(r, wo, l.Data, from, to, concurrency, func() blockRewriter {
990 1 : return colblk.NewDataBlockRewriter(wo.KeySchema, w.comparer.Compare, w.comparer.Split)
991 1 : })
992 1 : if err != nil {
993 1 : return errors.Wrap(err, "rewriting data blocks")
994 1 : }
995 :
996 : // oldShortIDs maps the shortID for the block property collector in the old
997 : // blocks to the shortID in the new blocks. Initialized once for the sstable.
998 1 : oldShortIDs, n, err := getShortIDs(r, w.blockPropCollectors)
999 1 : if err != nil {
1000 0 : return errors.Wrap(err, "getting short IDs")
1001 0 : }
1002 1 : oldProps := make([][]byte, len(w.blockPropCollectors))
1003 1 : for i := range blocks {
1004 1 : cb := compressedBlockPool.Get().(*compressedBlock)
1005 1 : cb.physical = blocks[i].physical
1006 1 :
1007 1 : // Load any previous values for our prop collectors into oldProps.
1008 1 : for i := range oldProps {
1009 1 : oldProps[i] = nil
1010 1 : }
1011 1 : decoder := makeBlockPropertiesDecoder(n, l.Data[i].Props)
1012 1 : for !decoder.Done() {
1013 1 : id, val, err := decoder.Next()
1014 1 : if err != nil {
1015 0 : return err
1016 0 : }
1017 1 : if oldShortIDs[id].IsValid() {
1018 1 : oldProps[oldShortIDs[id]] = val
1019 1 : }
1020 : }
1021 1 : for i, p := range w.blockPropCollectors {
1022 1 : if err := p.AddCollectedWithSuffixReplacement(oldProps[i], from, to); err != nil {
1023 0 : return err
1024 0 : }
1025 : }
1026 1 : var separator []byte
1027 1 : if i+1 < len(blocks) {
1028 1 : w.separatorBuf = w.comparer.Separator(w.separatorBuf[:0], blocks[i].end.UserKey, blocks[i+1].start.UserKey)
1029 1 : separator = w.separatorBuf
1030 1 : } else {
1031 1 : w.separatorBuf = w.comparer.Successor(w.separatorBuf[:0], blocks[i].end.UserKey)
1032 1 : separator = w.separatorBuf
1033 1 : }
1034 1 : w.enqueuePhysicalBlock(cb, separator)
1035 : }
1036 :
1037 1 : if len(blocks) > 0 {
1038 1 : w.meta.updateSeqNum(blocks[0].start.SeqNum())
1039 1 : w.props.NumEntries = r.Properties.NumEntries
1040 1 : w.props.RawKeySize = r.Properties.RawKeySize
1041 1 : w.props.RawValueSize = r.Properties.RawValueSize
1042 1 : w.meta.SetSmallestPointKey(blocks[0].start)
1043 1 : w.meta.SetLargestPointKey(blocks[len(blocks)-1].end)
1044 1 : }
1045 :
1046 : // Copy range key block, replacing suffixes if it exists.
1047 1 : if err := rewriteRangeKeyBlockToWriter(r, w, from, to); err != nil {
1048 0 : return errors.Wrap(err, "rewriting range key blocks")
1049 0 : }
1050 : // Copy over the filter block if it exists.
1051 1 : if w.filterBlock != nil {
1052 1 : if filterBlockBH, ok := l.FilterByName(w.filterBlock.metaName()); ok {
1053 1 : filterBlock, _, err := readBlockBuf(r, filterBlockBH, nil)
1054 1 : if err != nil {
1055 0 : return errors.Wrap(err, "reading filter")
1056 0 : }
1057 1 : w.filterBlock = copyFilterWriter{
1058 1 : origPolicyName: w.filterBlock.policyName(),
1059 1 : origMetaName: w.filterBlock.metaName(),
1060 1 : data: filterBlock,
1061 1 : }
1062 : }
1063 : }
1064 1 : return nil
1065 : }
1066 :
1067 : func shouldFlushWithoutLatestKV(
1068 : sizeWithKV int, sizeWithoutKV int, entryCountWithoutKV int, flushGovernor *block.FlushGovernor,
1069 1 : ) bool {
1070 1 : if entryCountWithoutKV == 0 {
1071 1 : return false
1072 1 : }
1073 1 : if sizeWithoutKV < flushGovernor.LowWatermark() {
1074 1 : // Fast path when the block is too small to flush.
1075 1 : return false
1076 1 : }
1077 1 : return flushGovernor.ShouldFlush(sizeWithoutKV, sizeWithKV)
1078 : }
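// Both the data block and index block paths above follow the same shape:
// tentatively append an entry, measure the block with and without it, and let
// the flush governor decide whether to finish the block without that last entry
// and start a new block with it. The sketch below captures that flow;
// exampleBlockBuilder is a hypothetical stand-in for colblk.DataBlockEncoder or
// colblk.IndexBlockWriter, not a real Pebble interface.
type exampleBlockBuilder interface {
	Add(kv []byte)             // append an entry
	Rows() int                 // entries appended so far
	Size() int                 // serialized size including every appended entry
	FinishWithoutLast() []byte // serialize all but the last entry and reset
}

func exampleAddWithFlush(
	b exampleBlockBuilder, kv []byte, sizeWithoutKV int, g *block.FlushGovernor, flush func([]byte),
) (newSizeWithoutKV int) {
	rowsWithoutKV := b.Rows()
	b.Add(kv)
	sizeWithKV := b.Size()
	if shouldFlushWithoutLatestKV(sizeWithKV, sizeWithoutKV, rowsWithoutKV, g) {
		flush(b.FinishWithoutLast()) // flush the block without the new entry...
		b.Add(kv)                    // ...and seed the next block with it.
		return b.Size()
	}
	return sizeWithKV
}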
1079 :
1080 : // copyDataBlocks adds a range of blocks to the table as-is. These blocks could be
1081 : // compressed. It's specifically used by the sstable copier that can copy parts
1082 : // of an sstable to a new sstable, using CopySpan().
1083 : func (w *RawColumnWriter) copyDataBlocks(
1084 : ctx context.Context, blocks []indexEntry, rh objstorage.ReadHandle,
1085 1 : ) error {
1086 1 : buf := make([]byte, 0, 256<<10)
1087 1 : readAndFlushBlocks := func(firstBlockIdx, lastBlockIdx int) error {
1088 1 : if firstBlockIdx > lastBlockIdx {
1089 0 : panic("pebble: readAndFlushBlocks called with invalid block range")
1090 : }
1091 : // We need to flush blocks[firstBlockIdx:lastBlockIdx+1] into the write queue.
1092 : // We do this by issuing one big read from the read handle into the buffer, and
1093 : // then enqueueing the writing of those blocks one-by-one.
1094 : //
1095 : // TODO(bilal): Consider refactoring the write queue to support writing multiple
1096 : // blocks in one request.
1097 1 : lastBH := blocks[lastBlockIdx].bh
1098 1 : blocksToReadLen := lastBH.Offset + lastBH.Length + block.TrailerLen - blocks[firstBlockIdx].bh.Offset
1099 1 : if blocksToReadLen > uint64(cap(buf)) {
1100 0 : buf = make([]byte, 0, blocksToReadLen)
1101 0 : }
1102 1 : if err := rh.ReadAt(ctx, buf[:blocksToReadLen], int64(blocks[firstBlockIdx].bh.Offset)); err != nil {
1103 0 : return err
1104 0 : }
1105 1 : for i := firstBlockIdx; i <= lastBlockIdx; i++ {
1106 1 : offsetDiff := blocks[i].bh.Offset - blocks[firstBlockIdx].bh.Offset
1107 1 : blockBuf := buf[offsetDiff : offsetDiff+blocks[i].bh.Length+block.TrailerLen]
1108 1 : cb := compressedBlockPool.Get().(*compressedBlock)
1109 1 : cb.physical = block.NewPhysicalBlock(blockBuf)
1110 1 : if err := w.enqueuePhysicalBlock(cb, blocks[i].sep); err != nil {
1111 0 : return err
1112 0 : }
1113 : }
1114 1 : return nil
1115 : }
1116 : // Iterate through blocks until we have enough to fill cap(buf). When we have more than
1117 : // one block in blocksToRead and adding the next block would exceed the buffer capacity,
1118 : // we read and flush existing blocks in blocksToRead. This allows us to read as many
1119 : // blocks in one IO request as possible, while still utilizing the write queue in this
1120 : // writer.
1121 1 : lastBlockOffset := uint64(0)
1122 1 : for i := 0; i < len(blocks); {
1123 1 : if blocks[i].bh.Offset < lastBlockOffset {
1124 0 : panic("pebble: copyDataBlocks called with blocks out of order")
1125 : }
1126 1 : start := i
1127 1 : // Note the i++ in the initializing condition; this means we will always flush at least
1128 1 : // one block.
1129 1 : for i++; i < len(blocks) && (blocks[i].bh.Length+blocks[i].bh.Offset+block.TrailerLen-blocks[start].bh.Offset) <= uint64(cap(buf)); i++ {
1130 1 : }
1131 : // i points to one index past the last block we want to read.
1132 1 : if err := readAndFlushBlocks(start, i-1); err != nil {
1133 0 : return err
1134 0 : }
1135 : }
1136 1 : return nil
1137 : }
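// copyDataBlocks batches adjacent block reads into one ReadAt call bounded by
// the scratch buffer's capacity. The grouping logic on its own is sketched
// below over plain (offset, length) pairs; exampleRange and bufCap are
// illustrative, and like the loop above each group always takes at least one
// range even if that range alone exceeds bufCap.
type exampleRange struct{ offset, length uint64 }

func exampleGroupRanges(ranges []exampleRange, bufCap uint64) [][]exampleRange {
	var groups [][]exampleRange
	for i := 0; i < len(ranges); {
		start := i
		// Extend the group while the span from the group's first offset through
		// the candidate's end still fits in the buffer.
		for i++; i < len(ranges) &&
			ranges[i].offset+ranges[i].length-ranges[start].offset <= bufCap; i++ {
		}
		groups = append(groups, ranges[start:i])
	}
	return groups
}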
1138 :
1139 : // addDataBlock adds a raw uncompressed data block to the table as-is. It's specifically used
1140 : // by the sstable copier that can copy parts of an sstable to a new sstable,
1141 : // using CopySpan().
1142 1 : func (w *RawColumnWriter) addDataBlock(b, sep []byte, bhp block.HandleWithProperties) error {
1143 1 : // Serialize the data block, compress it and send it to the write queue.
1144 1 : cb := compressedBlockPool.Get().(*compressedBlock)
1145 1 : cb.blockBuf.checksummer.Type = w.opts.Checksum
1146 1 : cb.physical = block.CompressAndChecksum(
1147 1 : &cb.blockBuf.compressedBuf,
1148 1 : b,
1149 1 : w.opts.Compression,
1150 1 : &cb.blockBuf.checksummer,
1151 1 : )
1152 1 : if !cb.physical.IsCompressed() {
1153 1 : // If the block isn't compressed, cb.physical's underlying data points
1154 1 : // directly into the caller-provided buffer b. Clone it before passing
1155 1 : // it to the write queue to be asynchronously written to disk.
1156 1 : // TODO(jackson): Should we try to avoid this clone by tracking the
1157 1 : // lifetime of the DataBlockWriters?
1158 1 : cb.physical = cb.physical.Clone()
1159 1 : }
1160 1 : if err := w.enqueuePhysicalBlock(cb, sep); err != nil {
1161 0 : return err
1162 0 : }
1163 1 : return nil
1164 : }
1165 :
1166 : // copyFilter copies the specified filter to the table. It's specifically used
1167 : // by the sstable copier that can copy parts of an sstable to a new sstable,
1168 : // using CopySpan().
1169 0 : func (w *RawColumnWriter) copyFilter(filter []byte, filterName string) error {
1170 0 : if w.filterBlock != nil && filterName != w.filterBlock.policyName() {
1171 0 : return errors.New("mismatched filters")
1172 0 : }
1173 0 : w.filterBlock = copyFilterWriter{
1174 0 : origPolicyName: w.filterBlock.policyName(), origMetaName: w.filterBlock.metaName(), data: filter,
1175 0 : }
1176 0 : return nil
1177 : }
1178 :
1179 : // copyProperties copies properties from the specified props, and resets others
1180 : // to prepare for copying data blocks from another sstable, using the copy/addDataBlock(s)
1181 : // methods above. It's specifically used by the sstable copier that can copy parts of an
1182 : // sstable to a new sstable, using CopySpan().
1183 1 : func (w *RawColumnWriter) copyProperties(props Properties) {
1184 1 : w.props = props
1185 1 : // Remove all user properties to disable block properties, which we do not
1186 1 : // calculate for CopySpan.
1187 1 : w.props.UserProperties = nil
1188 1 : // Reset props that we'll re-derive as we build our own index.
1189 1 : w.props.IndexPartitions = 0
1190 1 : w.props.TopLevelIndexSize = 0
1191 1 : w.props.IndexSize = 0
1192 1 : w.props.IndexType = 0
1193 1 : }
|