Line data Source code
1 : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package sstable
6 :
7 : import (
8 : "bytes"
9 : "encoding/binary"
10 : "fmt"
11 : "math"
12 : "sync"
13 :
14 : "github.com/cockroachdb/errors"
15 : "github.com/cockroachdb/pebble/internal/base"
16 : "github.com/cockroachdb/pebble/internal/bytealloc"
17 : "github.com/cockroachdb/pebble/internal/cache"
18 : "github.com/cockroachdb/pebble/internal/keyspan"
19 : "github.com/cockroachdb/pebble/objstorage"
20 : "github.com/cockroachdb/pebble/sstable/block"
21 : "github.com/cockroachdb/pebble/sstable/colblk"
22 : "github.com/cockroachdb/pebble/sstable/rowblk"
23 : )
24 :
25 : // RawColumnWriter is an sstable RawWriter that writes sstables with
26 : // column-oriented blocks. All table formats TableFormatPebblev5 and later write
27 : // column-oriented blocks and use RawColumnWriter.
28 : type RawColumnWriter struct {
29 : comparer *base.Comparer
30 : meta WriterMetadata
31 : opts WriterOptions
32 : err error
33 : // dataBlockOptions and indexBlockOptions are used to configure the sstable
34 : // block flush heuristics.
35 : dataBlockOptions flushDecisionOptions
36 : indexBlockOptions flushDecisionOptions
37 : allocatorSizeClasses []int
38 : blockPropCollectors []BlockPropertyCollector
39 : blockPropsEncoder blockPropertiesEncoder
40 : obsoleteCollector obsoleteKeyBlockPropertyCollector
41 : props Properties
42 : // block writers buffering unflushed data.
43 : dataBlock struct {
44 : colblk.DataBlockWriter
45 : // numDeletions stores the count of point tombstones in this data block.
46 : // It's used to determine if this data block is considered
47 : // tombstone-dense for the purposes of compaction.
48 : numDeletions int
49 : // deletionSize stores the raw size of point tombstones in this data
50 : // block. It's used to determine if this data block is considered
51 : // tombstone-dense for the purposes of compaction.
52 : deletionSize int
53 : }
54 : indexBlock colblk.IndexBlockWriter
55 : topLevelIndexBlock colblk.IndexBlockWriter
56 : rangeDelBlock colblk.KeyspanBlockWriter
57 : rangeKeyBlock colblk.KeyspanBlockWriter
58 : valueBlock *valueBlockWriter // nil iff WriterOptions.DisableValueBlocks=true
59 : // filter accumulates the filter block. If populated, the filter ingests
60 : // the prefix of each point key (as computed by the comparer's Split
61 : // function), which is the full user key when no Split is defined.
62 : filterBlock filterWriter
63 : prevPointKey struct {
64 : trailer base.InternalKeyTrailer
65 : isObsolete bool
66 : }
67 : pendingDataBlockSize int
68 : indexBlockSize int
69 : queuedDataSize uint64
70 :
71 : // indexBuffering holds finished index blocks as they're completed while
72 : // building the sstable. If an index block grows sufficiently large
73 : // (IndexBlockSize) while an sstable is still being constructed, the sstable
74 : // writer will create a two-level index structure. As index blocks are
75 : // completed, they're finished and buffered in-memory until the table is
76 : // finished. When the table is finished, the buffered index blocks are
77 : // flushed in order after all the data blocks, and the top-level index block
78 : // is constructed to point to all the individual index blocks.
79 : indexBuffering struct {
80 : // partitions holds all the completed index blocks.
81 : partitions []bufferedIndexBlock
82 : // blockAlloc is used to bulk-allocate byte slices used to store index
83 : // blocks in partitions. These live until the sstable is finished.
84 : blockAlloc []byte
85 : // sepAlloc is used to bulk-allocate index block separator slices stored
86 : // in partitions. These live until the sstable is finished.
87 : sepAlloc bytealloc.A
88 : }
89 :
90 : writeQueue struct {
91 : wg sync.WaitGroup
92 : ch chan *compressedBlock
93 : err error
94 : }
95 : layout layoutWriter
96 :
97 : separatorBuf []byte
98 : tmp [blockHandleLikelyMaxLen]byte
99 : disableKeyOrderChecks bool
100 : }
101 :
102 : // Assert that *RawColumnWriter implements RawWriter.
103 : var _ RawWriter = (*RawColumnWriter)(nil)
104 :
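 : // NewColumnarWriter constructs a RawColumnWriter that writes an sstable with
 : // column-oriented blocks to the provided writable, filling in defaults for
 : // any unset WriterOptions fields.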
105 1 : func NewColumnarWriter(writable objstorage.Writable, o WriterOptions) *RawColumnWriter {
106 1 : if writable == nil {
107 0 : panic("pebble: nil writable")
108 : }
109 1 : o = o.ensureDefaults()
110 1 : w := &RawColumnWriter{
111 1 : comparer: o.Comparer,
112 1 : meta: WriterMetadata{
113 1 : SmallestSeqNum: math.MaxUint64,
114 1 : },
115 1 : dataBlockOptions: flushDecisionOptions{
116 1 : blockSize: o.BlockSize,
117 1 : blockSizeThreshold: (o.BlockSize*o.BlockSizeThreshold + 99) / 100,
118 1 : sizeClassAwareThreshold: (o.BlockSize*o.SizeClassAwareThreshold + 99) / 100,
119 1 : },
120 1 : indexBlockOptions: flushDecisionOptions{
121 1 : blockSize: o.IndexBlockSize,
122 1 : blockSizeThreshold: (o.IndexBlockSize*o.BlockSizeThreshold + 99) / 100,
123 1 : sizeClassAwareThreshold: (o.IndexBlockSize*o.SizeClassAwareThreshold + 99) / 100,
124 1 : },
125 1 : allocatorSizeClasses: o.AllocatorSizeClasses,
126 1 : opts: o,
127 1 : layout: makeLayoutWriter(writable, o),
128 1 : }
129 1 : w.dataBlock.Init(o.KeySchema)
130 1 : w.indexBlock.Init()
131 1 : w.topLevelIndexBlock.Init()
132 1 : w.rangeDelBlock.Init(w.comparer.Equal)
133 1 : w.rangeKeyBlock.Init(w.comparer.Equal)
134 1 : if !o.DisableValueBlocks {
135 1 : w.valueBlock = newValueBlockWriter(
136 1 : w.dataBlockOptions.blockSize, w.dataBlockOptions.blockSizeThreshold,
137 1 : w.opts.Compression, w.opts.Checksum, func(compressedSize int) {})
138 : }
139 1 : if o.FilterPolicy != nil {
140 0 : switch o.FilterType {
141 0 : case TableFilter:
142 0 : w.filterBlock = newTableFilterWriter(o.FilterPolicy)
143 0 : default:
144 0 : panic(fmt.Sprintf("unknown filter type: %v", o.FilterType))
145 : }
146 : }
147 :
148 1 : numBlockPropertyCollectors := len(o.BlockPropertyCollectors) + 1 // +1 for the obsolete collector
149 1 : if numBlockPropertyCollectors > maxPropertyCollectors {
150 0 : panic(errors.New("pebble: too many block property collectors"))
151 : }
152 1 : w.blockPropCollectors = make([]BlockPropertyCollector, 0, numBlockPropertyCollectors)
153 1 : for _, constructFn := range o.BlockPropertyCollectors {
154 0 : w.blockPropCollectors = append(w.blockPropCollectors, constructFn())
155 0 : }
156 1 : w.blockPropCollectors = append(w.blockPropCollectors, &w.obsoleteCollector)
157 1 : var buf bytes.Buffer
158 1 : buf.WriteString("[")
159 1 : for i := range w.blockPropCollectors {
160 1 : if i > 0 {
161 0 : buf.WriteString(",")
162 0 : }
163 1 : buf.WriteString(w.blockPropCollectors[i].Name())
164 : }
165 1 : buf.WriteString("]")
166 1 : w.props.PropertyCollectorNames = buf.String()
167 1 :
168 1 : w.props.ComparerName = o.Comparer.Name
169 1 : w.props.CompressionName = o.Compression.String()
170 1 : w.props.MergerName = o.MergerName
171 1 :
172 1 : w.writeQueue.ch = make(chan *compressedBlock)
173 1 : w.writeQueue.wg.Add(1)
174 1 : go w.drainWriteQueue()
175 1 : return w
176 : }
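 : 
 : // The sketch below illustrates the intended lifecycle of the writer:
 : // construct it with NewColumnarWriter, add point keys in strictly increasing
 : // order via AddWithForceObsolete, add range deletions and range keys as whole
 : // spans via EncodeSpan, and Close to flush all buffered blocks and the
 : // footer. The helper name and the concrete keys are arbitrary, and the
 : // caller is assumed to supply a Writable and fully populated WriterOptions.
 : //
 : //    func exampleWriteColumnarTable(writable objstorage.Writable, opts WriterOptions) error {
 : //        w := NewColumnarWriter(writable, opts)
 : //        // Point keys: for the same user key, newer (higher seqnum) entries first.
 : //        if err := w.AddWithForceObsolete(
 : //            base.MakeInternalKey([]byte("a"), 10, base.InternalKeyKindSet),
 : //            []byte("value-a"), false /* forceObsolete */); err != nil {
 : //            return err
 : //        }
 : //        // A RANGEDEL span covering [b, c).
 : //        if err := w.EncodeSpan(keyspan.Span{
 : //            Start: []byte("b"),
 : //            End:   []byte("c"),
 : //            Keys:  []keyspan.Key{{Trailer: base.MakeTrailer(9, base.InternalKeyKindRangeDelete)}},
 : //        }); err != nil {
 : //            return err
 : //        }
 : //        return w.Close()
 : //    }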
177 :
178 : // Error returns the current accumulated error if any.
179 0 : func (w *RawColumnWriter) Error() error {
180 0 : return w.err
181 0 : }
182 :
183 : // EstimatedSize returns the estimated size of the sstable being written if
184 : // a call to Close() was made without adding additional keys.
185 0 : func (w *RawColumnWriter) EstimatedSize() uint64 {
186 0 : sz := rocksDBFooterLen + w.queuedDataSize
187 0 : // TODO(jackson): Avoid iterating over partitions by incrementally
188 0 : // maintaining the size contribution of all buffered partitions.
189 0 : for _, bib := range w.indexBuffering.partitions {
190 0 : // We include the separator user key to account for its bytes in the
191 0 : // top-level index block.
192 0 : //
193 0 : // TODO(jackson): We could incrementally build the top-level index block
194 0 : // and produce an exact calculation of the current top-level index
195 0 : // block's size.
196 0 : sz += uint64(len(bib.block) + block.TrailerLen + len(bib.sep.UserKey))
197 0 : }
198 0 : if w.rangeDelBlock.KeyCount() > 0 {
199 0 : sz += uint64(w.rangeDelBlock.Size())
200 0 : }
201 0 : if w.rangeKeyBlock.KeyCount() > 0 {
202 0 : sz += uint64(w.rangeKeyBlock.Size())
203 0 : }
 : // w.valueBlock is nil when value blocks are disabled, so guard the
 : // estimate of buffered value block bytes.
 : if w.valueBlock != nil {
204 0 : for _, blk := range w.valueBlock.blocks {
205 0 : sz += uint64(blk.block.LengthWithTrailer())
206 0 : }
207 0 : if w.valueBlock.buf != nil {
208 0 : sz += uint64(len(w.valueBlock.buf.b))
209 0 : }
 : }
210 : // TODO(jackson): Include an estimate of the properties, filter and meta
211 : // index block sizes.
212 0 : return sz
213 : }
214 :
215 : // Metadata returns the metadata for the finished sstable. Only valid to call
216 : // after the sstable has been finished.
217 1 : func (w *RawColumnWriter) Metadata() (*WriterMetadata, error) {
218 1 : if !w.layout.IsFinished() {
219 0 : return nil, errors.New("pebble: writer is not closed")
220 0 : }
221 1 : return &w.meta, nil
222 : }
223 :
224 : // EncodeSpan encodes the keys in the given span. The span can contain either
225 : // only RANGEDEL keys or only range keys.
226 1 : func (w *RawColumnWriter) EncodeSpan(span keyspan.Span) error {
227 1 : if span.Empty() {
228 0 : return nil
229 0 : }
230 1 : for _, k := range span.Keys {
231 1 : w.meta.updateSeqNum(k.SeqNum())
232 1 : }
233 1 : if span.Keys[0].Kind() == base.InternalKeyKindRangeDelete {
234 1 : w.rangeDelBlock.AddSpan(span)
235 1 : return nil
236 1 : }
237 1 : w.rangeKeyBlock.AddSpan(span)
238 1 : return nil
239 : }
240 :
241 : // AddWithForceObsolete adds a point key/value pair when writing a
242 : // strict-obsolete sstable. For a given Writer, the keys passed to Add must be
243 : // in increasing order. Span keys (range deletions, range keys) must be added
244 : // through EncodeSpan.
245 : //
246 : // forceObsolete indicates whether the caller has determined that this key is
247 : // obsolete even though it may be the latest point key for this userkey. This
248 : // should be set to true for keys obsoleted by RANGEDELs, and is required for
249 : // strict-obsolete sstables.
250 : //
251 : // Note that there are two properties, S1 and S2 (see comment in format.go)
252 : // that strict-obsolete ssts must satisfy. S2, due to RANGEDELs, is solely the
253 : // responsibility of the caller. S1 is solely the responsibility of the
254 : // callee.
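 : //
 : // For example, during a compaction a point key that is shadowed by a
 : // RANGEDEL at a higher seqnum, but that cannot be elided (say, because an
 : // open snapshot still needs it), would typically be added with
 : // forceObsolete=true.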
255 : func (w *RawColumnWriter) AddWithForceObsolete(
256 : key InternalKey, value []byte, forceObsolete bool,
257 1 : ) error {
258 1 : switch key.Kind() {
259 : case base.InternalKeyKindRangeDelete, base.InternalKeyKindRangeKeySet,
260 0 : base.InternalKeyKindRangeKeyUnset, base.InternalKeyKindRangeKeyDelete:
261 0 : return errors.Newf("%s must be added through EncodeSpan", key.Kind())
262 0 : case base.InternalKeyKindMerge:
263 0 : if w.opts.IsStrictObsolete {
264 0 : return errors.Errorf("MERGE not supported in a strict-obsolete sstable")
265 0 : }
266 : }
267 :
268 1 : eval, err := w.evaluatePoint(key, len(value))
269 1 : if err != nil {
270 0 : return err
271 0 : }
272 1 : eval.isObsolete = eval.isObsolete || forceObsolete
273 1 : w.prevPointKey.trailer = key.Trailer
274 1 : w.prevPointKey.isObsolete = eval.isObsolete
275 1 :
276 1 : var valuePrefix block.ValuePrefix
277 1 : var valueStoredWithKey []byte
278 1 : if eval.writeToValueBlock {
279 0 : vh, err := w.valueBlock.addValue(value)
280 0 : if err != nil {
281 0 : return err
282 0 : }
283 0 : n := encodeValueHandle(w.tmp[:], vh)
284 0 : valueStoredWithKey = w.tmp[:n]
285 0 : var attribute base.ShortAttribute
286 0 : if w.opts.ShortAttributeExtractor != nil {
287 0 : // TODO(sumeer): for compactions, it is possible that the input sstable
288 0 : // already has this value in the value section and so we have already
289 0 : // extracted the ShortAttribute. Avoid extracting it again. This will
290 0 : // require changing the RawWriter.Add interface.
291 0 : if attribute, err = w.opts.ShortAttributeExtractor(
292 0 : key.UserKey, int(eval.kcmp.PrefixLen), value); err != nil {
293 0 : return err
294 0 : }
295 : }
296 0 : valuePrefix = block.ValueHandlePrefix(eval.kcmp.PrefixEqual(), attribute)
297 1 : } else {
298 1 : valueStoredWithKey = value
299 1 : if len(value) > 0 {
300 1 : valuePrefix = block.InPlaceValuePrefix(eval.kcmp.PrefixEqual())
301 1 : }
302 : }
303 :
304 : // Append the key to the data block. We have NOT yet committed to
305 : // including the key in the block. The data block writer permits us to
306 : // finish the block excluding the last-appended KV.
307 1 : entriesWithoutKV := w.dataBlock.Rows()
308 1 : w.dataBlock.Add(key, valueStoredWithKey, valuePrefix, eval.kcmp, eval.isObsolete)
309 1 :
310 1 : // Now that we've appended the KV pair, we can compute the exact size of the
311 1 : // block with this key-value pair included. Check to see if we should flush
312 1 : // the current block, either with or without the added key-value pair.
313 1 : size := w.dataBlock.Size()
314 1 : if shouldFlushWithoutLatestKV(size, w.pendingDataBlockSize,
315 1 : entriesWithoutKV, w.dataBlockOptions, w.allocatorSizeClasses) {
316 1 : // Flush the data block excluding the key we just added.
317 1 : w.flushDataBlockWithoutNextKey(key.UserKey)
318 1 : // flushDataBlockWithoutNextKey reset the data block builder, and we can
319 1 : // add the key to this next block now.
320 1 : w.dataBlock.Add(key, valueStoredWithKey, valuePrefix, eval.kcmp, eval.isObsolete)
321 1 : w.pendingDataBlockSize = w.dataBlock.Size()
322 1 : } else {
323 1 : // We're not flushing the data block, and we're committing to including
324 1 : // the current KV in the block. Remember the new size of the data block
325 1 : // with the current KV.
326 1 : w.pendingDataBlockSize = size
327 1 : }
328 :
329 1 : for i := range w.blockPropCollectors {
330 1 : v := value
331 1 : if key.Kind() == base.InternalKeyKindSet {
332 1 : // Values for SET are not required to be in-place, and in the future
333 1 : // may not even be read by the compaction, so pass nil values. Block
334 1 : // property collectors in such Pebble DB's must not look at the
335 1 : // value.
336 1 : v = nil
337 1 : }
338 1 : if err := w.blockPropCollectors[i].AddPointKey(key, v); err != nil {
339 0 : w.err = err
340 0 : return err
341 0 : }
342 : }
343 1 : w.obsoleteCollector.AddPoint(eval.isObsolete)
344 1 : if w.filterBlock != nil {
345 0 : w.filterBlock.addKey(key.UserKey[:eval.kcmp.PrefixLen])
346 0 : }
347 1 : w.meta.updateSeqNum(key.SeqNum())
348 1 : if !w.meta.HasPointKeys {
349 1 : w.meta.SetSmallestPointKey(key.Clone())
350 1 : }
351 :
352 1 : w.props.NumEntries++
353 1 : switch key.Kind() {
354 1 : case InternalKeyKindDelete, InternalKeyKindSingleDelete:
355 1 : w.props.NumDeletions++
356 1 : w.props.RawPointTombstoneKeySize += uint64(len(key.UserKey))
357 1 : w.dataBlock.numDeletions++
358 1 : w.dataBlock.deletionSize += len(key.UserKey)
359 0 : case InternalKeyKindDeleteSized:
360 0 : var size uint64
361 0 : if len(value) > 0 {
362 0 : var n int
363 0 : size, n = binary.Uvarint(value)
364 0 : if n <= 0 {
365 0 : return errors.Newf("%s key's value (%x) does not parse as uvarint",
366 0 : errors.Safe(key.Kind().String()), value)
367 0 : }
368 : }
369 0 : w.props.NumDeletions++
370 0 : w.props.NumSizedDeletions++
371 0 : w.props.RawPointTombstoneKeySize += uint64(len(key.UserKey))
372 0 : w.props.RawPointTombstoneValueSize += size
373 0 : w.dataBlock.numDeletions++
374 0 : w.dataBlock.deletionSize += len(key.UserKey)
375 0 : case InternalKeyKindMerge:
376 0 : w.props.NumMergeOperands++
377 : }
378 1 : w.props.RawKeySize += uint64(key.Size())
379 1 : w.props.RawValueSize += uint64(len(value))
380 1 : return nil
381 : }
382 :
383 : type pointKeyEvaluation struct {
384 : kcmp colblk.KeyComparison
385 : isObsolete bool
386 : writeToValueBlock bool
387 : }
388 :
389 : // evaluatePoint takes information about a point key being written to the
390 : // sstable and decides how the point should be represented, where its value
391 : // should be stored, etc.
392 : func (w *RawColumnWriter) evaluatePoint(
393 : key base.InternalKey, valueLen int,
394 1 : ) (eval pointKeyEvaluation, err error) {
395 1 : eval.kcmp = w.dataBlock.KeyWriter.ComparePrev(key.UserKey)
396 1 : if !w.meta.HasPointKeys {
397 1 : return eval, nil
398 1 : }
399 1 : keyKind := key.Kind()
400 1 : // Ensure that no one adds a point key kind without considering the obsolete
401 1 : // handling for that kind.
402 1 : switch keyKind {
403 : case InternalKeyKindSet, InternalKeyKindSetWithDelete, InternalKeyKindMerge,
404 1 : InternalKeyKindDelete, InternalKeyKindSingleDelete, InternalKeyKindDeleteSized:
405 0 : default:
406 0 : panic(errors.AssertionFailedf("unexpected key kind %s", keyKind.String()))
407 : }
408 1 : prevKeyKind := w.prevPointKey.trailer.Kind()
409 1 : // If same user key, then the current key is obsolete if any of the
410 1 : // following is true:
411 1 : // C1 The prev key was obsolete.
412 1 : // C2 The prev key was not a MERGE. When the previous key is a MERGE we must
413 1 : // preserve SET* and MERGE since their values will be merged into the
414 1 : // previous key. We also must preserve DEL* since there may be an older
415 1 : // SET*/MERGE in a lower level that must not be merged with the MERGE --
416 1 : // if we omit the DEL* that lower SET*/MERGE will become visible.
417 1 : //
418 1 : // Regardless of whether it is the same user key or not
419 1 : // C3 The current key is some kind of point delete, and we are writing to
420 1 : // the lowest level, then it is also obsolete. The correctness of this
421 1 : // relies on the same user key not spanning multiple sstables in a level.
422 1 : //
423 1 : // C1 ensures that for a user key there is at most one transition from
424 1 : // !obsolete to obsolete. Consider a user key k, for which the first n keys
425 1 : // are not obsolete. We consider the various value of n:
426 1 : //
427 1 : // n = 0: This happens due to forceObsolete being set by the caller, or due
428 1 : // to C3. forceObsolete must only be set due a RANGEDEL, and that RANGEDEL
429 1 : // must also delete all the lower seqnums for the same user key. C3 triggers
430 1 : // due to a point delete and that deletes all the lower seqnums for the same
431 1 : // user key.
432 1 : //
433 1 : // n = 1: This is the common case. It happens when the first key is not a
434 1 : // MERGE, or the current key is some kind of point delete.
435 1 : //
436 1 : // n > 1: This is due to a sequence of MERGE keys, potentially followed by a
437 1 : // single non-MERGE key.
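 : //
 : // For illustration, consider a single user key k whose entries arrive in the
 : // order k#12,MERGE, k#10,MERGE, k#8,SET, k#5,DEL (newest first). None of the
 : // first three is obsolete: each is either the first key or is preceded by a
 : // MERGE. k#5,DEL is obsolete because its predecessor k#8,SET is neither
 : // obsolete nor a MERGE (C2). Here n = 3.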
438 1 : isObsoleteC1AndC2 := eval.kcmp.UserKeyComparison == 0 &&
439 1 : (w.prevPointKey.isObsolete || prevKeyKind != InternalKeyKindMerge)
440 1 : isObsoleteC3 := w.opts.WritingToLowestLevel &&
441 1 : (keyKind == InternalKeyKindDelete || keyKind == InternalKeyKindSingleDelete ||
442 1 : keyKind == InternalKeyKindDeleteSized)
443 1 : eval.isObsolete = isObsoleteC1AndC2 || isObsoleteC3
444 1 : // TODO(sumeer): storing isObsolete SET and SETWITHDEL in value blocks is
445 1 : // possible, but requires some care in documenting and checking invariants.
446 1 : // There is code that assumes nothing in value blocks because of single MVCC
447 1 : // version (those should be ok). We have to ensure setHasSamePrefix is
448 1 : // correctly initialized here etc.
449 1 :
450 1 : if !w.disableKeyOrderChecks && (eval.kcmp.UserKeyComparison < 0 ||
451 1 : (eval.kcmp.UserKeyComparison == 0 && w.prevPointKey.trailer <= key.Trailer)) {
452 0 : return eval, errors.Errorf(
453 0 : "pebble: keys must be added in strictly increasing order: %s",
454 0 : key.Pretty(w.comparer.FormatKey))
455 0 : }
456 :
457 : // We might want to write this key's value to a value block if it has the
458 : // same prefix.
459 : //
460 : // We require:
461 : // . Value blocks to be enabled.
462 : // . The current key to have the same prefix as the previous key.
463 : // . The previous key to be a SET.
464 : // . The current key to be a SET.
465 : // . If there are bounds requiring some keys' values to be in-place, the
466 : // key must not fall within those bounds.
467 : // . The value to be sufficiently large. (Currently we simply require a
468 : // non-zero length, so all non-empty values are eligible for storage
469 : // out-of-band in a value block.)
470 1 : if w.opts.DisableValueBlocks || !eval.kcmp.PrefixEqual() ||
471 1 : prevKeyKind != InternalKeyKindSet || keyKind != InternalKeyKindSet {
472 1 : return eval, nil
473 1 : }
474 : // NB: it is possible that eval.kcmp.UserKeyComparison == 0, i.e., these two
475 : // SETs have identical user keys (because of an open snapshot). This should
476 : // be the rare case.
477 :
478 : // Use of 0 here is somewhat arbitrary. Given the minimum 3 byte encoding of
479 : // valueHandle, this should be > 3. But tiny values are common in test and
480 : // unlikely in production, so we use 0 here for better test coverage.
481 0 : const tinyValueThreshold = 0
482 0 : if valueLen <= tinyValueThreshold {
483 0 : return eval, nil
484 0 : }
485 :
486 : // If there are bounds requiring some keys' values to be in-place, compare
487 : // the prefix against the bounds.
488 0 : if !w.opts.RequiredInPlaceValueBound.IsEmpty() {
489 0 : if w.comparer.Compare(w.opts.RequiredInPlaceValueBound.Upper, key.UserKey[:eval.kcmp.PrefixLen]) <= 0 {
490 0 : // Common case for CockroachDB. Make it empty since all future keys
491 0 : // in this sstable will also have cmpUpper <= 0.
492 0 : w.opts.RequiredInPlaceValueBound = UserKeyPrefixBound{}
493 0 : } else if w.comparer.Compare(key.UserKey[:eval.kcmp.PrefixLen], w.opts.RequiredInPlaceValueBound.Lower) >= 0 {
494 0 : // Don't write to value block if the key is within the bounds.
495 0 : return eval, nil
496 0 : }
497 : }
498 0 : eval.writeToValueBlock = w.valueBlock != nil
499 0 : return eval, nil
500 : }
501 :
502 : var compressedBlockPool = sync.Pool{
503 1 : New: func() interface{} {
504 1 : return new(compressedBlock)
505 1 : },
506 : }
507 :
508 : type compressedBlock struct {
509 : physical block.PhysicalBlock
510 : blockBuf blockBuf
511 : }
512 :
513 1 : func (w *RawColumnWriter) flushDataBlockWithoutNextKey(nextKey []byte) {
514 1 : serializedBlock, lastKey := w.dataBlock.Finish(w.dataBlock.Rows()-1, w.pendingDataBlockSize)
515 1 : w.maybeIncrementTombstoneDenseBlocks(len(serializedBlock))
516 1 : // Compute the separator that will be written to the index block alongside
517 1 : // this data block's end offset. It is the separator between the last key in
518 1 : // the finished block and the [nextKey] that was excluded from the block.
519 1 : w.separatorBuf = w.comparer.Separator(w.separatorBuf[:0], lastKey.UserKey, nextKey)
520 1 : w.enqueueDataBlock(serializedBlock, lastKey, w.separatorBuf)
521 1 : w.dataBlock.Reset()
522 1 : w.pendingDataBlockSize = 0
523 1 : }
524 :
525 : // maybeIncrementTombstoneDenseBlocks increments the number of tombstone dense
526 : // blocks if the number of deletions in the data block exceeds a threshold or
527 : // the deletion size exceeds a threshold. It should be called after the
528 : // data block has been finished.
529 : // Invariant: uncompressedLen must be the serialized (uncompressed) length of
 : // the just-finished data block.
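 : //
 : // For example, with NumDeletionsThreshold=100 and
 : // DeletionSizeRatioThreshold=0.5 (illustrative option values), a finished
 : // block of 4000 uncompressed bytes is counted as tombstone-dense if it
 : // contains more than 100 point tombstones or if its tombstone keys account
 : // for more than 2000 bytes.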
530 1 : func (w *RawColumnWriter) maybeIncrementTombstoneDenseBlocks(uncompressedLen int) {
531 1 : minSize := w.opts.DeletionSizeRatioThreshold * float32(uncompressedLen)
532 1 : if w.dataBlock.numDeletions > w.opts.NumDeletionsThreshold || float32(w.dataBlock.deletionSize) > minSize {
533 0 : w.props.NumTombstoneDenseBlocks++
534 0 : }
535 1 : w.dataBlock.numDeletions = 0
536 1 : w.dataBlock.deletionSize = 0
537 : }
538 :
539 : // enqueueDataBlock compresses and checksums the provided data block and sends
540 : // it to the write queue to be asynchronously written to the underlying storage.
541 : // It also adds the block's index block separator to the pending index block,
542 : // possibly triggering the index block to be finished and buffered.
543 : func (w *RawColumnWriter) enqueueDataBlock(
544 : serializedBlock []byte, lastKey base.InternalKey, separator []byte,
545 1 : ) error {
546 1 : // TODO(jackson): Avoid allocating the largest point user key every time we
547 1 : // set the largest point key. This is what the rowblk writer does too, but
548 1 : // it's unnecessary.
549 1 : w.meta.SetLargestPointKey(lastKey.Clone())
550 1 :
551 1 : // Serialize the data block, compress it and send it to the write queue.
552 1 : cb := compressedBlockPool.Get().(*compressedBlock)
553 1 : cb.blockBuf.checksummer.Type = w.opts.Checksum
554 1 : cb.physical = block.CompressAndChecksum(
555 1 : &cb.blockBuf.compressedBuf,
556 1 : serializedBlock,
557 1 : w.opts.Compression,
558 1 : &cb.blockBuf.checksummer,
559 1 : )
560 1 : if !cb.physical.IsCompressed() {
561 1 : // If the block isn't compressed, cb.physical's underlying data points
562 1 : // directly into a buffer owned by w.dataBlock. Clone it before passing
563 1 : // it to the write queue to be asynchronously written to disk.
564 1 : // TODO(jackson): Should we try to avoid this clone by tracking the
565 1 : // lifetime of the DataBlockWriters?
566 1 : cb.physical = cb.physical.Clone()
567 1 : }
568 1 : dataBlockHandle := block.Handle{
569 1 : Offset: w.queuedDataSize,
570 1 : Length: uint64(cb.physical.LengthWithoutTrailer()),
571 1 : }
572 1 : w.queuedDataSize += dataBlockHandle.Length + block.TrailerLen
573 1 : w.writeQueue.ch <- cb
574 1 :
575 1 : var err error
576 1 : w.blockPropsEncoder.resetProps()
577 1 : for i := range w.blockPropCollectors {
578 1 : scratch := w.blockPropsEncoder.getScratchForProp()
579 1 : if scratch, err = w.blockPropCollectors[i].FinishDataBlock(scratch); err != nil {
580 0 : return err
581 0 : }
582 1 : w.blockPropsEncoder.addProp(shortID(i), scratch)
583 : }
584 1 : dataBlockProps := w.blockPropsEncoder.unsafeProps()
585 1 :
586 1 : // Add the separator to the index block. This might trigger a flush of the
587 1 : // index block too.
588 1 : i := w.indexBlock.AddBlockHandle(separator, dataBlockHandle, dataBlockProps)
589 1 : sizeWithEntry := w.indexBlock.Size()
590 1 : if shouldFlushWithoutLatestKV(sizeWithEntry, w.indexBlockSize, i, w.indexBlockOptions, w.allocatorSizeClasses) {
591 0 : if err = w.finishIndexBlock(w.indexBlock.Rows() - 1); err != nil {
592 0 : return err
593 0 : }
594 : // finishIndexBlock reset the index block builder, and we can
595 : // add the block handle to this new index block.
596 0 : _ = w.indexBlock.AddBlockHandle(separator, dataBlockHandle, dataBlockProps)
 : w.indexBlockSize = w.indexBlock.Size()
597 : } else {
 : // We're not flushing the index block; remember its size including the
 : // new entry, mirroring pendingDataBlockSize for data blocks.
 : w.indexBlockSize = sizeWithEntry
 : }
598 : // Incorporate the finished data block's property into the index block, now
599 : // that we've flushed the index block without the new separator if
600 : // necessary.
601 1 : for i := range w.blockPropCollectors {
602 1 : w.blockPropCollectors[i].AddPrevDataBlockToIndexBlock()
603 1 : }
604 1 : return nil
605 : }
606 :
607 : // finishIndexBlock finishes the currently pending index block with the first
608 : // [rows] rows. In practice, [rows] is always w.indexBlock.Rows() or
609 : // w.indexBlock.Rows()-1.
610 : //
611 : // The finished index block is buffered until the writer is closed.
612 1 : func (w *RawColumnWriter) finishIndexBlock(rows int) error {
613 1 : defer w.indexBlock.Reset()
614 1 : w.blockPropsEncoder.resetProps()
615 1 : for i := range w.blockPropCollectors {
616 1 : scratch := w.blockPropsEncoder.getScratchForProp()
617 1 : var err error
618 1 : if scratch, err = w.blockPropCollectors[i].FinishIndexBlock(scratch); err != nil {
619 0 : return err
620 0 : }
621 1 : w.blockPropsEncoder.addProp(shortID(i), scratch)
622 : }
623 1 : indexProps := w.blockPropsEncoder.props()
624 1 : bib := bufferedIndexBlock{nEntries: rows, properties: indexProps}
625 1 :
626 1 : // Copy the last (greatest) separator key in the index block into bib.sep.
627 1 : // It'll be the separator on the entry in the top-level index block.
628 1 : //
629 1 : // TODO(jackson): bib.sep.Trailer is unused within the columnar-block
630 1 : // sstable writer. Its existence is a code artifact of reuse of the
631 1 : // bufferedIndexBlock type between colblk and rowblk writers. This can be
632 1 : // cleaned up.
633 1 : bib.sep.Trailer = base.MakeTrailer(base.SeqNumMax, base.InternalKeyKindSeparator)
634 1 : w.indexBuffering.sepAlloc, bib.sep.UserKey = w.indexBuffering.sepAlloc.Copy(
635 1 : w.indexBlock.UnsafeSeparator(rows - 1))
636 1 :
637 1 : // Finish the index block and copy it so that w.indexBlock may be reused.
638 1 : block := w.indexBlock.Finish(rows)
639 1 : if len(w.indexBuffering.blockAlloc) < len(block) {
640 1 : // Allocate enough bytes for approximately 16 index blocks.
641 1 : w.indexBuffering.blockAlloc = make([]byte, len(block)*16)
642 1 : }
643 1 : n := copy(w.indexBuffering.blockAlloc, block)
644 1 : bib.block = w.indexBuffering.blockAlloc[:n:n]
645 1 : w.indexBuffering.blockAlloc = w.indexBuffering.blockAlloc[n:]
646 1 :
647 1 : w.indexBuffering.partitions = append(w.indexBuffering.partitions, bib)
648 1 : return nil
649 : }
650 :
651 : // flushBufferedIndexBlocks writes all index blocks, including the top-level
652 : // index block if necessary, to the underlying writable. It returns the block
653 : // handle of the top index (either the only index block or the top-level index
654 : // if two-level).
655 1 : func (w *RawColumnWriter) flushBufferedIndexBlocks() (rootIndex block.Handle, err error) {
656 1 : // If there's a currently-pending index block, finish it.
657 1 : if w.indexBlock.Rows() > 0 || len(w.indexBuffering.partitions) == 0 {
658 1 : if err := w.finishIndexBlock(w.indexBlock.Rows()); err != nil {
 : return block.Handle{}, err
 : }
659 1 : }
660 : // We've buffered all the index blocks. Typically there's just one index
661 : // block, in which case we're writing a "single-level" index. If we're
662 : // writing a large file or the index separators happen to be excessively
663 : // long, we may have several index blocks and need to construct a
664 : // "two-level" index structure.
665 1 : switch len(w.indexBuffering.partitions) {
666 0 : case 0:
667 0 : // This is impossible because we'll flush the index block immediately
668 0 : // above this switch statement if there are no buffered partitions
669 0 : // (regardless of whether there are data block handles in the index
670 0 : // block).
671 0 : panic("unreachable")
672 1 : case 1:
673 1 : // Single-level index.
674 1 : rootIndex, err = w.layout.WriteIndexBlock(w.indexBuffering.partitions[0].block)
675 1 : if err != nil {
676 0 : return rootIndex, err
677 0 : }
678 1 : w.props.IndexSize = rootIndex.Length + block.TrailerLen
679 1 : w.props.NumDataBlocks = uint64(w.indexBuffering.partitions[0].nEntries)
680 1 : w.props.IndexType = binarySearchIndex
681 0 : default:
682 0 : // Two-level index.
683 0 : for _, part := range w.indexBuffering.partitions {
684 0 : bh, err := w.layout.WriteIndexBlock(part.block)
685 0 : if err != nil {
686 0 : return block.Handle{}, err
687 0 : }
688 0 : w.props.IndexSize += bh.Length + block.TrailerLen
689 0 : w.props.NumDataBlocks += uint64(part.nEntries)
690 0 : w.topLevelIndexBlock.AddBlockHandle(part.sep.UserKey, bh, part.properties)
691 : }
692 0 : rootIndex, err = w.layout.WriteIndexBlock(w.topLevelIndexBlock.Finish(w.topLevelIndexBlock.Rows()))
693 0 : if err != nil {
694 0 : return block.Handle{}, err
695 0 : }
696 0 : w.props.IndexSize += rootIndex.Length + block.TrailerLen
697 0 : w.props.IndexType = twoLevelIndex
698 : }
699 1 : return rootIndex, nil
700 : }
701 :
702 : // drainWriteQueue runs in its own goroutine and is responsible for writing
703 : // finished, compressed data blocks to the writable. It reads from w.writeQueue
704 : // until the channel is closed. All data blocks are written by this goroutine.
705 : // Other blocks are written directly by the client goroutine. See Close.
706 1 : func (w *RawColumnWriter) drainWriteQueue() {
707 1 : defer w.writeQueue.wg.Done()
708 1 : for cb := range w.writeQueue.ch {
709 1 : if _, err := w.layout.WritePrecompressedDataBlock(cb.physical); err != nil {
710 0 : w.writeQueue.err = err
711 0 : }
712 1 : cb.blockBuf.clear()
713 1 : cb.physical = block.PhysicalBlock{}
714 1 : compressedBlockPool.Put(cb)
715 : }
716 : }
717 :
718 1 : func (w *RawColumnWriter) Close() (err error) {
719 1 : defer func() {
720 1 : if w.valueBlock != nil {
721 0 : releaseValueBlockWriter(w.valueBlock)
722 0 : // Defensive code in case Close gets called again. We don't want to put
723 0 : // the same object to a sync.Pool.
724 0 : w.valueBlock = nil
725 0 : }
726 1 : w.layout.Abort()
727 1 : // Record any error in the writer (so we can exit early if Close is called
728 1 : // again).
729 1 : if err != nil {
730 0 : w.err = err
731 0 : }
732 : }()
733 :
734 : // Finish the last data block and send it to the write queue if it contains
735 : // any pending KVs.
736 1 : if rows := w.dataBlock.Rows(); rows > 0 {
737 1 : serializedBlock, lastKey := w.dataBlock.Finish(rows, w.pendingDataBlockSize)
738 1 : w.separatorBuf = w.comparer.Successor(w.separatorBuf[:0], lastKey.UserKey)
739 1 : w.err = errors.CombineErrors(w.err, w.enqueueDataBlock(serializedBlock, lastKey, w.separatorBuf))
740 1 : w.maybeIncrementTombstoneDenseBlocks(len(serializedBlock))
741 1 : }
742 : // Close the write queue channel so that the goroutine responsible for
743 : // writing data blocks to disk knows to exit. Any subsequent blocks (eg,
744 : // index, metadata, range key, etc) will be written by the goroutine that
745 : // called Close.
746 1 : close(w.writeQueue.ch)
747 1 : w.writeQueue.wg.Wait()
748 1 : // If the write queue encountered any errors while writing out data blocks,
749 1 : // it's stored in w.writeQueue.err.
750 1 : w.err = firstError(w.err, w.writeQueue.err)
751 1 : if w.err != nil {
752 0 : return w.err
753 0 : }
754 :
755 : // INVARIANT: w.queuedDataSize == w.layout.offset.
756 : // All data blocks have been written to disk. The queuedDataSize is the
757 : // cumulative size of all the data blocks we've sent to the write queue. Now
758 : // that they've all been flushed, queuedDataSize should match w.layout's
759 : // offset.
760 1 : if w.queuedDataSize != w.layout.offset {
761 0 : panic(errors.AssertionFailedf("pebble: %d bytes of queued data blocks but layout offset is %d",
762 0 : w.queuedDataSize, w.layout.offset))
763 : }
764 1 : if _, err = w.flushBufferedIndexBlocks(); err != nil {
765 0 : return err
766 0 : }
767 :
768 : // Write the filter block.
769 1 : if w.filterBlock != nil {
770 0 : bh, err := w.layout.WriteFilterBlock(w.filterBlock)
771 0 : if err != nil {
772 0 : return err
773 0 : }
774 0 : w.props.FilterPolicyName = w.filterBlock.policyName()
775 0 : w.props.FilterSize = bh.Length
776 : }
777 :
778 : // Write the range deletion block if non-empty.
779 1 : if w.rangeDelBlock.KeyCount() > 0 {
780 1 : w.props.NumRangeDeletions = uint64(w.rangeDelBlock.KeyCount())
781 1 : sm, la := w.rangeDelBlock.UnsafeBoundaryKeys()
782 1 : w.meta.SetSmallestRangeDelKey(sm)
783 1 : w.meta.SetLargestRangeDelKey(la)
784 1 : if _, err := w.layout.WriteRangeDeletionBlock(w.rangeDelBlock.Finish()); err != nil {
785 0 : return err
786 0 : }
787 : }
788 :
789 : // Write the range key block if non-empty.
790 1 : if w.rangeKeyBlock.KeyCount() > 0 {
791 1 : sm, la := w.rangeKeyBlock.UnsafeBoundaryKeys()
792 1 : w.meta.SetSmallestRangeKey(sm)
793 1 : w.meta.SetLargestRangeKey(la)
794 1 : if _, err := w.layout.WriteRangeKeyBlock(w.rangeKeyBlock.Finish()); err != nil {
795 0 : return err
796 0 : }
797 : }
798 :
799 : // Write out the value block.
800 1 : if w.valueBlock != nil {
801 1 : _, vbStats, err := w.valueBlock.finish(&w.layout, w.layout.offset)
802 1 : if err != nil {
803 0 : return err
804 0 : }
805 1 : w.props.NumValueBlocks = vbStats.numValueBlocks
806 1 : w.props.NumValuesInValueBlocks = vbStats.numValuesInValueBlocks
807 1 : w.props.ValueBlocksSize = vbStats.valueBlocksAndIndexSize
808 : }
809 :
810 : // Write the properties block.
811 1 : {
812 1 : // Finish and record the prop collectors if props are not yet recorded.
813 1 : // Pre-computed props might have been copied by specialized sst creators
814 1 : // like suffix replacer.
815 1 : if len(w.props.UserProperties) == 0 {
816 1 : userProps := make(map[string]string)
817 1 : for i := range w.blockPropCollectors {
818 1 : scratch := w.blockPropsEncoder.getScratchForProp()
819 1 : // Place the shortID in the first byte.
820 1 : scratch = append(scratch, byte(i))
821 1 : buf, err := w.blockPropCollectors[i].FinishTable(scratch)
822 1 : if err != nil {
823 0 : return err
824 0 : }
825 1 : var prop string
826 1 : if len(buf) > 0 {
827 1 : prop = string(buf)
828 1 : }
829 : // NB: The property is populated in the map even if it is the
830 : // empty string, since the presence in the map is what indicates
831 : // that the block property collector was used when writing.
832 1 : userProps[w.blockPropCollectors[i].Name()] = prop
833 : }
834 1 : if len(userProps) > 0 {
835 1 : w.props.UserProperties = userProps
836 1 : }
837 : }
838 :
839 1 : var raw rowblk.Writer
840 1 : // The restart interval is set to infinity because the properties block
841 1 : // is always read sequentially and cached in a heap located object. This
842 1 : // reduces table size without a significant impact on performance.
843 1 : raw.RestartInterval = propertiesBlockRestartInterval
844 1 : w.props.CompressionOptions = rocksDBCompressionOptions
845 1 : w.props.save(w.opts.TableFormat, &raw)
846 1 : if _, err := w.layout.WritePropertiesBlock(raw.Finish()); err != nil {
847 0 : return err
848 0 : }
849 : }
850 :
851 : // Write the table footer.
852 1 : w.meta.Size, err = w.layout.Finish()
853 1 : if err != nil {
854 0 : return err
855 0 : }
856 1 : w.meta.Properties = w.props
857 1 : // Release any held memory and make any future calls error.
858 1 : // TODO(jackson): Ensure other calls error appropriately if the writer is
859 1 : // cleared.
860 1 : *w = RawColumnWriter{meta: w.meta}
861 1 : return nil
862 : }
863 :
864 : func shouldFlushWithoutLatestKV(
865 : sizeWithKV int,
866 : sizeWithoutKV int,
867 : entryCountWithoutKV int,
868 : flushOptions flushDecisionOptions,
869 : sizeClassHints []int,
870 1 : ) bool {
871 1 : if entryCountWithoutKV == 0 {
872 1 : return false
873 1 : }
874 : // For size-class aware flushing we need to account for the metadata that is
875 : // allocated when this block is loaded into the block cache. For instance, if
876 : // a block has size 1020B it may fit within a 1024B class. However, when
877 : // loaded into the block cache we also allocate space for the cache entry
878 : // metadata. The new allocation of size ~1052B may now only fit within a
879 : // 2048B class, which increases internal fragmentation.
880 1 : sizeWithKV += cache.ValueMetadataSize
881 1 : sizeWithoutKV += cache.ValueMetadataSize
882 1 : if sizeWithKV < flushOptions.blockSize {
883 1 : // Even with the new KV we still haven't exceeded the target block size.
884 1 : // There's no harm to committing to flushing with the new KV (and
885 1 : // possibly additional future KVs).
886 1 : return false
887 1 : }
888 :
889 1 : sizeClassWithKV, withOk := blockSizeClass(sizeWithKV, sizeClassHints)
890 1 : sizeClassWithoutKV, withoutOk := blockSizeClass(sizeWithoutKV, sizeClassHints)
891 1 : if !withOk || !withoutOk {
892 1 : // If the block size could not be mapped to a size class, we fall back
893 1 : // to flushing without the KV since we already know sizeWithKV >=
894 1 : // blockSize.
895 1 : return true
896 1 : }
897 : // Even though sizeWithKV >= blockSize, we may still want to defer flushing
898 : // if the new size class results in less fragmentation than the block
899 : // without the KV that does fit within the block size.
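 : //
 : // For instance (illustrative numbers), suppose the hints include 32KiB and
 : // 40KiB classes: a block of 31KiB without the KV wastes ~1KiB in the 32KiB
 : // class, while 33KiB with the KV wastes ~7KiB in the 40KiB class, so we
 : // flush without the KV. If the block without the KV were instead only 20KiB
 : // (~12KiB wasted), keeping the KV would waste less and we defer the flush.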
900 0 : if sizeClassWithKV-sizeWithKV < sizeClassWithoutKV-sizeWithoutKV {
901 0 : return false
902 0 : }
903 0 : return true
904 : }
|