Line data Source code
1 : // Copyright 2011 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package sstable
6 :
7 : import (
8 : "bytes"
9 : "encoding/binary"
10 : "fmt"
11 : "math"
12 : "runtime"
13 : "slices"
14 : "sort"
15 : "sync"
16 :
17 : "github.com/cespare/xxhash/v2"
18 : "github.com/cockroachdb/errors"
19 : "github.com/cockroachdb/pebble/internal/base"
20 : "github.com/cockroachdb/pebble/internal/bytealloc"
21 : "github.com/cockroachdb/pebble/internal/cache"
22 : "github.com/cockroachdb/pebble/internal/crc"
23 : "github.com/cockroachdb/pebble/internal/invariants"
24 : "github.com/cockroachdb/pebble/internal/keyspan"
25 : "github.com/cockroachdb/pebble/internal/private"
26 : "github.com/cockroachdb/pebble/internal/rangekey"
27 : "github.com/cockroachdb/pebble/objstorage"
28 : )
29 :
30 : // encodedBHPEstimatedSize estimates the size of the encoded BlockHandleWithProperties
31 : // as two uvarints (offset and length) of at most binary.MaxVarintLen64 bytes each. It
32 : // would be nice to also account for the length of the data block properties, but that isn't necessary since this is an estimate.
33 : const encodedBHPEstimatedSize = binary.MaxVarintLen64 * 2
34 :
35 : var errWriterClosed = errors.New("pebble: writer is closed")
36 :
37 : // WriterMetadata holds info about a finished sstable.
38 : type WriterMetadata struct {
39 : Size uint64
40 : SmallestPoint InternalKey
41 : // LargestPoint, LargestRangeKey, LargestRangeDel should not be accessed
42 : // before Writer.Close is called, because they may only be set on
43 : // Writer.Close.
44 : LargestPoint InternalKey
45 : SmallestRangeDel InternalKey
46 : LargestRangeDel InternalKey
47 : SmallestRangeKey InternalKey
48 : LargestRangeKey InternalKey
49 : HasPointKeys bool
50 : HasRangeDelKeys bool
51 : HasRangeKeys bool
52 : SmallestSeqNum uint64
53 : LargestSeqNum uint64
54 : Properties Properties
55 : }
56 :
57 : // SetSmallestPointKey sets the smallest point key to the given key.
58 : // NB: this method sets the "absolute" smallest point key. Any existing key is
59 : // overridden.
60 2 : func (m *WriterMetadata) SetSmallestPointKey(k InternalKey) {
61 2 : m.SmallestPoint = k
62 2 : m.HasPointKeys = true
63 2 : }
64 :
65 : // SetSmallestRangeDelKey sets the smallest rangedel key to the given key.
66 : // NB: this method sets the "absolute" smallest rangedel key. Any existing key is
67 : // overridden.
68 2 : func (m *WriterMetadata) SetSmallestRangeDelKey(k InternalKey) {
69 2 : m.SmallestRangeDel = k
70 2 : m.HasRangeDelKeys = true
71 2 : }
72 :
73 : // SetSmallestRangeKey sets the smallest range key to the given key.
74 : // NB: this method sets the "absolute" smallest range key. Any existing key is
75 : // overridden.
76 2 : func (m *WriterMetadata) SetSmallestRangeKey(k InternalKey) {
77 2 : m.SmallestRangeKey = k
78 2 : m.HasRangeKeys = true
79 2 : }
80 :
81 : // SetLargestPointKey sets the largest point key to the given key.
82 : // NB: this method sets the "absolute" largest point key. Any existing key is
83 : // overridden.
84 2 : func (m *WriterMetadata) SetLargestPointKey(k InternalKey) {
85 2 : m.LargestPoint = k
86 2 : m.HasPointKeys = true
87 2 : }
88 :
89 : // SetLargestRangeDelKey sets the largest rangedel key to the given key.
90 : // NB: this method sets the "absolute" largest rangedel key. Any existing key is
91 : // overridden.
92 2 : func (m *WriterMetadata) SetLargestRangeDelKey(k InternalKey) {
93 2 : m.LargestRangeDel = k
94 2 : m.HasRangeDelKeys = true
95 2 : }
96 :
97 : // SetLargestRangeKey sets the largest range key to the given key.
98 : // NB: this method sets the "absolute" largest range key. Any existing key is
99 : // overridden.
100 2 : func (m *WriterMetadata) SetLargestRangeKey(k InternalKey) {
101 2 : m.LargestRangeKey = k
102 2 : m.HasRangeKeys = true
103 2 : }
104 :
105 2 : func (m *WriterMetadata) updateSeqNum(seqNum uint64) {
106 2 : if m.SmallestSeqNum > seqNum {
107 2 : m.SmallestSeqNum = seqNum
108 2 : }
109 2 : if m.LargestSeqNum < seqNum {
110 2 : m.LargestSeqNum = seqNum
111 2 : }
112 : }
113 :
114 : // flushDecisionOptions holds parameters to inform the sstable block flushing
115 : // heuristics.
116 : type flushDecisionOptions struct {
117 : blockSize int
118 : blockSizeThreshold int
119 : // sizeClassAwareThreshold takes precedence over blockSizeThreshold when the
120 : // Writer is aware of the allocator's size classes.
121 : sizeClassAwareThreshold int
122 : }
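
// Illustrative values only (a hedged sketch, not the package defaults): the
// thresholds above are absolute byte counts derived from a percentage of the
// target block size, so a 32 KiB block with a 90% threshold looks like this.
var exampleFlushOptions = flushDecisionOptions{
	blockSize:               32 << 10,
	blockSizeThreshold:      (32 << 10) * 90 / 100, // flush once the block is within 90% of target
	sizeClassAwareThreshold: (32 << 10) * 60 / 100, // lower bar when allocator size classes are known
}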
123 :
124 : // Writer is a table writer.
125 : type Writer struct {
126 : writable objstorage.Writable
127 : meta WriterMetadata
128 : err error
129 : // cacheID and fileNum are used to remove blocks written to the sstable from
130 : // the cache, providing a defense in depth against bugs which cause cache
131 : // collisions.
132 : cacheID uint64
133 : fileNum base.DiskFileNum
134 : // dataBlockOptions and indexBlockOptions are used to configure the sstable
135 : // block flush heuristics.
136 : dataBlockOptions flushDecisionOptions
137 : indexBlockOptions flushDecisionOptions
138 : // The following fields are copied from Options.
139 : compare Compare
140 : split Split
141 : formatKey base.FormatKey
142 : compression Compression
143 : separator Separator
144 : successor Successor
145 : tableFormat TableFormat
146 : isStrictObsolete bool
147 : writingToLowestLevel bool
148 : cache *cache.Cache
149 : restartInterval int
150 : checksumType ChecksumType
151 : // disableKeyOrderChecks disables the checks that keys are added to an
152 : // sstable in order. It is intended for internal use only in the construction
153 : // of invalid sstables for testing. See tool/make_test_sstables.go.
154 : disableKeyOrderChecks bool
155 : // With two level indexes, the index/filter of an SST file is partitioned into
156 : // smaller blocks with an additional top-level index on them. When reading an
157 : // index/filter, only the top-level index is loaded into memory. The two level
158 : // index/filter then uses the top-level index to load on demand into the block
159 : // cache the partitions that are required to perform the index/filter query.
160 : //
161 : // Two level indexes are enabled automatically when there is more than one
162 : // index block.
163 : //
164 : // This is useful when there are very large index blocks, which generally occurs
165 : // with the usage of large keys. With large index blocks, the index blocks fight
166 : // the data blocks for block cache space and the index blocks are likely to be
167 : // re-read many times from the disk. The top level index, which has a much
168 : // smaller memory footprint, can be used to prevent the entire index block from
169 : // being loaded into the block cache.
170 : twoLevelIndex bool
171 : // Internal flag to allow creation of range-del-v1 format blocks. Only used
172 : // for testing. Note that v2 format blocks are backwards compatible with v1
173 : // format blocks.
174 : rangeDelV1Format bool
175 : indexBlock *indexBlockBuf
176 : rangeDelBlock blockWriter
177 : rangeKeyBlock blockWriter
178 : topLevelIndexBlock blockWriter
179 : props Properties
180 : blockPropCollectors []BlockPropertyCollector
181 : obsoleteCollector obsoleteKeyBlockPropertyCollector
182 : blockPropsEncoder blockPropertiesEncoder
183 : // filter accumulates the filter block. If populated, the filter ingests
184 : // either the output of w.split (i.e. a prefix extractor) if w.split is not
185 : // nil, or the full keys otherwise.
186 : filter filterWriter
187 : indexPartitions []indexBlockAndBlockProperties
188 :
189 : // indexBlockAlloc is used to bulk-allocate byte slices used to store index
190 : // blocks in indexPartitions. These live until the index finishes.
191 : indexBlockAlloc []byte
192 : // indexSepAlloc is used to bulk-allocate index block separator slices stored
193 : // in indexPartitions. These live until the index finishes.
194 : indexSepAlloc bytealloc.A
195 :
196 : // To allow potentially overlapping (i.e. un-fragmented) range key spans to
197 : // be added to the Writer, a keyspan.Fragmenter is used to retain the keys
198 : // and values, emitting fragmented, coalesced spans as appropriate. Range
199 : // keys must be added in order of their start user-key.
200 : fragmenter keyspan.Fragmenter
201 : rangeKeyEncoder rangekey.Encoder
202 : rangeKeysBySuffix keyspan.KeysBySuffix
203 : rangeKeySpan keyspan.Span
204 : rkBuf []byte
205 : // dataBlockBuf consists of the state which is currently owned by and used by
206 : // the Writer client goroutine. This state can be handed off to other goroutines.
207 : dataBlockBuf *dataBlockBuf
208 : // blockBuf consists of the state which is owned by and used by the Writer client
209 : // goroutine.
210 : blockBuf blockBuf
211 :
212 : coordination coordinationState
213 :
214 : // Information (other than the byte slice) about the last point key, to
215 : // avoid extracting it again.
216 : lastPointKeyInfo pointKeyInfo
217 :
218 : // For value blocks.
219 : shortAttributeExtractor base.ShortAttributeExtractor
220 : requiredInPlaceValueBound UserKeyPrefixBound
221 : // When w.tableFormat >= TableFormatPebblev3, valueBlockWriter is nil iff
222 : // WriterOptions.DisableValueBlocks was true.
223 : valueBlockWriter *valueBlockWriter
224 :
225 : allocatorSizeClasses []int
226 : }
227 :
228 : type pointKeyInfo struct {
229 : trailer uint64
230 : // Only computed when w.valueBlockWriter is not nil.
231 : userKeyLen int
232 : // prefixLen uses w.split, if not nil. Only computed when w.valueBlockWriter
233 : // is not nil.
234 : prefixLen int
235 : // True iff the point was marked obsolete.
236 : isObsolete bool
237 : }
238 :
239 : type coordinationState struct {
240 : parallelismEnabled bool
241 :
242 : // writeQueue is used to write data blocks to disk. The writeQueue is primarily
243 : // used to maintain the order in which data blocks must be written to disk. For
244 : // this reason, every single data block write must be done through the writeQueue.
245 : writeQueue *writeQueue
246 :
247 : sizeEstimate dataBlockEstimates
248 : }
249 :
250 2 : func (c *coordinationState) init(parallelismEnabled bool, writer *Writer) {
251 2 : c.parallelismEnabled = parallelismEnabled
252 2 : // useMutex is false regardless of parallelismEnabled, because we do not do
253 2 : // parallel compression yet.
254 2 : c.sizeEstimate.useMutex = false
255 2 :
256 2 : // writeQueueSize determines the size of the write queue, or the number
257 2 : // of items which can be added to the queue without blocking. By default, we
258 2 : // use a writeQueue size of 0, since we won't be doing any block writes in
259 2 : // parallel.
260 2 : writeQueueSize := 0
261 2 : if parallelismEnabled {
262 2 : writeQueueSize = runtime.GOMAXPROCS(0)
263 2 : }
264 2 : c.writeQueue = newWriteQueue(writeQueueSize, writer)
265 : }
266 :
267 : // sizeEstimate is a general purpose helper for estimating two kinds of sizes:
268 : // A. The compressed sstable size, which is useful for deciding when to start
269 : //
270 : // a new sstable during flushes or compactions. In practice, we use this in
271 : // estimating the data size (excluding the index).
272 : //
273 : // B. The size of index blocks to decide when to start a new index block.
274 : //
275 : // There are some terminology peculiarities which are due to the origin of
276 : // sizeEstimate for use case A with parallel compression enabled (for which
277 : // the code has not been merged). Specifically this relates to the terms
278 : // "written" and "compressed".
279 : // - The notion of "written" for case A is sufficiently defined by saying that
280 : // the data block is compressed. Waiting for the actual data block write to
281 : // happen can result in unnecessary estimation, when we already know how big
282 : // it will be in compressed form. Additionally, with the forthcoming value
283 : // blocks containing older MVCC values, these compressed blocks will be held
284 : // in-memory until late in the sstable writing, and we do want to accurately
285 : // account for them without waiting for the actual write.
286 : // For case B, "written" means that the index entry has been fully
287 : // generated, and has been added to the uncompressed block buffer for that
288 : // index block. It does not include actually writing a potentially
289 : // compressed index block.
290 : // - The notion of "compressed" is to differentiate between an "inflight" size
291 : // and the actual size, and is handled via computing a compression ratio
292 : // observed so far (defaults to 1).
293 : // For case A, this is actual data block compression, so the "inflight" size
294 : // is uncompressed blocks (that are no longer being written to) and the
295 : // "compressed" size is after they have been compressed.
296 : // For case B the inflight size is for a key-value pair in the index for
297 : // which the value size (the encoded size of the BlockHandleWithProperties)
298 : // is not accurately known, while the compressed size is the size of that
299 : // entry when it has been added to the (in-progress) index block.
300 : //
301 : // Usage: To update state, one can optionally provide an inflight write value
302 : // using addInflight (used for case B). When something is "written" the state
303 : // can be updated using either writtenWithDelta or writtenWithTotal, which
304 : // provide the actual delta size or the total size (latter must be
305 : // monotonically non-decreasing). If there were no calls to addInflight, there
306 : // isn't any real estimation happening here. So case A does not do any real
307 : // estimation. However, when we introduce parallel compression, there will be
308 : // estimation in that the client goroutine will call addInflight and the
309 : // compression goroutines will call writtenWithDelta.
310 : type sizeEstimate struct {
311 : // emptySize is the size when there is no inflight data, and numEntries is 0.
312 : // emptySize is constant once set.
313 : emptySize uint64
314 :
315 : // inflightSize is the estimated size of some inflight data which hasn't
316 : // been written yet.
317 : inflightSize uint64
318 :
319 : // totalSize is the total size of the data which has already been written.
320 : totalSize uint64
321 :
322 : // numWrittenEntries is the total number of entries which have already been
323 : // written.
324 : numWrittenEntries uint64
325 : // numInflightEntries is the total number of entries which are inflight, and
326 : // haven't been written.
327 : numInflightEntries uint64
328 :
329 : // maxEstimatedSize stores the maximum result returned from sizeEstimate.size.
330 : // It ensures that values returned from subsequent calls to Writer.EstimatedSize
331 : // never decrease.
332 : maxEstimatedSize uint64
333 :
334 : // We assume that the entries added to the sizeEstimate can be compressed.
335 : // For this reason, we keep track of a compressedSize and an uncompressedSize
336 : // to compute a compression ratio for the inflight entries. If the entries
337 : // aren't being compressed, then compressedSize and uncompressedSize must be
338 : // equal.
339 : compressedSize uint64
340 : uncompressedSize uint64
341 : }
342 :
343 2 : func (s *sizeEstimate) init(emptySize uint64) {
344 2 : s.emptySize = emptySize
345 2 : }
346 :
347 2 : func (s *sizeEstimate) size() uint64 {
348 2 : ratio := float64(1)
349 2 : if s.uncompressedSize > 0 {
350 2 : ratio = float64(s.compressedSize) / float64(s.uncompressedSize)
351 2 : }
352 2 : estimatedInflightSize := uint64(float64(s.inflightSize) * ratio)
353 2 : total := s.totalSize + estimatedInflightSize
354 2 : if total > s.maxEstimatedSize {
355 2 : s.maxEstimatedSize = total
356 2 : } else {
357 2 : total = s.maxEstimatedSize
358 2 : }
359 :
360 2 : if total == 0 {
361 2 : return s.emptySize
362 2 : }
363 :
364 2 : return total
365 : }
366 :
367 2 : func (s *sizeEstimate) numTotalEntries() uint64 {
368 2 : return s.numWrittenEntries + s.numInflightEntries
369 2 : }
370 :
371 2 : func (s *sizeEstimate) addInflight(size int) {
372 2 : s.numInflightEntries++
373 2 : s.inflightSize += uint64(size)
374 2 : }
375 :
376 2 : func (s *sizeEstimate) writtenWithTotal(newTotalSize uint64, inflightSize int) {
377 2 : finalEntrySize := int(newTotalSize - s.totalSize)
378 2 : s.writtenWithDelta(finalEntrySize, inflightSize)
379 2 : }
380 :
381 2 : func (s *sizeEstimate) writtenWithDelta(finalEntrySize int, inflightSize int) {
382 2 : if inflightSize > 0 {
383 2 : // This entry was previously inflight, so we should decrement inflight
384 2 : // entries and update the "compression" stats for future estimation.
385 2 : s.numInflightEntries--
386 2 : s.inflightSize -= uint64(inflightSize)
387 2 : s.uncompressedSize += uint64(inflightSize)
388 2 : s.compressedSize += uint64(finalEntrySize)
389 2 : }
390 2 : s.numWrittenEntries++
391 2 : s.totalSize += uint64(finalEntrySize)
392 : }
393 :
394 2 : func (s *sizeEstimate) clear() {
395 2 : *s = sizeEstimate{emptySize: s.emptySize}
396 2 : }
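
// Hedged walkthrough of the addInflight/writtenWithDelta protocol described in
// the comment above, with made-up sizes. Until something has been written the
// compression ratio defaults to 1, and size() is monotone because of
// maxEstimatedSize.
func exampleSizeEstimateProtocol() {
	var s sizeEstimate
	s.init(emptyBlockSize)
	s.addInflight(4096)            // client goroutine: 4096 uncompressed bytes in flight
	_ = s.size()                   // 4096: ratio is still 1, so face value
	s.writtenWithDelta(1024, 4096) // compression goroutine: block shrank to 1024 bytes
	s.addInflight(4096)            // the next inflight block is scaled by 1024/4096,
	_ = s.size()                   // i.e. 1024 written + 1024 estimated = 2048, though
	// 4096 is still returned because maxEstimatedSize never decreases.
}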
397 :
398 : type indexBlockBuf struct {
399 : // block will only be accessed from the writeQueue.
400 : block blockWriter
401 :
402 : size struct {
403 : useMutex bool
404 : mu sync.Mutex
405 : estimate sizeEstimate
406 : }
407 :
408 : // restartInterval matches indexBlockBuf.block.restartInterval. We store it twice, because the `block`
409 : // must only be accessed from the writeQueue goroutine.
410 : restartInterval int
411 : }
412 :
413 2 : func (i *indexBlockBuf) clear() {
414 2 : i.block.clear()
415 2 : if i.size.useMutex {
416 2 : i.size.mu.Lock()
417 2 : defer i.size.mu.Unlock()
418 2 : }
419 2 : i.size.estimate.clear()
420 2 : i.restartInterval = 0
421 : }
422 :
423 : var indexBlockBufPool = sync.Pool{
424 2 : New: func() interface{} {
425 2 : return &indexBlockBuf{}
426 2 : },
427 : }
428 :
429 : const indexBlockRestartInterval = 1
430 :
431 2 : func newIndexBlockBuf(useMutex bool) *indexBlockBuf {
432 2 : i := indexBlockBufPool.Get().(*indexBlockBuf)
433 2 : i.size.useMutex = useMutex
434 2 : i.restartInterval = indexBlockRestartInterval
435 2 : i.block.restartInterval = indexBlockRestartInterval
436 2 : i.size.estimate.init(emptyBlockSize)
437 2 : return i
438 2 : }
439 :
440 : func (i *indexBlockBuf) shouldFlush(
441 : sep InternalKey, valueLen int, flushOptions flushDecisionOptions, sizeClassHints []int,
442 2 : ) bool {
443 2 : if i.size.useMutex {
444 2 : i.size.mu.Lock()
445 2 : defer i.size.mu.Unlock()
446 2 : }
447 :
448 2 : nEntries := i.size.estimate.numTotalEntries()
449 2 : return shouldFlushWithHints(
450 2 : sep.Size(), valueLen, i.restartInterval, int(i.size.estimate.size()),
451 2 : int(nEntries), flushOptions, sizeClassHints)
452 : }
453 :
454 2 : func (i *indexBlockBuf) add(key InternalKey, value []byte, inflightSize int) {
455 2 : i.block.add(key, value)
456 2 : size := i.block.estimatedSize()
457 2 : if i.size.useMutex {
458 2 : i.size.mu.Lock()
459 2 : defer i.size.mu.Unlock()
460 2 : }
461 2 : i.size.estimate.writtenWithTotal(uint64(size), inflightSize)
462 : }
463 :
464 2 : func (i *indexBlockBuf) finish() []byte {
465 2 : b := i.block.finish()
466 2 : return b
467 2 : }
468 :
469 2 : func (i *indexBlockBuf) addInflight(inflightSize int) {
470 2 : if i.size.useMutex {
471 2 : i.size.mu.Lock()
472 2 : defer i.size.mu.Unlock()
473 2 : }
474 2 : i.size.estimate.addInflight(inflightSize)
475 : }
476 :
477 2 : func (i *indexBlockBuf) estimatedSize() uint64 {
478 2 : if i.size.useMutex {
479 2 : i.size.mu.Lock()
480 2 : defer i.size.mu.Unlock()
481 2 : }
482 :
483 : // Make sure that the size estimation works as expected when parallelism
484 : // is disabled.
485 2 : if invariants.Enabled && !i.size.useMutex {
486 2 : if i.size.estimate.inflightSize != 0 {
487 0 : panic("unexpected inflight entry in index block size estimation")
488 : }
489 :
490 : // NB: The i.block should only be accessed from the writeQueue goroutine,
491 : // when parallelism is enabled. We break that invariant here, but that's
492 : // okay since parallelism is disabled.
493 2 : if i.size.estimate.size() != uint64(i.block.estimatedSize()) {
494 0 : panic("index block size estimation sans parallelism is incorrect")
495 : }
496 : }
497 2 : return i.size.estimate.size()
498 : }
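
// Hedged sketch of the split ownership described above: the client goroutine
// records an estimated (inflight) index entry, and the writeQueue goroutine
// later adds the real entry, folding the actual encoded size back into the
// shared estimate. bhp stands in for an encoded BlockHandleWithProperties.
func exampleIndexBlockBufFlow(i *indexBlockBuf, sep InternalKey, bhp []byte) {
	i.addInflight(encodedBHPEstimatedSize)   // client: actual value size unknown yet
	i.add(sep, bhp, encodedBHPEstimatedSize) // writeQueue: real entry replaces the estimate
}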
499 :
500 : // dataBlockEstimates is used for sstable size estimation. dataBlockEstimates
501 : // can be accessed by the Writer client and compressionQueue goroutines. Fields
502 : // should only be read/updated through the functions defined on the
503 : // *dataBlockEstimates type.
504 : type dataBlockEstimates struct {
505 : // If we don't do block compression in parallel, then we don't need to take
506 : // the performance hit of synchronizing using this mutex.
507 : useMutex bool
508 : mu sync.Mutex
509 :
510 : estimate sizeEstimate
511 : }
512 :
513 : // inflightSize is the uncompressed block size estimate which has been
514 : // previously provided to addInflightDataBlock(). If addInflightDataBlock()
515 : // has not been called, this must be set to 0. compressedSize is the
516 : // compressed size of the block.
517 2 : func (d *dataBlockEstimates) dataBlockCompressed(compressedSize int, inflightSize int) {
518 2 : if d.useMutex {
519 0 : d.mu.Lock()
520 0 : defer d.mu.Unlock()
521 0 : }
522 2 : d.estimate.writtenWithDelta(compressedSize+blockTrailerLen, inflightSize)
523 : }
524 :
525 : // size is an estimated size of data block data which has been written to disk.
526 2 : func (d *dataBlockEstimates) size() uint64 {
527 2 : if d.useMutex {
528 0 : d.mu.Lock()
529 0 : defer d.mu.Unlock()
530 0 : }
531 : // If there is no parallel compression, there should not be any inflight bytes.
532 2 : if invariants.Enabled && !d.useMutex {
533 2 : if d.estimate.inflightSize != 0 {
534 0 : panic("unexpected inflight entry in data block size estimation")
535 : }
536 : }
537 2 : return d.estimate.size()
538 : }
539 :
540 : // Avoid linter unused error.
541 : var _ = (&dataBlockEstimates{}).addInflightDataBlock
542 :
543 : // NB: unused since no parallel compression.
544 0 : func (d *dataBlockEstimates) addInflightDataBlock(size int) {
545 0 : if d.useMutex {
546 0 : d.mu.Lock()
547 0 : defer d.mu.Unlock()
548 0 : }
549 :
550 0 : d.estimate.addInflight(size)
551 : }
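
// Hedged sketch of today's serial path, per the comments above: with no
// parallel compression, addInflightDataBlock is never called, so every
// dataBlockCompressed call passes inflightSize == 0.
func exampleDataBlockEstimatesSerial(d *dataBlockEstimates, compressedLen int) uint64 {
	d.dataBlockCompressed(compressedLen, 0) // records compressedLen+blockTrailerLen written bytes
	return d.size()
}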
552 :
553 : var writeTaskPool = sync.Pool{
554 2 : New: func() interface{} {
555 2 : t := &writeTask{}
556 2 : t.compressionDone = make(chan bool, 1)
557 2 : return t
558 2 : },
559 : }
560 :
561 : type checksummer struct {
562 : checksumType ChecksumType
563 : xxHasher *xxhash.Digest
564 : }
565 :
566 2 : func (c *checksummer) checksum(block []byte, blockType []byte) (checksum uint32) {
567 2 : // Calculate the checksum.
568 2 : switch c.checksumType {
569 2 : case ChecksumTypeCRC32c:
570 2 : checksum = crc.New(block).Update(blockType).Value()
571 1 : case ChecksumTypeXXHash64:
572 1 : if c.xxHasher == nil {
573 1 : c.xxHasher = xxhash.New()
574 1 : } else {
575 1 : c.xxHasher.Reset()
576 1 : }
577 1 : c.xxHasher.Write(block)
578 1 : c.xxHasher.Write(blockType)
579 1 : checksum = uint32(c.xxHasher.Sum64())
580 0 : default:
581 0 : panic(errors.Newf("unsupported checksum type: %d", c.checksumType))
582 : }
583 2 : return checksum
584 : }
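
// Hedged usage sketch: a block is checksummed together with a one-byte block
// type (the compression indicator that precedes the checksum in the block
// trailer). The zero type byte here is an illustrative stand-in, not a named
// constant from this package.
func exampleBlockChecksum(block []byte) uint32 {
	c := checksummer{checksumType: ChecksumTypeCRC32c}
	return c.checksum(block, []byte{0})
}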
585 :
586 : type blockBuf struct {
587 : // tmp is a scratch buffer, large enough to hold footerLen bytes,
588 : // blockTrailerLen bytes, or (5 * binary.MaxVarintLen64) bytes, and is most
589 : // likely large enough for a block handle with properties.
590 : tmp [blockHandleLikelyMaxLen]byte
591 : // compressedBuf is the destination buffer for compression. It is re-used over the
592 : // lifetime of the blockBuf, avoiding the allocation of a temporary buffer for each block.
593 : compressedBuf []byte
594 : checksummer checksummer
595 : }
596 :
597 2 : func (b *blockBuf) clear() {
598 2 : // We can't assign b.compressedBuf[:0] to compressedBuf because snappy relies
599 2 : // on the length of the buffer, not the capacity, to determine whether it
600 2 : // needs to make an allocation.
601 2 : *b = blockBuf{
602 2 : compressedBuf: b.compressedBuf, checksummer: b.checksummer,
603 2 : }
604 2 : }
605 :
606 : // A dataBlockBuf holds all the state required to compress and write a data block to disk.
607 : // A dataBlockBuf begins its lifecycle owned by the Writer client goroutine. The Writer
608 : // client goroutine adds keys to the sstable, writing directly into a dataBlockBuf's blockWriter
609 : // until the block is full. Once a dataBlockBuf's block is full, the dataBlockBuf may be passed
610 : // to other goroutines for compression and file I/O.
611 : type dataBlockBuf struct {
612 : blockBuf
613 : dataBlock blockWriter
614 :
615 : // uncompressed is a reference to a byte slice which is owned by the dataBlockBuf. It is the
616 : // next byte slice to be compressed. The uncompressed byte slice will be backed by the
617 : // dataBlock.buf.
618 : uncompressed []byte
619 : // compressed is a reference to a byte slice which is owned by the dataBlockBuf. It is the
620 : // compressed byte slice which must be written to disk. The compressed byte slice may be
621 : // backed by the dataBlock.buf, or the dataBlockBuf.compressedBuf, depending on whether
622 : // we use the result of the compression.
623 : compressed []byte
624 :
625 : // We're making calls to BlockPropertyCollectors from the Writer client goroutine. We need to
626 : // pass the encoded block properties over to the write queue. To prevent copies and allocations,
627 : // we give each dataBlockBuf a blockPropertiesEncoder.
628 : blockPropsEncoder blockPropertiesEncoder
629 : // dataBlockProps is set when Writer.finishDataBlockProps is called. The dataBlockProps slice is
630 : // a shallow copy of the internal buffer of the dataBlockBuf.blockPropsEncoder.
631 : dataBlockProps []byte
632 :
633 : // sepScratch is reusable scratch space for computing separator keys.
634 : sepScratch []byte
635 : }
636 :
637 2 : func (d *dataBlockBuf) clear() {
638 2 : d.blockBuf.clear()
639 2 : d.dataBlock.clear()
640 2 :
641 2 : d.uncompressed = nil
642 2 : d.compressed = nil
643 2 : d.dataBlockProps = nil
644 2 : d.sepScratch = d.sepScratch[:0]
645 2 : }
646 :
647 : var dataBlockBufPool = sync.Pool{
648 2 : New: func() interface{} {
649 2 : return &dataBlockBuf{}
650 2 : },
651 : }
652 :
653 2 : func newDataBlockBuf(restartInterval int, checksumType ChecksumType) *dataBlockBuf {
654 2 : d := dataBlockBufPool.Get().(*dataBlockBuf)
655 2 : d.dataBlock.restartInterval = restartInterval
656 2 : d.checksummer.checksumType = checksumType
657 2 : return d
658 2 : }
659 :
660 2 : func (d *dataBlockBuf) finish() {
661 2 : d.uncompressed = d.dataBlock.finish()
662 2 : }
663 :
664 2 : func (d *dataBlockBuf) compressAndChecksum(c Compression) {
665 2 : d.compressed = compressAndChecksum(d.uncompressed, c, &d.blockBuf)
666 2 : }
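
// Hedged lifecycle sketch for a dataBlockBuf, following the struct comment
// above: take a buffer from the pool, fill its block, snapshot the
// uncompressed bytes, compress, then recycle. The restart interval and the
// SnappyCompression choice are arbitrary illustrative values.
func exampleDataBlockBufLifecycle() {
	d := newDataBlockBuf(16, ChecksumTypeCRC32c)
	d.dataBlock.add(base.MakeInternalKey([]byte("k"), 0, InternalKeyKindSet), []byte("v"))
	d.finish()                                // d.uncompressed now references the block bytes
	d.compressAndChecksum(SnappyCompression) // d.compressed is what the write queue persists
	d.clear()
	dataBlockBufPool.Put(d)
}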
667 :
668 : func (d *dataBlockBuf) shouldFlush(
669 : key InternalKey, valueLen int, flushOptions flushDecisionOptions, sizeClassHints []int,
670 2 : ) bool {
671 2 : return shouldFlushWithHints(
672 2 : key.Size(), valueLen, d.dataBlock.restartInterval, d.dataBlock.estimatedSize(),
673 2 : d.dataBlock.nEntries, flushOptions, sizeClassHints)
674 2 : }
675 :
676 : type indexBlockAndBlockProperties struct {
677 : nEntries int
678 : // sep is the last key added to this block, for computing a separator later.
679 : sep InternalKey
680 : properties []byte
681 : // block is the encoded block produced by blockWriter.finish.
682 : block []byte
683 : }
684 :
685 : // Set sets the value for the given key. The sequence number is set to 0.
686 : // Intended for use to externally construct an sstable before ingestion into a
687 : // DB. For a given Writer, the keys passed to Set must be in strictly increasing
688 : // order.
689 : //
690 : // TODO(peter): untested
691 2 : func (w *Writer) Set(key, value []byte) error {
692 2 : if w.err != nil {
693 0 : return w.err
694 0 : }
695 2 : if w.isStrictObsolete {
696 0 : return errors.Errorf("use AddWithForceObsolete")
697 0 : }
698 : // forceObsolete is false based on the assumption that no RANGEDELs in the
699 : // sstable delete the added points.
700 2 : return w.addPoint(base.MakeInternalKey(key, 0, InternalKeyKindSet), value, false)
701 : }
702 :
703 : // Delete deletes the value for the given key. The sequence number is set to
704 : // 0. Intended for use to externally construct an sstable before ingestion into
705 : // a DB.
706 : //
707 : // TODO(peter): untested
708 1 : func (w *Writer) Delete(key []byte) error {
709 1 : if w.err != nil {
710 0 : return w.err
711 0 : }
712 1 : if w.isStrictObsolete {
713 0 : return errors.Errorf("use AddWithForceObsolete")
714 0 : }
715 : // forceObsolete is false based on the assumption that no RANGEDELs in the
716 : // sstable delete the added points.
717 1 : return w.addPoint(base.MakeInternalKey(key, 0, InternalKeyKindDelete), nil, false)
718 : }
719 :
720 : // DeleteRange deletes all of the keys (and values) in the range [start,end)
721 : // (inclusive on start, exclusive on end). The sequence number is set to
722 : // 0. Intended for use to externally construct an sstable before ingestion into
723 : // a DB.
724 : //
725 : // TODO(peter): untested
726 2 : func (w *Writer) DeleteRange(start, end []byte) error {
727 2 : if w.err != nil {
728 0 : return w.err
729 0 : }
730 2 : return w.addTombstone(base.MakeInternalKey(start, 0, InternalKeyKindRangeDelete), end)
731 : }
732 :
733 : // Merge adds an action to the DB that merges the value at key with the new
734 : // value. The details of the merge are dependent upon the configured merge
735 : // operator. The sequence number is set to 0. Intended for use to externally
736 : // construct an sstable before ingestion into a DB.
737 : //
738 : // TODO(peter): untested
739 0 : func (w *Writer) Merge(key, value []byte) error {
740 0 : if w.err != nil {
741 0 : return w.err
742 0 : }
743 0 : if w.isStrictObsolete {
744 0 : return errors.Errorf("use AddWithForceObsolete")
745 0 : }
746 : // forceObsolete is false based on the assumption that no RANGEDELs in the
747 : // sstable delete the added points. If the user configured this writer
748 : // to be strict-obsolete, addPoint will reject the addition of this MERGE.
749 0 : return w.addPoint(base.MakeInternalKey(key, 0, InternalKeyKindMerge), value, false)
750 : }
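
// Hedged end-to-end sketch of the external-construction flow these methods
// document, assuming this package's NewWriter(objstorage.Writable,
// WriterOptions) constructor; WriterOptions are left at their defaults for
// brevity.
func exampleBuildIngestionSST(f objstorage.Writable) error {
	w := NewWriter(f, WriterOptions{})
	if err := w.Set([]byte("a"), []byte("1")); err != nil {
		return err
	}
	if err := w.Set([]byte("b"), []byte("2")); err != nil { // keys strictly increasing
		return err
	}
	if err := w.DeleteRange([]byte("x"), []byte("z")); err != nil {
		return err
	}
	return w.Close() // Close finalizes the sstable and its WriterMetadata
}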
751 :
752 : // Add adds a key/value pair to the table being written. For a given Writer,
753 : // the keys passed to Add must be in increasing order. The exception to this
754 : // rule is range deletion tombstones. Range deletion tombstones need to be
755 : // added ordered by their start key, but they can be added out of order from
756 : // point entries. Additionally, range deletion tombstones must be fragmented
757 : // (i.e. by keyspan.Fragmenter).
758 2 : func (w *Writer) Add(key InternalKey, value []byte) error {
759 2 : if w.isStrictObsolete {
760 0 : return errors.Errorf("use AddWithForceObsolete")
761 0 : }
762 2 : return w.AddWithForceObsolete(key, value, false)
763 : }
764 :
765 : // AddWithForceObsolete must be used when writing a strict-obsolete sstable.
766 : //
767 : // forceObsolete indicates whether the caller has determined that this key is
768 : // obsolete even though it may be the latest point key for this userkey. This
769 : // should be set to true for keys obsoleted by RANGEDELs, and is required for
770 : // strict-obsolete sstables.
771 : //
772 : // Note that there are two properties, S1 and S2 (see comment in format.go)
773 : // that strict-obsolete ssts must satisfy. S2, due to RANGEDELs, is solely the
774 : // responsibility of the caller. S1 is solely the responsibility of the
775 : // callee.
776 2 : func (w *Writer) AddWithForceObsolete(key InternalKey, value []byte, forceObsolete bool) error {
777 2 : if w.err != nil {
778 0 : return w.err
779 0 : }
780 :
781 2 : switch key.Kind() {
782 2 : case InternalKeyKindRangeDelete:
783 2 : return w.addTombstone(key, value)
784 : case base.InternalKeyKindRangeKeyDelete,
785 : base.InternalKeyKindRangeKeySet,
786 0 : base.InternalKeyKindRangeKeyUnset:
787 0 : w.err = errors.Errorf(
788 0 : "pebble: range keys must be added via one of the RangeKey* functions")
789 0 : return w.err
790 : }
791 2 : return w.addPoint(key, value, forceObsolete)
792 : }
793 :
794 2 : func (w *Writer) makeAddPointDecisionV2(key InternalKey) error {
795 2 : prevTrailer := w.lastPointKeyInfo.trailer
796 2 : w.lastPointKeyInfo.trailer = key.Trailer
797 2 : if w.dataBlockBuf.dataBlock.nEntries == 0 {
798 2 : return nil
799 2 : }
800 2 : if !w.disableKeyOrderChecks {
801 2 : prevPointUserKey := w.dataBlockBuf.dataBlock.getCurUserKey()
802 2 : cmpUser := w.compare(prevPointUserKey, key.UserKey)
803 2 : if cmpUser > 0 || (cmpUser == 0 && prevTrailer <= key.Trailer) {
804 1 : return errors.Errorf(
805 1 : "pebble: keys must be added in strictly increasing order: %s, %s",
806 1 : InternalKey{UserKey: prevPointUserKey, Trailer: prevTrailer}.Pretty(w.formatKey),
807 1 : key.Pretty(w.formatKey))
808 1 : }
809 : }
810 2 : return nil
811 : }
812 :
813 : // REQUIRES: at least one point has been written to the Writer.
814 2 : func (w *Writer) getLastPointUserKey() []byte {
815 2 : if w.dataBlockBuf.dataBlock.nEntries == 0 {
816 0 : panic(errors.AssertionFailedf("no point keys added to writer"))
817 : }
818 2 : return w.dataBlockBuf.dataBlock.getCurUserKey()
819 : }
820 :
821 : // REQUIRES: w.tableFormat >= TableFormatPebblev3
822 : func (w *Writer) makeAddPointDecisionV3(
823 : key InternalKey, valueLen int,
824 2 : ) (setHasSamePrefix bool, writeToValueBlock bool, isObsolete bool, err error) {
825 2 : prevPointKeyInfo := w.lastPointKeyInfo
826 2 : w.lastPointKeyInfo.userKeyLen = len(key.UserKey)
827 2 : w.lastPointKeyInfo.prefixLen = w.split(key.UserKey)
828 2 : w.lastPointKeyInfo.trailer = key.Trailer
829 2 : w.lastPointKeyInfo.isObsolete = false
830 2 : if !w.meta.HasPointKeys {
831 2 : return false, false, false, nil
832 2 : }
833 2 : keyKind := base.TrailerKind(key.Trailer)
834 2 : prevPointUserKey := w.getLastPointUserKey()
835 2 : prevPointKey := InternalKey{UserKey: prevPointUserKey, Trailer: prevPointKeyInfo.trailer}
836 2 : prevKeyKind := base.TrailerKind(prevPointKeyInfo.trailer)
837 2 : considerWriteToValueBlock := prevKeyKind == InternalKeyKindSet &&
838 2 : keyKind == InternalKeyKindSet
839 2 : if considerWriteToValueBlock && !w.requiredInPlaceValueBound.IsEmpty() {
840 1 : keyPrefix := key.UserKey[:w.lastPointKeyInfo.prefixLen]
841 1 : cmpUpper := w.compare(
842 1 : w.requiredInPlaceValueBound.Upper, keyPrefix)
843 1 : if cmpUpper <= 0 {
844 1 : // Common case for CockroachDB. Make it empty since all future keys in
845 1 : // this sstable will also have cmpUpper <= 0.
846 1 : w.requiredInPlaceValueBound = UserKeyPrefixBound{}
847 1 : } else if w.compare(keyPrefix, w.requiredInPlaceValueBound.Lower) >= 0 {
848 1 : considerWriteToValueBlock = false
849 1 : }
850 : }
851 : // cmpPrefix is initialized iff considerWriteToValueBlock.
852 2 : var cmpPrefix int
853 2 : var cmpUser int
854 2 : if considerWriteToValueBlock {
855 2 : // Compare the prefixes.
856 2 : cmpPrefix = w.compare(prevPointUserKey[:prevPointKeyInfo.prefixLen],
857 2 : key.UserKey[:w.lastPointKeyInfo.prefixLen])
858 2 : cmpUser = cmpPrefix
859 2 : if cmpPrefix == 0 {
860 2 : // Need to compare suffixes to compute cmpUser.
861 2 : cmpUser = w.compare(prevPointUserKey[prevPointKeyInfo.prefixLen:],
862 2 : key.UserKey[w.lastPointKeyInfo.prefixLen:])
863 2 : }
864 2 : } else {
865 2 : cmpUser = w.compare(prevPointUserKey, key.UserKey)
866 2 : }
867 : // Ensure that no one adds a point key kind without considering the obsolete
868 : // handling for that kind.
869 2 : switch keyKind {
870 : case InternalKeyKindSet, InternalKeyKindSetWithDelete, InternalKeyKindMerge,
871 2 : InternalKeyKindDelete, InternalKeyKindSingleDelete, InternalKeyKindDeleteSized:
872 0 : default:
873 0 : panic(errors.AssertionFailedf("unexpected key kind %s", keyKind.String()))
874 : }
875 : // If same user key, then the current key is obsolete if any of the
876 : // following is true:
877 : // C1 The prev key was obsolete.
878 : // C2 The prev key was not a MERGE. When the previous key is a MERGE we must
879 : // preserve SET* and MERGE since their values will be merged into the
880 : // previous key. We also must preserve DEL* since there may be an older
881 : // SET*/MERGE in a lower level that must not be merged with the MERGE --
882 : // if we omit the DEL* that lower SET*/MERGE will become visible.
883 : //
884 : // Regardless of whether it is the same user key or not
885 : // C3 If the current key is some kind of point delete and we are writing to
886 : // the lowest level, then it is also obsolete. The correctness of this
887 : // relies on the same user key not spanning multiple sstables in a level.
888 : //
889 : // C1 ensures that for a user key there is at most one transition from
890 : // !obsolete to obsolete. Consider a user key k, for which the first n keys
891 : // are not obsolete. We consider the various values of n:
892 : //
893 : // n = 0: This happens due to forceObsolete being set by the caller, or due
894 : // to C3. forceObsolete must only be set due to a RANGEDEL, and that RANGEDEL
895 : // must also delete all the lower seqnums for the same user key. C3 triggers
896 : // due to a point delete and that deletes all the lower seqnums for the same
897 : // user key.
898 : //
899 : // n = 1: This is the common case. It happens when the first key is not a
900 : // MERGE, or the current key is some kind of point delete.
901 : //
902 : // n > 1: This is due to a sequence of MERGE keys, potentially followed by a
903 : // single non-MERGE key.
904 2 : isObsoleteC1AndC2 := cmpUser == 0 &&
905 2 : (prevPointKeyInfo.isObsolete || prevKeyKind != InternalKeyKindMerge)
906 2 : isObsoleteC3 := w.writingToLowestLevel &&
907 2 : (keyKind == InternalKeyKindDelete || keyKind == InternalKeyKindSingleDelete ||
908 2 : keyKind == InternalKeyKindDeleteSized)
909 2 : isObsolete = isObsoleteC1AndC2 || isObsoleteC3
910 2 : // TODO(sumeer): storing isObsolete SET and SETWITHDEL in value blocks is
911 2 : // possible, but requires some care in documenting and checking invariants.
912 2 : // There is code that assumes nothing in value blocks because of single MVCC
913 2 : // version (those should be ok). We have to ensure setHasSamePrefix is
914 2 : // correctly initialized here etc.
915 2 :
916 2 : if !w.disableKeyOrderChecks &&
917 2 : (cmpUser > 0 || (cmpUser == 0 && prevPointKeyInfo.trailer <= key.Trailer)) {
918 1 : return false, false, false, errors.Errorf(
919 1 : "pebble: keys must be added in strictly increasing order: %s, %s",
920 1 : prevPointKey.Pretty(w.formatKey), key.Pretty(w.formatKey))
921 1 : }
922 2 : if !considerWriteToValueBlock {
923 2 : return false, false, isObsolete, nil
924 2 : }
925 : // NB: it is possible that cmpUser == 0, i.e., these two SETs have identical
926 : // user keys (because of an open snapshot). This should be the rare case.
927 2 : setHasSamePrefix = cmpPrefix == 0
928 2 : // Use of 0 here is somewhat arbitrary. Given the minimum 3 byte encoding of
929 2 : // valueHandle, this should be > 3. But tiny values are common in test and
930 2 : // unlikely in production, so we use 0 here for better test coverage.
931 2 : const tinyValueThreshold = 0
932 2 : // NB: setting WriterOptions.DisableValueBlocks does not disable the
933 2 : // setHasSamePrefix optimization.
934 2 : considerWriteToValueBlock = setHasSamePrefix && valueLen > tinyValueThreshold && w.valueBlockWriter != nil
935 2 : return setHasSamePrefix, considerWriteToValueBlock, isObsolete, nil
936 : }
937 :
938 2 : func (w *Writer) addPoint(key InternalKey, value []byte, forceObsolete bool) error {
939 2 : if w.isStrictObsolete && key.Kind() == InternalKeyKindMerge {
940 1 : return errors.Errorf("MERGE not supported in a strict-obsolete sstable")
941 1 : }
942 2 : var err error
943 2 : var setHasSameKeyPrefix, writeToValueBlock, addPrefixToValueStoredWithKey bool
944 2 : var isObsolete bool
945 2 : maxSharedKeyLen := len(key.UserKey)
946 2 : if w.tableFormat >= TableFormatPebblev3 {
947 2 : // maxSharedKeyLen is limited to the prefix of the preceding key. If the
948 2 : // preceding key was in a different block, then the blockWriter will
949 2 : // ignore this maxSharedKeyLen.
950 2 : maxSharedKeyLen = w.lastPointKeyInfo.prefixLen
951 2 : setHasSameKeyPrefix, writeToValueBlock, isObsolete, err =
952 2 : w.makeAddPointDecisionV3(key, len(value))
953 2 : addPrefixToValueStoredWithKey = base.TrailerKind(key.Trailer) == InternalKeyKindSet
954 2 : } else {
955 2 : err = w.makeAddPointDecisionV2(key)
956 2 : }
957 2 : if err != nil {
958 1 : return err
959 1 : }
960 2 : isObsolete = w.tableFormat >= TableFormatPebblev4 && (isObsolete || forceObsolete)
961 2 : w.lastPointKeyInfo.isObsolete = isObsolete
962 2 : var valueStoredWithKey []byte
963 2 : var prefix valuePrefix
964 2 : var valueStoredWithKeyLen int
965 2 : if writeToValueBlock {
966 2 : vh, err := w.valueBlockWriter.addValue(value)
967 2 : if err != nil {
968 0 : return err
969 0 : }
970 2 : n := encodeValueHandle(w.blockBuf.tmp[:], vh)
971 2 : valueStoredWithKey = w.blockBuf.tmp[:n]
972 2 : valueStoredWithKeyLen = len(valueStoredWithKey) + 1
973 2 : var attribute base.ShortAttribute
974 2 : if w.shortAttributeExtractor != nil {
975 1 : // TODO(sumeer): for compactions, it is possible that the input sstable
976 1 : // already has this value in the value section and so we have already
977 1 : // extracted the ShortAttribute. Avoid extracting it again. This will
978 1 : // require changing the Writer.Add interface.
979 1 : if attribute, err = w.shortAttributeExtractor(
980 1 : key.UserKey, w.lastPointKeyInfo.prefixLen, value); err != nil {
981 0 : return err
982 0 : }
983 : }
984 2 : prefix = makePrefixForValueHandle(setHasSameKeyPrefix, attribute)
985 2 : } else {
986 2 : valueStoredWithKey = value
987 2 : valueStoredWithKeyLen = len(value)
988 2 : if addPrefixToValueStoredWithKey {
989 2 : valueStoredWithKeyLen++
990 2 : }
991 2 : prefix = makePrefixForInPlaceValue(setHasSameKeyPrefix)
992 : }
993 :
994 2 : if err := w.maybeFlush(key, valueStoredWithKeyLen); err != nil {
995 1 : return err
996 1 : }
997 :
998 2 : for i := range w.blockPropCollectors {
999 2 : v := value
1000 2 : if addPrefixToValueStoredWithKey {
1001 2 : // Values for SET are not required to be in-place, and in the future may
1002 2 : // not even be read by the compaction, so pass nil values. Block
1003 2 : // property collectors in such Pebble DBs must not look at the value.
1004 2 : v = nil
1005 2 : }
1006 2 : if err := w.blockPropCollectors[i].Add(key, v); err != nil {
1007 1 : w.err = err
1008 1 : return err
1009 1 : }
1010 : }
1011 2 : if w.tableFormat >= TableFormatPebblev4 {
1012 2 : w.obsoleteCollector.AddPoint(isObsolete)
1013 2 : }
1014 :
1015 2 : w.maybeAddToFilter(key.UserKey)
1016 2 : w.dataBlockBuf.dataBlock.addWithOptionalValuePrefix(
1017 2 : key, isObsolete, valueStoredWithKey, maxSharedKeyLen, addPrefixToValueStoredWithKey, prefix,
1018 2 : setHasSameKeyPrefix)
1019 2 :
1020 2 : w.meta.updateSeqNum(key.SeqNum())
1021 2 :
1022 2 : if !w.meta.HasPointKeys {
1023 2 : k := w.dataBlockBuf.dataBlock.getCurKey()
1024 2 : // NB: We need to ensure that SmallestPoint.UserKey is set, so we create
1025 2 : // an InternalKey which is semantically identical to the key, but won't
1026 2 : // have a nil UserKey. We do this, because key.UserKey could be nil, and
1027 2 : // we don't want SmallestPoint.UserKey to be nil.
1028 2 : //
1029 2 : // todo(bananabrick): Determine if it's okay to have a nil SmallestPoint
1030 2 : // .UserKey now that we don't rely on a nil UserKey to determine if the
1031 2 : // key has been set or not.
1032 2 : w.meta.SetSmallestPointKey(k.Clone())
1033 2 : }
1034 :
1035 2 : w.props.NumEntries++
1036 2 : switch key.Kind() {
1037 2 : case InternalKeyKindDelete, InternalKeyKindSingleDelete:
1038 2 : w.props.NumDeletions++
1039 2 : w.props.RawPointTombstoneKeySize += uint64(len(key.UserKey))
1040 2 : case InternalKeyKindDeleteSized:
1041 2 : var size uint64
1042 2 : if len(value) > 0 {
1043 2 : var n int
1044 2 : size, n = binary.Uvarint(value)
1045 2 : if n <= 0 {
1046 0 : w.err = errors.Newf("%s key's value (%x) does not parse as uvarint",
1047 0 : errors.Safe(key.Kind().String()), value)
1048 0 : return w.err
1049 0 : }
1050 : }
1051 2 : w.props.NumDeletions++
1052 2 : w.props.NumSizedDeletions++
1053 2 : w.props.RawPointTombstoneKeySize += uint64(len(key.UserKey))
1054 2 : w.props.RawPointTombstoneValueSize += size
1055 2 : case InternalKeyKindMerge:
1056 2 : w.props.NumMergeOperands++
1057 : }
1058 2 : w.props.RawKeySize += uint64(key.Size())
1059 2 : w.props.RawValueSize += uint64(len(value))
1060 2 : return nil
1061 : }
1062 :
1063 1 : func (w *Writer) prettyTombstone(k InternalKey, value []byte) fmt.Formatter {
1064 1 : return keyspan.Span{
1065 1 : Start: k.UserKey,
1066 1 : End: value,
1067 1 : Keys: []keyspan.Key{{Trailer: k.Trailer}},
1068 1 : }.Pretty(w.formatKey)
1069 1 : }
1070 :
1071 2 : func (w *Writer) addTombstone(key InternalKey, value []byte) error {
1072 2 : if !w.disableKeyOrderChecks && !w.rangeDelV1Format && w.rangeDelBlock.nEntries > 0 {
1073 2 : // Check that tombstones are being added in fragmented order. If the two
1074 2 : // tombstones overlap, their start and end keys must be identical.
1075 2 : prevKey := w.rangeDelBlock.getCurKey()
1076 2 : switch c := w.compare(prevKey.UserKey, key.UserKey); {
1077 0 : case c > 0:
1078 0 : w.err = errors.Errorf("pebble: keys must be added in order: %s, %s",
1079 0 : prevKey.Pretty(w.formatKey), key.Pretty(w.formatKey))
1080 0 : return w.err
1081 2 : case c == 0:
1082 2 : prevValue := w.rangeDelBlock.curValue
1083 2 : if w.compare(prevValue, value) != 0 {
1084 1 : w.err = errors.Errorf("pebble: overlapping tombstones must be fragmented: %s vs %s",
1085 1 : w.prettyTombstone(prevKey, prevValue),
1086 1 : w.prettyTombstone(key, value))
1087 1 : return w.err
1088 1 : }
1089 2 : if prevKey.SeqNum() <= key.SeqNum() {
1090 1 : w.err = errors.Errorf("pebble: keys must be added in strictly increasing order: %s, %s",
1091 1 : prevKey.Pretty(w.formatKey), key.Pretty(w.formatKey))
1092 1 : return w.err
1093 1 : }
1094 2 : default:
1095 2 : prevValue := w.rangeDelBlock.curValue
1096 2 : if w.compare(prevValue, key.UserKey) > 0 {
1097 1 : w.err = errors.Errorf("pebble: overlapping tombstones must be fragmented: %s vs %s",
1098 1 : w.prettyTombstone(prevKey, prevValue),
1099 1 : w.prettyTombstone(key, value))
1100 1 : return w.err
1101 1 : }
1102 : }
1103 : }
1104 :
1105 2 : if key.Trailer == InternalKeyRangeDeleteSentinel {
1106 0 : w.err = errors.Errorf("pebble: cannot add range delete sentinel: %s", key.Pretty(w.formatKey))
1107 0 : return w.err
1108 0 : }
1109 :
1110 2 : w.meta.updateSeqNum(key.SeqNum())
1111 2 :
1112 2 : switch {
1113 1 : case w.rangeDelV1Format:
1114 1 : // Range tombstones are not fragmented in the v1 (i.e. RocksDB) range
1115 1 : // deletion block format, so we need to track the largest range tombstone
1116 1 : // end key as every range tombstone is added.
1117 1 : //
1118 1 : // Note that writing the v1 format is only supported for tests.
1119 1 : if w.props.NumRangeDeletions == 0 {
1120 1 : w.meta.SetSmallestRangeDelKey(key.Clone())
1121 1 : w.meta.SetLargestRangeDelKey(base.MakeRangeDeleteSentinelKey(value).Clone())
1122 1 : } else {
1123 1 : if base.InternalCompare(w.compare, w.meta.SmallestRangeDel, key) > 0 {
1124 1 : w.meta.SetSmallestRangeDelKey(key.Clone())
1125 1 : }
1126 1 : end := base.MakeRangeDeleteSentinelKey(value)
1127 1 : if base.InternalCompare(w.compare, w.meta.LargestRangeDel, end) < 0 {
1128 1 : w.meta.SetLargestRangeDelKey(end.Clone())
1129 1 : }
1130 : }
1131 :
1132 2 : default:
1133 2 : // Range tombstones are fragmented in the v2 range deletion block format,
1134 2 : // so the start key of the first range tombstone added will be the smallest
1135 2 : // range tombstone key. The largest range tombstone key will be determined
1136 2 : // in Writer.Close() as the end key of the last range tombstone added.
1137 2 : if w.props.NumRangeDeletions == 0 {
1138 2 : w.meta.SetSmallestRangeDelKey(key.Clone())
1139 2 : }
1140 : }
1141 :
1142 2 : w.props.NumEntries++
1143 2 : w.props.NumDeletions++
1144 2 : w.props.NumRangeDeletions++
1145 2 : w.props.RawKeySize += uint64(key.Size())
1146 2 : w.props.RawValueSize += uint64(len(value))
1147 2 : w.rangeDelBlock.add(key, value)
1148 2 : return nil
1149 : }
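
// Hedged illustration of the fragmentation contract enforced above: two
// overlapping tombstones [a,c)#5 and [b,d)#3 must be pre-fragmented into
// aligned pieces and added by start key, then by descending seqnum.
func exampleAddFragmentedTombstones(w *Writer) error {
	frags := []struct {
		start, end string
		seq        uint64
	}{
		{"a", "b", 5},
		{"b", "c", 5}, // identical bounds as the next fragment...
		{"b", "c", 3}, // ...so only the seqnum decreases
		{"c", "d", 3},
	}
	for _, f := range frags {
		k := base.MakeInternalKey([]byte(f.start), f.seq, InternalKeyKindRangeDelete)
		if err := w.Add(k, []byte(f.end)); err != nil {
			return err
		}
	}
	return nil
}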
1150 :
1151 : // RangeKeySet sets a range between start (inclusive) and end (exclusive) with
1152 : // the given suffix to the given value. The resulting range key is given the
1153 : // sequence number zero, with the expectation that the resulting sstable will be
1154 : // ingested.
1155 : //
1156 : // Keys must be added to the table in increasing order of start key. Spans are
1157 : // not required to be fragmented. The same suffix may not be set or unset twice
1158 : // over the same keyspan, because it would result in inconsistent state. Both
1159 : // the Set and Unset would share the zero sequence number, and a key cannot be
1160 : // both simultaneously set and unset.
1161 1 : func (w *Writer) RangeKeySet(start, end, suffix, value []byte) error {
1162 1 : return w.addRangeKeySpan(keyspan.Span{
1163 1 : Start: w.tempRangeKeyCopy(start),
1164 1 : End: w.tempRangeKeyCopy(end),
1165 1 : Keys: []keyspan.Key{
1166 1 : {
1167 1 : Trailer: base.MakeTrailer(0, base.InternalKeyKindRangeKeySet),
1168 1 : Suffix: w.tempRangeKeyCopy(suffix),
1169 1 : Value: w.tempRangeKeyCopy(value),
1170 1 : },
1171 1 : },
1172 1 : })
1173 1 : }
1174 :
1175 : // RangeKeyUnset un-sets a range between start (inclusive) and end (exclusive)
1176 : // with the given suffix. The resulting range key is given the
1177 : // sequence number zero, with the expectation that the resulting sstable will be
1178 : // ingested.
1179 : //
1180 : // Keys must be added to the table in increasing order of start key. Spans are
1181 : // not required to be fragmented. The same suffix may not be set or unset twice
1182 : // over the same keyspan, because it would result in inconsistent state. Both
1183 : // the Set and Unset would share the zero sequence number, and a key cannot be
1184 : // both simultaneously set and unset.
1185 1 : func (w *Writer) RangeKeyUnset(start, end, suffix []byte) error {
1186 1 : return w.addRangeKeySpan(keyspan.Span{
1187 1 : Start: w.tempRangeKeyCopy(start),
1188 1 : End: w.tempRangeKeyCopy(end),
1189 1 : Keys: []keyspan.Key{
1190 1 : {
1191 1 : Trailer: base.MakeTrailer(0, base.InternalKeyKindRangeKeyUnset),
1192 1 : Suffix: w.tempRangeKeyCopy(suffix),
1193 1 : },
1194 1 : },
1195 1 : })
1196 1 : }
1197 :
1198 : // RangeKeyDelete deletes a range between start (inclusive) and end (exclusive).
1199 : //
1200 : // Keys must be added to the table in increasing order of start key. Spans are
1201 : // not required to be fragmented.
1202 1 : func (w *Writer) RangeKeyDelete(start, end []byte) error {
1203 1 : return w.addRangeKeySpan(keyspan.Span{
1204 1 : Start: w.tempRangeKeyCopy(start),
1205 1 : End: w.tempRangeKeyCopy(end),
1206 1 : Keys: []keyspan.Key{
1207 1 : {Trailer: base.MakeTrailer(0, base.InternalKeyKindRangeKeyDelete)},
1208 1 : },
1209 1 : })
1210 1 : }
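
// Hedged usage sketch for the RangeKey* methods above: spans may overlap when
// added (the fragmenter splits them later), but their start keys must be
// non-decreasing, and a given suffix may be set or unset at most once over a
// keyspan.
func exampleAddRangeKeys(w *Writer) error {
	if err := w.RangeKeySet([]byte("a"), []byte("m"), []byte("@5"), []byte("v")); err != nil {
		return err
	}
	// Overlaps [a,m) with a different suffix; start keys are still in order.
	if err := w.RangeKeyUnset([]byte("c"), []byte("f"), []byte("@3")); err != nil {
		return err
	}
	return w.RangeKeyDelete([]byte("p"), []byte("q"))
}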
1211 :
1212 : // AddRangeKey adds a range key set, unset, or delete key/value pair to the
1213 : // table being written.
1214 : //
1215 : // Range keys must be supplied in strictly ascending order of start key (i.e.
1216 : // user key ascending, sequence number descending, and key type descending).
1217 : // Ranges added must also be supplied in fragmented span order - i.e. other than
1218 : // spans that are perfectly aligned (same start and end keys), spans may not
1219 : // overlap. Range keys may be added out of order relative to point keys and
1220 : // range deletions.
1221 2 : func (w *Writer) AddRangeKey(key InternalKey, value []byte) error {
1222 2 : if w.err != nil {
1223 0 : return w.err
1224 0 : }
1225 2 : return w.addRangeKey(key, value)
1226 : }
1227 :
1228 1 : func (w *Writer) addRangeKeySpan(span keyspan.Span) error {
1229 1 : if w.compare(span.Start, span.End) >= 0 {
1230 0 : return errors.Errorf(
1231 0 : "pebble: start key must be strictly less than end key",
1232 0 : )
1233 0 : }
1234 1 : if w.fragmenter.Start() != nil && w.compare(w.fragmenter.Start(), span.Start) > 0 {
1235 1 : return errors.Errorf("pebble: spans must be added in order: %s > %s",
1236 1 : w.formatKey(w.fragmenter.Start()), w.formatKey(span.Start))
1237 1 : }
1238 : // Add this span to the fragmenter.
1239 1 : w.fragmenter.Add(span)
1240 1 : return w.err
1241 : }
1242 :
1243 1 : func (w *Writer) encodeRangeKeySpan(span keyspan.Span) {
1244 1 : // This method is the emit function of the Fragmenter.
1245 1 : //
1246 1 : // NB: The span should only contain range keys and be internally consistent
1247 1 : // (eg, no duplicate suffixes, no additional keys after a RANGEKEYDEL).
1248 1 : //
1249 1 : // We use w.rangeKeysBySuffix and w.rangeKeySpan to avoid allocations.
1250 1 :
1251 1 : // Sort the keys by suffix. Iteration doesn't *currently* depend on it, but
1252 1 : // we may want to in the future.
1253 1 : w.rangeKeysBySuffix.Cmp = w.compare
1254 1 : w.rangeKeysBySuffix.Keys = span.Keys
1255 1 : sort.Sort(&w.rangeKeysBySuffix)
1256 1 :
1257 1 : w.rangeKeySpan = span
1258 1 : w.rangeKeySpan.Keys = w.rangeKeysBySuffix.Keys
1259 1 : w.err = firstError(w.err, w.rangeKeyEncoder.Encode(&w.rangeKeySpan))
1260 1 : }
1261 :
1262 2 : func (w *Writer) addRangeKey(key InternalKey, value []byte) error {
1263 2 : if !w.disableKeyOrderChecks && w.rangeKeyBlock.nEntries > 0 {
1264 2 : prevStartKey := w.rangeKeyBlock.getCurKey()
1265 2 : prevEndKey, _, ok := rangekey.DecodeEndKey(prevStartKey.Kind(), w.rangeKeyBlock.curValue)
1266 2 : if !ok {
1267 0 : // We panic here as we should have previously decoded and validated this
1268 0 : // key and value when it was first added to the range key block.
1269 0 : panic(errors.Errorf("pebble: invalid end key for span: %s",
1270 0 : prevStartKey.Pretty(w.formatKey)))
1271 : }
1272 :
1273 2 : curStartKey := key
1274 2 : curEndKey, _, ok := rangekey.DecodeEndKey(curStartKey.Kind(), value)
1275 2 : if !ok {
1276 0 : w.err = errors.Errorf("pebble: invalid end key for span: %s",
1277 0 : curStartKey.Pretty(w.formatKey))
1278 0 : return w.err
1279 0 : }
1280 :
1281 : // Start keys must be strictly increasing.
1282 2 : if base.InternalCompare(w.compare, prevStartKey, curStartKey) >= 0 {
1283 1 : w.err = errors.Errorf(
1284 1 : "pebble: range keys starts must be added in increasing order: %s, %s",
1285 1 : prevStartKey.Pretty(w.formatKey), key.Pretty(w.formatKey))
1286 1 : return w.err
1287 1 : }
1288 :
1289 : // Start keys are increasing. If the start user keys are equal, the
1290 : // end keys must be equal (i.e. aligned spans).
1291 2 : if w.compare(prevStartKey.UserKey, curStartKey.UserKey) == 0 {
1292 2 : if w.compare(prevEndKey, curEndKey) != 0 {
1293 1 : w.err = errors.Errorf("pebble: overlapping range keys must be fragmented: %s, %s",
1294 1 : prevStartKey.Pretty(w.formatKey),
1295 1 : curStartKey.Pretty(w.formatKey))
1296 1 : return w.err
1297 1 : }
1298 2 : } else if w.compare(prevEndKey, curStartKey.UserKey) > 0 {
1299 1 : // If the start user keys are NOT equal, the spans must be disjoint (i.e.
1300 1 : // no overlap).
1301 1 : // NOTE: the inequality excludes zero, as we allow the end key of the
1302 1 : // lower span to be the same as the start key of the upper span, because
1303 1 : // the range end key is considered an exclusive bound.
1304 1 : w.err = errors.Errorf("pebble: overlapping range keys must be fragmented: %s, %s",
1305 1 : prevStartKey.Pretty(w.formatKey),
1306 1 : curStartKey.Pretty(w.formatKey))
1307 1 : return w.err
1308 1 : }
1309 : }
1310 :
1311 : // TODO(travers): Add an invariant-gated check to ensure that suffix-values
1312 : // are sorted within coalesced spans.
1313 :
1314 : // Range-keys and point-keys are intended to live in "parallel" keyspaces.
1315 : // However, we track a single seqnum in the table metadata that spans both of
1316 : // these keyspaces.
1317 : // TODO(travers): Consider tracking range key seqnums separately.
1318 2 : w.meta.updateSeqNum(key.SeqNum())
1319 2 :
1320 2 : // Range keys are fragmented, so the start key of the first range key
1321 2 : // added will be the smallest. The largest range key is determined in
1322 2 : // Writer.Close() as the end key of the last range key added to the block.
1323 2 : if w.props.NumRangeKeys() == 0 {
1324 2 : w.meta.SetSmallestRangeKey(key.Clone())
1325 2 : }
1326 :
1327 : // Update block properties.
1328 2 : w.props.RawRangeKeyKeySize += uint64(key.Size())
1329 2 : w.props.RawRangeKeyValueSize += uint64(len(value))
1330 2 : switch key.Kind() {
1331 2 : case base.InternalKeyKindRangeKeyDelete:
1332 2 : w.props.NumRangeKeyDels++
1333 2 : case base.InternalKeyKindRangeKeySet:
1334 2 : w.props.NumRangeKeySets++
1335 2 : case base.InternalKeyKindRangeKeyUnset:
1336 2 : w.props.NumRangeKeyUnsets++
1337 0 : default:
1338 0 : panic(errors.Errorf("pebble: invalid range key type: %s", key.Kind()))
1339 : }
1340 :
1341 2 : for i := range w.blockPropCollectors {
1342 2 : if err := w.blockPropCollectors[i].Add(key, value); err != nil {
1343 0 : return err
1344 0 : }
1345 : }
1346 :
1347 : // Add the key to the block.
1348 2 : w.rangeKeyBlock.add(key, value)
1349 2 : return nil
1350 : }
1351 :
1352 : // tempRangeKeyBuf returns a slice of length n from the Writer's rkBuf byte
1353 : // slice. Any byte written to the returned slice is retained for the lifetime of
1354 : // the Writer.
1355 1 : func (w *Writer) tempRangeKeyBuf(n int) []byte {
1356 1 : if cap(w.rkBuf)-len(w.rkBuf) < n {
1357 1 : size := len(w.rkBuf) + 2*n
1358 1 : if size < 2*cap(w.rkBuf) {
1359 1 : size = 2 * cap(w.rkBuf)
1360 1 : }
1361 1 : buf := make([]byte, len(w.rkBuf), size)
1362 1 : copy(buf, w.rkBuf)
1363 1 : w.rkBuf = buf
1364 : }
1365 1 : b := w.rkBuf[len(w.rkBuf) : len(w.rkBuf)+n]
1366 1 : w.rkBuf = w.rkBuf[:len(w.rkBuf)+n]
1367 1 : return b
1368 : }
1369 :
1370 : // tempRangeKeyCopy returns a copy of the provided slice, stored in the Writer's
1371 : // range key buffer.
1372 1 : func (w *Writer) tempRangeKeyCopy(k []byte) []byte {
1373 1 : if len(k) == 0 {
1374 1 : return nil
1375 1 : }
1376 1 : buf := w.tempRangeKeyBuf(len(k))
1377 1 : copy(buf, k)
1378 1 : return buf
1379 : }
1380 :
1381 2 : func (w *Writer) maybeAddToFilter(key []byte) {
1382 2 : if w.filter != nil {
1383 2 : prefix := key[:w.split(key)]
1384 2 : w.filter.addKey(prefix)
1385 2 : }
1386 : }
1387 :
1388 2 : func (w *Writer) flush(key InternalKey) error {
1389 2 : // We're finishing a data block.
1390 2 : err := w.finishDataBlockProps(w.dataBlockBuf)
1391 2 : if err != nil {
1392 1 : return err
1393 1 : }
1394 2 : w.dataBlockBuf.finish()
1395 2 : w.dataBlockBuf.compressAndChecksum(w.compression)
1396 2 : // Since dataBlockEstimates.addInflightDataBlock was never called, the
1397 2 : // inflightSize is set to 0.
1398 2 : w.coordination.sizeEstimate.dataBlockCompressed(len(w.dataBlockBuf.compressed), 0)
1399 2 :
1400 2 : // Determine if the index block should be flushed. Since we're accessing the
1401 2 : // dataBlockBuf.dataBlock.curKey here, we have to make sure that once we start
1402 2 : // to pool the dataBlockBufs, the curKey isn't used by the Writer once the
1403 2 : // dataBlockBuf is added back to a sync.Pool. In this particular case, the
1404 2 : // byte slice backing "sep" will eventually be copied when "sep" is
1405 2 : // added to the index block.
1406 2 : prevKey := w.dataBlockBuf.dataBlock.getCurKey()
1407 2 : sep := w.indexEntrySep(prevKey, key, w.dataBlockBuf)
1408 2 : // The decision to flush an index block is made on the Writer client
1409 2 : // goroutine, but the index block itself is finished on the writeQueue
1410 2 : // goroutine. When an index block is flushed we must call
1411 2 : // BlockPropertyCollector.FinishIndexBlock, and block property collector
1412 2 : // calls must happen sequentially on the Writer client goroutine. That is
1413 2 : // why the flush decision, too, is made here on the Writer client
1414 2 : // goroutine.
1415 2 : shouldFlushIndexBlock := supportsTwoLevelIndex(w.tableFormat) && w.indexBlock.shouldFlush(
1416 2 : sep, encodedBHPEstimatedSize, w.indexBlockOptions, w.allocatorSizeClasses,
1417 2 : )
1418 2 :
1419 2 : var indexProps []byte
1420 2 : var flushableIndexBlock *indexBlockBuf
1421 2 : if shouldFlushIndexBlock {
1422 2 : flushableIndexBlock = w.indexBlock
1423 2 : w.indexBlock = newIndexBlockBuf(w.coordination.parallelismEnabled)
1424 2 : // Call BlockPropertyCollector.FinishIndexBlock, since we've decided to
1425 2 : // flush the index block.
1426 2 : indexProps, err = w.finishIndexBlockProps()
1427 2 : if err != nil {
1428 1 : return err
1429 1 : }
1430 : }
1431 :
1432 : // We've called BlockPropertyCollector.FinishDataBlock, and, if necessary,
1433 : // BlockPropertyCollector.FinishIndexBlock. Since we've decided to finish
1434 : // the data block, we can call
1435 : // BlockPropertyCollector.AddPrevDataBlockToIndexBlock.
1436 2 : w.addPrevDataBlockToIndexBlockProps()
1437 2 :
1438 2 : // Schedule a write.
1439 2 : writeTask := writeTaskPool.Get().(*writeTask)
1440 2 : // We're setting compressionDone to indicate that compression of this block
1441 2 : // has already been completed.
1442 2 : writeTask.compressionDone <- true
1443 2 : writeTask.buf = w.dataBlockBuf
1444 2 : writeTask.indexEntrySep = sep
1445 2 : writeTask.currIndexBlock = w.indexBlock
1446 2 : writeTask.indexInflightSize = sep.Size() + encodedBHPEstimatedSize
1447 2 : writeTask.finishedIndexProps = indexProps
1448 2 : writeTask.flushableIndexBlock = flushableIndexBlock
1449 2 :
1450 2 : // The writeTask corresponds to an unwritten index entry.
1451 2 : w.indexBlock.addInflight(writeTask.indexInflightSize)
1452 2 :
1453 2 : w.dataBlockBuf = nil
1454 2 : if w.coordination.parallelismEnabled {
1455 2 : w.coordination.writeQueue.add(writeTask)
1456 2 : } else {
1457 2 : err = w.coordination.writeQueue.addSync(writeTask)
1458 2 : }
1459 2 : w.dataBlockBuf = newDataBlockBuf(w.restartInterval, w.checksumType)
1460 2 :
1461 2 : return err
1462 : }
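: 
: // To summarize the handoff above (a sketch; the exact interleaving depends
: // on whether parallelism is enabled): the Writer client goroutine makes all
: // flush decisions and block property collector calls, while the writeQueue
: // goroutine performs the file writes.
: //
: //	Writer client goroutine           writeQueue goroutine
: //	-----------------------           --------------------
: //	finishDataBlockProps
: //	compress + checksum data block
: //	compute index separator
: //	maybe FinishIndexBlock
: //	enqueue writeTask        ------>  write data block to file
: //	                                  add index entry for the block
: //	                                  maybe finish flushed index block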
1463 :
1464 2 : func (w *Writer) maybeFlush(key InternalKey, valueLen int) error {
1465 2 : if !w.dataBlockBuf.shouldFlush(key, valueLen, w.dataBlockOptions, w.allocatorSizeClasses) {
1466 2 : return nil
1467 2 : }
1468 :
1469 2 : err := w.flush(key)
1470 2 :
1471 2 : if err != nil {
1472 1 : w.err = err
1473 1 : return err
1474 1 : }
1475 :
1476 2 : return nil
1477 : }
1478 :
1479 : // dataBlockBuf.dataBlockProps set by this method must be encoded before any future use of the
1480 : // dataBlockBuf.blockPropsEncoder, since the properties slice will get reused by the
1481 : // blockPropsEncoder.
1482 2 : func (w *Writer) finishDataBlockProps(buf *dataBlockBuf) error {
1483 2 : if len(w.blockPropCollectors) == 0 {
1484 2 : return nil
1485 2 : }
1486 2 : var err error
1487 2 : buf.blockPropsEncoder.resetProps()
1488 2 : for i := range w.blockPropCollectors {
1489 2 : scratch := buf.blockPropsEncoder.getScratchForProp()
1490 2 : if scratch, err = w.blockPropCollectors[i].FinishDataBlock(scratch); err != nil {
1491 1 : return err
1492 1 : }
1493 2 : buf.blockPropsEncoder.addProp(shortID(i), scratch)
1494 : }
1495 :
1496 2 : buf.dataBlockProps = buf.blockPropsEncoder.unsafeProps()
1497 2 : return nil
1498 : }
1499 :
1500 : // The BlockHandleWithProperties returned by this method must be encoded before any future use of
1501 : // the Writer.blockPropsEncoder, since the properties slice will get reused by the blockPropsEncoder.
1502 : // maybeAddBlockPropertiesToBlockHandle should only be called if block is being written synchronously
1503 : // with the Writer client.
1504 : func (w *Writer) maybeAddBlockPropertiesToBlockHandle(
1505 : bh BlockHandle,
1506 2 : ) (BlockHandleWithProperties, error) {
1507 2 : err := w.finishDataBlockProps(w.dataBlockBuf)
1508 2 : if err != nil {
1509 0 : return BlockHandleWithProperties{}, err
1510 0 : }
1511 2 : return BlockHandleWithProperties{BlockHandle: bh, Props: w.dataBlockBuf.dataBlockProps}, nil
1512 : }
1513 :
1514 2 : func (w *Writer) indexEntrySep(prevKey, key InternalKey, dataBlockBuf *dataBlockBuf) InternalKey {
1515 2 : // Make a rough guess that we want key-sized scratch to compute the separator.
1516 2 : if cap(dataBlockBuf.sepScratch) < key.Size() {
1517 2 : dataBlockBuf.sepScratch = make([]byte, 0, key.Size()*2)
1518 2 : }
1519 :
1520 2 : var sep InternalKey
1521 2 : if key.UserKey == nil && key.Trailer == 0 {
1522 2 : sep = prevKey.Successor(w.compare, w.successor, dataBlockBuf.sepScratch[:0])
1523 2 : } else {
1524 2 : sep = prevKey.Separator(w.compare, w.separator, dataBlockBuf.sepScratch[:0], key)
1525 2 : }
1526 2 : return sep
1527 : }
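: 
: // For example, with the default bytewise comparer, Separator("black",
: // "blue") can return the shorter key "blb": any k with prevKey <= k < key
: // is a valid index separator. The Successor branch handles the final data
: // block, where there is no successive key (key is zero-valued) and the
: // separator only needs to be >= prevKey.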
1528 :
1529 : // addIndexEntry adds an index entry for the specified key and block handle.
1530 : // addIndexEntry can be called from both the Writer client goroutine, and the
1531 : // writeQueue goroutine. If flushIndexBuf != nil, then indexProps must also
1532 : // be provided; they're used when the index block is finished.
1533 : //
1534 : // Invariant:
1535 : // 1. addIndexEntry must not store references to the sep InternalKey, the tmp
1536 : // byte slice, bhp.Props. That is, these must be either deep copied or
1537 : // encoded.
1538 : // 2. addIndexEntry must not hold references to the flushIndexBuf or the
1539 : // writeTo indexBlockBufs.
1540 : func (w *Writer) addIndexEntry(
1541 : sep InternalKey,
1542 : bhp BlockHandleWithProperties,
1543 : tmp []byte,
1544 : flushIndexBuf *indexBlockBuf,
1545 : writeTo *indexBlockBuf,
1546 : inflightSize int,
1547 : indexProps []byte,
1548 2 : ) error {
1549 2 : if bhp.Length == 0 {
1550 0 : // A valid blockHandle must be non-zero.
1551 0 : // In particular, it must have a non-zero length.
1552 0 : return nil
1553 0 : }
1554 :
1555 2 : encoded := encodeBlockHandleWithProperties(tmp, bhp)
1556 2 :
1557 2 : if flushIndexBuf != nil {
1558 2 : if cap(w.indexPartitions) == 0 {
1559 2 : w.indexPartitions = make([]indexBlockAndBlockProperties, 0, 32)
1560 2 : }
1561 : // Enable two level indexes if there is more than one index block.
1562 2 : w.twoLevelIndex = true
1563 2 : if err := w.finishIndexBlock(flushIndexBuf, indexProps); err != nil {
1564 0 : return err
1565 0 : }
1566 : }
1567 :
1568 2 : writeTo.add(sep, encoded, inflightSize)
1569 2 : return nil
1570 : }
1571 :
1572 2 : func (w *Writer) addPrevDataBlockToIndexBlockProps() {
1573 2 : for i := range w.blockPropCollectors {
1574 2 : w.blockPropCollectors[i].AddPrevDataBlockToIndexBlock()
1575 2 : }
1576 : }
1577 :
1578 : // addIndexEntrySync adds an index entry for the specified key and block handle.
1579 : // Writer.addIndexEntry is only called synchronously once Writer.Close is called.
1580 : // addIndexEntrySync should only be called if we're sure that index entries
1581 : // aren't being written asynchronously.
1582 : //
1583 : // Invariant:
1584 : // 1. addIndexEntrySync must not store references to the prevKey, key InternalKey's,
1585 : // the tmp byte slice. That is, these must be either deep copied or encoded.
1586 : //
1587 : // TODO: Improve coverage of this method. e.g. tests passed without the line
1588 : // `w.twoLevelIndex = true` previously.
1589 : func (w *Writer) addIndexEntrySync(
1590 : prevKey, key InternalKey, bhp BlockHandleWithProperties, tmp []byte,
1591 2 : ) error {
1592 2 : return w.addIndexEntrySep(w.indexEntrySep(prevKey, key, w.dataBlockBuf), bhp, tmp)
1593 2 : }
1594 :
1595 : func (w *Writer) addIndexEntrySep(
1596 : sep InternalKey, bhp BlockHandleWithProperties, tmp []byte,
1597 2 : ) error {
1598 2 : shouldFlush := supportsTwoLevelIndex(w.tableFormat) &&
1599 2 : w.indexBlock.shouldFlush(
1600 2 : sep, encodedBHPEstimatedSize, w.indexBlockOptions, w.allocatorSizeClasses,
1601 2 : )
1602 2 : var flushableIndexBlock *indexBlockBuf
1603 2 : var props []byte
1604 2 : var err error
1605 2 : if shouldFlush {
1606 2 : flushableIndexBlock = w.indexBlock
1607 2 : w.indexBlock = newIndexBlockBuf(w.coordination.parallelismEnabled)
1608 2 : w.twoLevelIndex = true
1609 2 : // Call BlockPropertyCollector.FinishIndexBlock, since we've decided to
1610 2 : // flush the index block.
1611 2 : props, err = w.finishIndexBlockProps()
1612 2 : if err != nil {
1613 0 : return err
1614 0 : }
1615 : }
1616 :
1617 2 : err = w.addIndexEntry(sep, bhp, tmp, flushableIndexBlock, w.indexBlock, 0, props)
1618 2 : if flushableIndexBlock != nil {
1619 2 : flushableIndexBlock.clear()
1620 2 : indexBlockBufPool.Put(flushableIndexBlock)
1621 2 : }
1622 2 : w.addPrevDataBlockToIndexBlockProps()
1623 2 : return err
1624 : }
1625 :
1626 : func shouldFlushWithHints(
1627 : keyLen, valueLen int,
1628 : restartInterval, estimatedBlockSize, numEntries int,
1629 : flushOptions flushDecisionOptions,
1630 : sizeClassHints []int,
1631 2 : ) bool {
1632 2 : if numEntries == 0 {
1633 2 : return false
1634 2 : }
1635 :
1636 : // If we are not informed about the memory allocator's size classes we fall
1637 : // back to a simple set of flush heuristics that are unaware of internal
1638 : // fragmentation in block cache allocations.
1639 2 : if len(sizeClassHints) == 0 {
1640 2 : return shouldFlushWithoutHints(
1641 2 : keyLen, valueLen, restartInterval, estimatedBlockSize, numEntries, flushOptions)
1642 2 : }
1643 :
1644 : // For size-class aware flushing we need to account for the metadata that is
1645 : // allocated when this block is loaded into the block cache. For instance, if
1646 : // a block has size 1020B it may fit within a 1024B class. However, when
1647 : // loaded into the block cache we also allocate space for the cache entry
1648 : // metadata. The new allocation of size ~1052B may now only fit within a
1649 : // 2048B class, which increases internal fragmentation.
1650 1 : blockSizeWithMetadata := estimatedBlockSize + cache.ValueMetadataSize
1651 1 :
1652 1 : // For the fast path we can avoid computing the exact varint encoded
1653 1 : // key-value pair size. Instead, we combine the key-value pair size with an
1654 1 : // upper-bound estimate of the associated metadata (4B restart point, 4B
1655 1 : // shared prefix length, 5B varint unshared key size, 5B varint value size).
1656 1 : newEstimatedSize := blockSizeWithMetadata + keyLen + valueLen + 18
1657 1 : // Our new block size estimate disregards key prefix compression. This puts
1658 1 : // us at risk of overestimating the size and flushing small blocks. We
1659 1 : // mitigate this by imposing a minimum size restriction.
1660 1 : if blockSizeWithMetadata <= flushOptions.sizeClassAwareThreshold || newEstimatedSize <= flushOptions.blockSize {
1661 1 : return false
1662 1 : }
1663 :
1664 1 : sizeClass, ok := blockSizeClass(blockSizeWithMetadata, sizeClassHints)
1665 1 : // If the block size could not be mapped to a size class we fall back to
1666 1 : // using a simpler set of flush heuristics.
1667 1 : if !ok {
1668 1 : return shouldFlushWithoutHints(
1669 1 : keyLen, valueLen, restartInterval, estimatedBlockSize, numEntries, flushOptions)
1670 1 : }
1671 :
1672 : // Tighter upper-bound estimate of the metadata stored with the next
1673 : // key-value pair.
1674 1 : newSize := blockSizeWithMetadata + keyLen + valueLen
1675 1 : if numEntries%restartInterval == 0 {
1676 0 : newSize += 4
1677 0 : }
1678 1 : newSize += 4 // varint for shared prefix length
1679 1 : newSize += uvarintLen(uint32(keyLen)) // varint for unshared key bytes
1680 1 : newSize += uvarintLen(uint32(valueLen)) // varint for value size
1681 1 :
1682 1 : if blockSizeWithMetadata < flushOptions.blockSize {
1683 1 : newSizeClass, ok := blockSizeClass(newSize, sizeClassHints)
1684 1 : if ok && newSizeClass-newSize >= sizeClass-blockSizeWithMetadata {
1685 1 : // Although the block hasn't reached the target size, waiting to insert the
1686 1 : // next entry would exceed the target and increase memory fragmentation.
1687 1 : return true
1688 1 : }
1689 1 : return false
1690 : }
1691 :
1692 : // Flush if inserting the next entry bumps the block size to the memory
1693 : // allocator's next size class.
1694 1 : return newSize > sizeClass
1695 : }
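: 
: // Worked example (illustrative numbers): take sizeClassHints = {1024, 2048},
: // blockSize = 1024 and blockSizeWithMetadata = 1000, which maps to the 1024B
: // class with 24B of slack. If the next key-value pair adds ~100B, newSize
: // lands in the 2048B class and the post-insert waste (2048 - ~1110) exceeds
: // the current 24B, so we flush now rather than grow internal fragmentation.
: // If it adds only ~10B, the block still fits the 1024B class and we keep
: // filling it.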
1696 :
1697 : func shouldFlushWithoutHints(
1698 : keyLen, valueLen int,
1699 : restartInterval, estimatedBlockSize, numEntries int,
1700 : flushOptions flushDecisionOptions,
1701 2 : ) bool {
1702 2 : if estimatedBlockSize >= flushOptions.blockSize {
1703 2 : return true
1704 2 : }
1705 :
1706 : // The block is currently smaller than the target size.
1707 2 : if estimatedBlockSize <= flushOptions.blockSizeThreshold {
1708 2 : // The block is smaller than the threshold size at which we'll consider
1709 2 : // flushing it.
1710 2 : return false
1711 2 : }
1712 :
1713 2 : newSize := estimatedBlockSize + keyLen + valueLen
1714 2 : if numEntries%restartInterval == 0 {
1715 2 : newSize += 4
1716 2 : }
1717 2 : newSize += 4 // varint for shared prefix length
1718 2 : newSize += uvarintLen(uint32(keyLen)) // varint for unshared key bytes
1719 2 : newSize += uvarintLen(uint32(valueLen)) // varint for value size
1720 2 : // Flush if the block plus the new entry is larger than the target size.
1721 2 : return newSize > flushOptions.blockSize
1722 : }
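: 
: // For example, with blockSize = 4096 and a 90% blockSizeThreshold (3687B):
: // a 3000B block never flushes here, a 4100B block always does, and a 3800B
: // block flushes only if appending the next key-value pair (plus its restart
: // point and varint overhead) would push it past 4096B.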
1723 :
1724 : // blockSizeClass returns the smallest memory allocator size class that could
1725 : // hold a block of a given size and returns a boolean indicating whether an
1726 : // appropriate size class was found. It is useful for computing the potential
1727 : // space wasted by an allocation.
1728 1 : func blockSizeClass(blockSize int, sizeClassHints []int) (int, bool) {
1729 1 : sizeClassIdx, _ := slices.BinarySearch(sizeClassHints, blockSize)
1730 1 : if sizeClassIdx == len(sizeClassHints) {
1731 1 : return -1, false
1732 1 : }
1733 1 : return sizeClassHints[sizeClassIdx], true
1734 : }
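: 
: // For example, blockSizeClass(1500, []int{1024, 2048, 4096}) returns
: // (2048, true); an exact fit such as blockSizeClass(2048, ...) returns
: // (2048, true); and blockSizeClass(5000, ...) returns (-1, false), since no
: // hinted class can hold the block.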
1735 :
1736 2 : func cloneKeyWithBuf(k InternalKey, a bytealloc.A) (bytealloc.A, InternalKey) {
1737 2 : if len(k.UserKey) == 0 {
1738 0 : return a, k
1739 0 : }
1740 2 : a, keyCopy := a.Copy(k.UserKey)
1741 2 : return a, InternalKey{UserKey: keyCopy, Trailer: k.Trailer}
1742 : }
1743 :
1744 : // Invariants: The byte slice returned by finishIndexBlockProps is
1745 : // heap-allocated and has its own lifetime, independent of the Writer and
1746 : // the blockPropsEncoder, so it is safe to:
1747 : //
1748 : // 1. Reuse w.blockPropsEncoder without first encoding the byte slice
1749 : // returned.
1750 : // 2. Store the byte slice in the Writer since it is a copy and not backed
1751 : // by an underlying buffer.
1752 2 : func (w *Writer) finishIndexBlockProps() ([]byte, error) {
1753 2 : w.blockPropsEncoder.resetProps()
1754 2 : for i := range w.blockPropCollectors {
1755 2 : scratch := w.blockPropsEncoder.getScratchForProp()
1756 2 : var err error
1757 2 : if scratch, err = w.blockPropCollectors[i].FinishIndexBlock(scratch); err != nil {
1758 1 : return nil, err
1759 1 : }
1760 2 : w.blockPropsEncoder.addProp(shortID(i), scratch)
1761 : }
1762 2 : return w.blockPropsEncoder.props(), nil
1763 : }
1764 :
1765 : // finishIndexBlock finishes the current index block and adds it to the top
1766 : // level index block. This is only used when two level indexes are enabled.
1767 : //
1768 : // Invariants:
1769 : // 1. The props slice passed into finishIndexBlock must not be
1770 : // owned by any other struct, since it will be stored in the Writer.indexPartitions
1771 : // slice.
1772 : // 2. None of the buffers owned by indexBuf will be shallow copied and stored elsewhere.
1773 : // That is, it must be safe to reuse indexBuf after finishIndexBlock has been called.
1774 2 : func (w *Writer) finishIndexBlock(indexBuf *indexBlockBuf, props []byte) error {
1775 2 : part := indexBlockAndBlockProperties{
1776 2 : nEntries: indexBuf.block.nEntries, properties: props,
1777 2 : }
1778 2 : w.indexSepAlloc, part.sep = cloneKeyWithBuf(
1779 2 : indexBuf.block.getCurKey(), w.indexSepAlloc,
1780 2 : )
1781 2 : bk := indexBuf.finish()
1782 2 : if len(w.indexBlockAlloc) < len(bk) {
1783 2 : // Allocate enough bytes for approximately 16 index blocks.
1784 2 : w.indexBlockAlloc = make([]byte, len(bk)*16)
1785 2 : }
1786 2 : n := copy(w.indexBlockAlloc, bk)
1787 2 : part.block = w.indexBlockAlloc[:n:n]
1788 2 : w.indexBlockAlloc = w.indexBlockAlloc[n:]
1789 2 : w.indexPartitions = append(w.indexPartitions, part)
1790 2 : return nil
1791 : }
1792 :
1793 2 : func (w *Writer) writeTwoLevelIndex() (BlockHandle, error) {
1794 2 : props, err := w.finishIndexBlockProps()
1795 2 : if err != nil {
1796 0 : return BlockHandle{}, err
1797 0 : }
1798 : // Add the final unfinished index.
1799 2 : if err = w.finishIndexBlock(w.indexBlock, props); err != nil {
1800 0 : return BlockHandle{}, err
1801 0 : }
1802 :
1803 2 : for i := range w.indexPartitions {
1804 2 : b := &w.indexPartitions[i]
1805 2 : w.props.NumDataBlocks += uint64(b.nEntries)
1806 2 :
1807 2 : data := b.block
1808 2 : w.props.IndexSize += uint64(len(data))
1809 2 : bh, err := w.writeBlock(data, w.compression, &w.blockBuf)
1810 2 : if err != nil {
1811 0 : return BlockHandle{}, err
1812 0 : }
1813 2 : bhp := BlockHandleWithProperties{
1814 2 : BlockHandle: bh,
1815 2 : Props: b.properties,
1816 2 : }
1817 2 : encoded := encodeBlockHandleWithProperties(w.blockBuf.tmp[:], bhp)
1818 2 : w.topLevelIndexBlock.add(b.sep, encoded)
1819 : }
1820 :
1821 : // NB: RocksDB includes the block trailer length in the index size
1822 : // property, though it doesn't include the trailer in the top level
1823 : // index size property.
1824 2 : w.props.IndexPartitions = uint64(len(w.indexPartitions))
1825 2 : w.props.TopLevelIndexSize = uint64(w.topLevelIndexBlock.estimatedSize())
1826 2 : w.props.IndexSize += w.props.TopLevelIndexSize + blockTrailerLen
1827 2 :
1828 2 : return w.writeBlock(w.topLevelIndexBlock.finish(), w.compression, &w.blockBuf)
1829 : }
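: 
: // The resulting layout is a two-level index: the top-level index block holds
: // one entry per partition (the partition's separator key and block handle),
: // and each partition holds the entries for a contiguous run of data blocks.
: //
: //	top-level index --> index partition 0 --> data blocks 0..i
: //	                --> index partition 1 --> data blocks i+1..j
: //	                --> ...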
1830 :
1831 2 : func compressAndChecksum(b []byte, compression Compression, blockBuf *blockBuf) []byte {
1832 2 : // Compress the buffer, discarding the result if the improvement isn't at
1833 2 : // least 12.5%.
1834 2 : blockType, compressed := compressBlock(compression, b, blockBuf.compressedBuf)
1835 2 : if blockType != noCompressionBlockType && cap(compressed) > cap(blockBuf.compressedBuf) {
1836 2 : blockBuf.compressedBuf = compressed[:cap(compressed)]
1837 2 : }
1838 2 : if len(compressed) < len(b)-len(b)/8 {
1839 2 : b = compressed
1840 2 : } else {
1841 2 : blockType = noCompressionBlockType
1842 2 : }
1843 :
1844 2 : blockBuf.tmp[0] = byte(blockType)
1845 2 :
1846 2 : // Calculate the checksum.
1847 2 : checksum := blockBuf.checksummer.checksum(b, blockBuf.tmp[:1])
1848 2 : binary.LittleEndian.PutUint32(blockBuf.tmp[1:5], checksum)
1849 2 : return b
1850 : }
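: 
: // Concretely, len(b)-len(b)/8 encodes the 12.5% rule above: a 4096B block is
: // stored compressed only if its compressed form is smaller than 3584B.
: // Otherwise the uncompressed bytes are written under noCompressionBlockType,
: // avoiding decompression cost at read time for a negligible space saving.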
1851 :
1852 2 : func (w *Writer) writeCompressedBlock(block []byte, blockTrailerBuf []byte) (BlockHandle, error) {
1853 2 : bh := BlockHandle{Offset: w.meta.Size, Length: uint64(len(block))}
1854 2 :
1855 2 : if w.cacheID != 0 && w.fileNum != 0 {
1856 2 : // Remove the block being written from the cache. This provides defense in
1857 2 : // depth against bugs which cause cache collisions.
1858 2 : //
1859 2 : // TODO(peter): Alternatively, we could add the uncompressed value to the
1860 2 : // cache.
1861 2 : w.cache.Delete(w.cacheID, w.fileNum, bh.Offset)
1862 2 : }
1863 :
1864 : // Write the bytes to the file.
1865 2 : if err := w.writable.Write(block); err != nil {
1866 0 : return BlockHandle{}, err
1867 0 : }
1868 2 : w.meta.Size += uint64(len(block))
1869 2 : if err := w.writable.Write(blockTrailerBuf[:blockTrailerLen]); err != nil {
1870 0 : return BlockHandle{}, err
1871 0 : }
1872 2 : w.meta.Size += blockTrailerLen
1873 2 :
1874 2 : return bh, nil
1875 : }
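: 
: // Each block is written as its (possibly compressed) payload followed by the
: // 5-byte trailer assembled in blockBuf.tmp by compressAndChecksum:
: //
: //	+---------------------+---------------+--------------+
: //	| payload             | type byte (1) | checksum (4) |
: //	+---------------------+---------------+--------------+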
1876 :
1877 : // Write implements io.Writer. This is analogous to writeCompressedBlock for
1878 : // blocks that already incorporate the trailer, and don't need the callee to
1879 : // return a BlockHandle.
1880 2 : func (w *Writer) Write(blockWithTrailer []byte) (n int, err error) {
1881 2 : offset := w.meta.Size
1882 2 : if w.cacheID != 0 && w.fileNum != 0 {
1883 2 : // Remove the block being written from the cache. This provides defense in
1884 2 : // depth against bugs which cause cache collisions.
1885 2 : //
1886 2 : // TODO(peter): Alternatively, we could add the uncompressed value to the
1887 2 : // cache.
1888 2 : w.cache.Delete(w.cacheID, w.fileNum, offset)
1889 2 : }
1890 2 : w.meta.Size += uint64(len(blockWithTrailer))
1891 2 : if err := w.writable.Write(blockWithTrailer); err != nil {
1892 0 : return 0, err
1893 0 : }
1894 2 : return len(blockWithTrailer), nil
1895 : }
1896 :
1897 : func (w *Writer) writeBlock(
1898 : b []byte, compression Compression, blockBuf *blockBuf,
1899 2 : ) (BlockHandle, error) {
1900 2 : b = compressAndChecksum(b, compression, blockBuf)
1901 2 : return w.writeCompressedBlock(b, blockBuf.tmp[:])
1902 2 : }
1903 :
1904 : // assertFormatCompatibility ensures that the features present on the table are
1905 : // compatible with the table format version.
1906 2 : func (w *Writer) assertFormatCompatibility() error {
1907 2 : // PebbleDBv1: block properties.
1908 2 : if len(w.blockPropCollectors) > 0 && w.tableFormat < TableFormatPebblev1 {
1909 1 : return errors.Newf(
1910 1 : "table format version %s is less than the minimum required version %s for block properties",
1911 1 : w.tableFormat, TableFormatPebblev1,
1912 1 : )
1913 1 : }
1914 :
1915 : // PebbleDBv2: range keys.
1916 2 : if w.props.NumRangeKeys() > 0 && w.tableFormat < TableFormatPebblev2 {
1917 1 : return errors.Newf(
1918 1 : "table format version %s is less than the minimum required version %s for range keys",
1919 1 : w.tableFormat, TableFormatPebblev2,
1920 1 : )
1921 1 : }
1922 :
1923 : // PebbleDBv3: value blocks.
1924 2 : if (w.props.NumValueBlocks > 0 || w.props.NumValuesInValueBlocks > 0 ||
1925 2 : w.props.ValueBlocksSize > 0) && w.tableFormat < TableFormatPebblev3 {
1926 0 : return errors.Newf(
1927 0 : "table format version %s is less than the minimum required version %s for value blocks",
1928 0 : w.tableFormat, TableFormatPebblev3)
1929 0 : }
1930 :
1931 : // PebbleDBv4: DELSIZED tombstones.
1932 2 : if w.props.NumSizedDeletions > 0 && w.tableFormat < TableFormatPebblev4 {
1933 0 : return errors.Newf(
1934 0 : "table format version %s is less than the minimum required version %s for sized deletion tombstones",
1935 0 : w.tableFormat, TableFormatPebblev4)
1936 0 : }
1937 2 : return nil
1938 : }
1939 :
1940 : // Close finishes writing the table and closes the underlying file that the
1941 : // table was written to.
1942 2 : func (w *Writer) Close() (err error) {
1943 2 : defer func() {
1944 2 : if w.valueBlockWriter != nil {
1945 2 : releaseValueBlockWriter(w.valueBlockWriter)
1946 2 : // Defensive code in case Close gets called again. We don't want to put
1947 2 : // the same object to a sync.Pool.
1948 2 : w.valueBlockWriter = nil
1949 2 : }
1950 2 : if w.writable != nil {
1951 1 : w.writable.Abort()
1952 1 : w.writable = nil
1953 1 : }
1954 : // Record any error in the writer (so we can exit early if Close is called
1955 : // again).
1956 2 : if err != nil {
1957 1 : w.err = err
1958 1 : }
1959 : }()
1960 :
1961 : // finish must be called before we check for an error, because finish will
1962 : // block until every single task added to the writeQueue has been processed,
1963 : // and an error could be encountered while any of those tasks are processed.
1964 2 : if err := w.coordination.writeQueue.finish(); err != nil {
1965 1 : return err
1966 1 : }
1967 :
1968 2 : if w.err != nil {
1969 1 : return w.err
1970 1 : }
1971 :
1972 : // The w.meta.LargestPointKey is only used once the Writer is closed, so it is safe to set it
1973 : // when the Writer is closed.
1974 : //
1975 : // The following invariants ensure that setting the largest key at this point of a Writer close
1976 : // is correct:
1977 : // 1. Keys must only be added to the Writer in an increasing order.
1978 : // 2. The current w.dataBlockBuf is guaranteed to have the latest key added to the Writer. This
1979 : // must be true, because a w.dataBlockBuf is only switched out when a dataBlock is flushed,
1980 : // however, if a dataBlock is flushed, then we add a key to the new w.dataBlockBuf in the
1981 : // addPoint function after the flush occurs.
1982 2 : if w.dataBlockBuf.dataBlock.nEntries >= 1 {
1983 2 : w.meta.SetLargestPointKey(w.dataBlockBuf.dataBlock.getCurKey().Clone())
1984 2 : }
1985 :
1986 : // Finish the last data block, or force an empty data block if there
1987 : // aren't any data blocks at all.
1988 2 : if w.dataBlockBuf.dataBlock.nEntries > 0 || w.indexBlock.block.nEntries == 0 {
1989 2 : bh, err := w.writeBlock(w.dataBlockBuf.dataBlock.finish(), w.compression, &w.dataBlockBuf.blockBuf)
1990 2 : if err != nil {
1991 0 : return err
1992 0 : }
1993 2 : bhp, err := w.maybeAddBlockPropertiesToBlockHandle(bh)
1994 2 : if err != nil {
1995 0 : return err
1996 0 : }
1997 2 : prevKey := w.dataBlockBuf.dataBlock.getCurKey()
1998 2 : if err := w.addIndexEntrySync(prevKey, InternalKey{}, bhp, w.dataBlockBuf.tmp[:]); err != nil {
1999 0 : return err
2000 0 : }
2001 : }
2002 2 : w.props.DataSize = w.meta.Size
2003 2 :
2004 2 : // Write the filter block.
2005 2 : var metaindex rawBlockWriter
2006 2 : metaindex.restartInterval = 1
2007 2 : if w.filter != nil {
2008 2 : b, err := w.filter.finish()
2009 2 : if err != nil {
2010 0 : return err
2011 0 : }
2012 2 : bh, err := w.writeBlock(b, NoCompression, &w.blockBuf)
2013 2 : if err != nil {
2014 0 : return err
2015 0 : }
2016 2 : n := encodeBlockHandle(w.blockBuf.tmp[:], bh)
2017 2 : metaindex.add(InternalKey{UserKey: []byte(w.filter.metaName())}, w.blockBuf.tmp[:n])
2018 2 : w.props.FilterPolicyName = w.filter.policyName()
2019 2 : w.props.FilterSize = bh.Length
2020 : }
2021 :
2022 2 : var indexBH BlockHandle
2023 2 : if w.twoLevelIndex {
2024 2 : w.props.IndexType = twoLevelIndex
2025 2 : // Write the two level index block.
2026 2 : indexBH, err = w.writeTwoLevelIndex()
2027 2 : if err != nil {
2028 0 : return err
2029 0 : }
2030 2 : } else {
2031 2 : w.props.IndexType = binarySearchIndex
2032 2 : // NB: RocksDB includes the block trailer length in the index size
2033 2 : // property, though it doesn't include the trailer in the filter size
2034 2 : // property.
2035 2 : w.props.IndexSize = uint64(w.indexBlock.estimatedSize()) + blockTrailerLen
2036 2 : w.props.NumDataBlocks = uint64(w.indexBlock.block.nEntries)
2037 2 :
2038 2 : // Write the single level index block.
2039 2 : indexBH, err = w.writeBlock(w.indexBlock.finish(), w.compression, &w.blockBuf)
2040 2 : if err != nil {
2041 0 : return err
2042 0 : }
2043 : }
2044 :
2045 : // Write the range-del block. The block handle must be added to the meta index block
2046 : // after the properties block has been written. This is because the entries in the
2047 : // metaindex block must be sorted by key.
2048 2 : var rangeDelBH BlockHandle
2049 2 : if w.props.NumRangeDeletions > 0 {
2050 2 : if !w.rangeDelV1Format {
2051 2 : // Because the range tombstones are fragmented in the v2 format, the end
2052 2 : // key of the last added range tombstone will be the largest range
2053 2 : // tombstone key. Note that we need to make this into a range deletion
2054 2 : // sentinel because sstable boundaries are inclusive while the end key of
2055 2 : // a range deletion tombstone is exclusive. A Clone() is necessary as
2056 2 : // rangeDelBlock.curValue is the same slice that will get passed
2057 2 : // into w.writer, and some implementations of vfs.File mutate the
2058 2 : // slice passed into Write(). Also, w.meta will often outlive the
2059 2 : // blockWriter, and so cloning curValue allows the rangeDelBlock's
2060 2 : // internal buffer to get gc'd.
2061 2 : k := base.MakeRangeDeleteSentinelKey(w.rangeDelBlock.curValue).Clone()
2062 2 : w.meta.SetLargestRangeDelKey(k)
2063 2 : }
2064 2 : rangeDelBH, err = w.writeBlock(w.rangeDelBlock.finish(), NoCompression, &w.blockBuf)
2065 2 : if err != nil {
2066 0 : return err
2067 0 : }
2068 : }
2069 :
2070 : // Write the range-key block, flushing any remaining spans from the
2071 : // fragmenter first.
2072 2 : w.fragmenter.Finish()
2073 2 :
2074 2 : var rangeKeyBH BlockHandle
2075 2 : if w.props.NumRangeKeys() > 0 {
2076 2 : key := w.rangeKeyBlock.getCurKey()
2077 2 : kind := key.Kind()
2078 2 : endKey, _, ok := rangekey.DecodeEndKey(kind, w.rangeKeyBlock.curValue)
2079 2 : if !ok {
2080 0 : return errors.Newf("invalid end key: %s", w.rangeKeyBlock.curValue)
2081 0 : }
2082 2 : k := base.MakeExclusiveSentinelKey(kind, endKey).Clone()
2083 2 : w.meta.SetLargestRangeKey(k)
2084 2 : // TODO(travers): The lack of compression on the range key block matches the
2085 2 : // lack of compression on the range-del block. Revisit whether we want to
2086 2 : // enable compression on this block.
2087 2 : rangeKeyBH, err = w.writeBlock(w.rangeKeyBlock.finish(), NoCompression, &w.blockBuf)
2088 2 : if err != nil {
2089 0 : return err
2090 0 : }
2091 : }
2092 :
2093 2 : if w.valueBlockWriter != nil {
2094 2 : vbiHandle, vbStats, err := w.valueBlockWriter.finish(w, w.meta.Size)
2095 2 : if err != nil {
2096 0 : return err
2097 0 : }
2098 2 : w.props.NumValueBlocks = vbStats.numValueBlocks
2099 2 : w.props.NumValuesInValueBlocks = vbStats.numValuesInValueBlocks
2100 2 : w.props.ValueBlocksSize = vbStats.valueBlocksAndIndexSize
2101 2 : if vbStats.numValueBlocks > 0 {
2102 2 : n := encodeValueBlocksIndexHandle(w.blockBuf.tmp[:], vbiHandle)
2103 2 : metaindex.add(InternalKey{UserKey: []byte(metaValueIndexName)}, w.blockBuf.tmp[:n])
2104 2 : }
2105 : }
2106 :
2107 : // Add the range key block handle to the metaindex block. Note that we add the
2108 : // block handle to the metaindex block before the other meta blocks as the
2109 : // metaindex block entries must be sorted, and the range key block name sorts
2110 : // before the other block names.
2111 2 : if w.props.NumRangeKeys() > 0 {
2112 2 : n := encodeBlockHandle(w.blockBuf.tmp[:], rangeKeyBH)
2113 2 : metaindex.add(InternalKey{UserKey: []byte(metaRangeKeyName)}, w.blockBuf.tmp[:n])
2114 2 : }
2115 :
2116 2 : {
2117 2 : // Finish and record the prop collectors if props are not yet recorded.
2118 2 : // Pre-computed props might have been copied by specialized sst creators
2119 2 : // like the suffix replacer.
2120 2 : if len(w.props.UserProperties) == 0 {
2121 2 : userProps := make(map[string]string)
2122 2 : for i := range w.blockPropCollectors {
2123 2 : scratch := w.blockPropsEncoder.getScratchForProp()
2124 2 : // Place the shortID in the first byte.
2125 2 : scratch = append(scratch, byte(i))
2126 2 : buf, err := w.blockPropCollectors[i].FinishTable(scratch)
2127 2 : if err != nil {
2128 1 : return err
2129 1 : }
2130 2 : var prop string
2131 2 : if len(buf) > 0 {
2132 2 : prop = string(buf)
2133 2 : }
2134 : // NB: The property is populated in the map even if it is the
2135 : // empty string, since the presence in the map is what indicates
2136 : // that the block property collector was used when writing.
2137 2 : userProps[w.blockPropCollectors[i].Name()] = prop
2138 : }
2139 2 : if len(userProps) > 0 {
2140 2 : w.props.UserProperties = userProps
2141 2 : }
2142 : }
2143 :
2144 : // Write the properties block.
2145 2 : var raw rawBlockWriter
2146 2 : // The restart interval is set to infinity because the properties block
2147 2 : // is always read sequentially and cached in a heap-allocated object. This
2148 2 : // reduces table size without a significant impact on performance.
2149 2 : raw.restartInterval = propertiesBlockRestartInterval
2150 2 : w.props.CompressionOptions = rocksDBCompressionOptions
2151 2 : w.props.save(w.tableFormat, &raw)
2152 2 : bh, err := w.writeBlock(raw.finish(), NoCompression, &w.blockBuf)
2153 2 : if err != nil {
2154 0 : return err
2155 0 : }
2156 2 : n := encodeBlockHandle(w.blockBuf.tmp[:], bh)
2157 2 : metaindex.add(InternalKey{UserKey: []byte(metaPropertiesName)}, w.blockBuf.tmp[:n])
2158 : }
2159 :
2160 : // Add the range deletion block handle to the metaindex block.
2161 2 : if w.props.NumRangeDeletions > 0 {
2162 2 : n := encodeBlockHandle(w.blockBuf.tmp[:], rangeDelBH)
2163 2 : // The v2 range-del block encoding is backwards compatible with the v1
2164 2 : // encoding. We add meta-index entries for both the old name and the new
2165 2 : // name so that old code can continue to find the range-del block and new
2166 2 : // code knows that the range tombstones in the block are fragmented and
2167 2 : // sorted.
2168 2 : metaindex.add(InternalKey{UserKey: []byte(metaRangeDelName)}, w.blockBuf.tmp[:n])
2169 2 : if !w.rangeDelV1Format {
2170 2 : metaindex.add(InternalKey{UserKey: []byte(metaRangeDelV2Name)}, w.blockBuf.tmp[:n])
2171 2 : }
2172 : }
2173 :
2174 : // Write the metaindex block. It might be an empty block, if the filter
2175 : // policy is nil. NoCompression is specified because a) RocksDB never
2176 : // compresses the meta-index block and b) RocksDB has some code paths which
2177 : // expect the meta-index block to not be compressed.
2178 2 : metaindexBH, err := w.writeBlock(metaindex.blockWriter.finish(), NoCompression, &w.blockBuf)
2179 2 : if err != nil {
2180 0 : return err
2181 0 : }
2182 :
2183 : // Write the table footer.
2184 2 : footer := footer{
2185 2 : format: w.tableFormat,
2186 2 : checksum: w.blockBuf.checksummer.checksumType,
2187 2 : metaindexBH: metaindexBH,
2188 2 : indexBH: indexBH,
2189 2 : }
2190 2 : encoded := footer.encode(w.blockBuf.tmp[:])
2191 2 : if err := w.writable.Write(encoded); err != nil {
2192 0 : return err
2193 0 : }
2194 2 : w.meta.Size += uint64(len(encoded))
2195 2 : w.meta.Properties = w.props
2196 2 :
2197 2 : // Check that the features present in the table are compatible with the format
2198 2 : // configured for the table.
2199 2 : if err = w.assertFormatCompatibility(); err != nil {
2200 1 : return err
2201 1 : }
2202 :
2203 2 : if err := w.writable.Finish(); err != nil {
2204 1 : w.writable = nil
2205 1 : return err
2206 1 : }
2207 2 : w.writable = nil
2208 2 :
2209 2 : w.dataBlockBuf.clear()
2210 2 : dataBlockBufPool.Put(w.dataBlockBuf)
2211 2 : w.dataBlockBuf = nil
2212 2 : w.indexBlock.clear()
2213 2 : indexBlockBufPool.Put(w.indexBlock)
2214 2 : w.indexBlock = nil
2215 2 :
2216 2 : // Make any future calls to Set or Close return an error.
2217 2 : w.err = errWriterClosed
2218 2 : return nil
2219 : }
2220 :
2221 : // EstimatedSize returns the estimated size of the sstable being written if
2222 : // a call to Close() were made without adding additional keys.
2223 2 : func (w *Writer) EstimatedSize() uint64 {
2224 2 : if w == nil {
2225 2 : return 0
2226 2 : }
2227 2 : return w.coordination.sizeEstimate.size() +
2228 2 : uint64(w.dataBlockBuf.dataBlock.estimatedSize()) +
2229 2 : w.indexBlock.estimatedSize()
2230 : }
2231 :
2232 : // Metadata returns the metadata for the finished sstable. Only valid to call
2233 : // after the sstable has been finished.
2234 2 : func (w *Writer) Metadata() (*WriterMetadata, error) {
2235 2 : if w.writable != nil {
2236 0 : return nil, errors.New("pebble: writer is not closed")
2237 0 : }
2238 2 : return &w.meta, nil
2239 : }
2240 :
2241 : // WriterOption provides an interface to do work on Writer while it is being
2242 : // opened.
2243 : type WriterOption interface {
2244 : // writerApply is called on the writer during opening in order to set
2245 : // internal parameters.
2246 : writerApply(*Writer)
2247 : }
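: 
: // A minimal sketch of a WriterOption (hypothetical, for illustration only).
: // Options that must run before default properties are configured additionally
: // implement the unexported preApply marker interface checked in NewWriter:
: //
: //	type exampleOption struct{}
: //
: //	func (exampleOption) preApply() {}
: //
: //	func (exampleOption) writerApply(w *Writer) {
: //		// Mutate internal Writer state here, before any keys are added.
: //	}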
2248 :
2249 : // UnsafeLastPointUserKey returns the last point key written to the writer to
2250 : // which this option was passed during creation. The returned key points
2251 : // directly into a buffer belonging to the Writer. The value's lifetime ends the
2252 : // next time a point key is added to the Writer.
2253 : //
2254 : // Must not be called after Writer is closed.
2255 2 : func (w *Writer) UnsafeLastPointUserKey() []byte {
2256 2 : if w != nil && w.dataBlockBuf.dataBlock.nEntries >= 1 {
2257 2 : // w.dataBlockBuf.dataBlock.curKey is guaranteed to point to the last point key
2258 2 : // which was added to the Writer.
2259 2 : return w.dataBlockBuf.dataBlock.getCurUserKey()
2260 2 : }
2261 0 : return nil
2262 : }
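: 
: // Callers that need the key beyond the next point-key addition must copy it
: // first, e.g.:
: //
: //	k := append([]byte(nil), w.UnsafeLastPointUserKey()...)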
2263 :
2264 : // NewWriter returns a new table writer for the file. Closing the writer will
2265 : // close the file.
2266 2 : func NewWriter(writable objstorage.Writable, o WriterOptions, extraOpts ...WriterOption) *Writer {
2267 2 : o = o.ensureDefaults()
2268 2 : w := &Writer{
2269 2 : writable: writable,
2270 2 : meta: WriterMetadata{
2271 2 : SmallestSeqNum: math.MaxUint64,
2272 2 : },
2273 2 : dataBlockOptions: flushDecisionOptions{
2274 2 : blockSize: o.BlockSize,
2275 2 : blockSizeThreshold: (o.BlockSize*o.BlockSizeThreshold + 99) / 100,
2276 2 : sizeClassAwareThreshold: (o.BlockSize*o.SizeClassAwareThreshold + 99) / 100,
2277 2 : },
2278 2 : indexBlockOptions: flushDecisionOptions{
2279 2 : blockSize: o.IndexBlockSize,
2280 2 : blockSizeThreshold: (o.IndexBlockSize*o.BlockSizeThreshold + 99) / 100,
2281 2 : sizeClassAwareThreshold: (o.IndexBlockSize*o.SizeClassAwareThreshold + 99) / 100,
2282 2 : },
2283 2 : compare: o.Comparer.Compare,
2284 2 : split: o.Comparer.Split,
2285 2 : formatKey: o.Comparer.FormatKey,
2286 2 : compression: o.Compression,
2287 2 : separator: o.Comparer.Separator,
2288 2 : successor: o.Comparer.Successor,
2289 2 : tableFormat: o.TableFormat,
2290 2 : isStrictObsolete: o.IsStrictObsolete,
2291 2 : writingToLowestLevel: o.WritingToLowestLevel,
2292 2 : cache: o.Cache,
2293 2 : restartInterval: o.BlockRestartInterval,
2294 2 : checksumType: o.Checksum,
2295 2 : indexBlock: newIndexBlockBuf(o.Parallelism),
2296 2 : rangeDelBlock: blockWriter{
2297 2 : restartInterval: 1,
2298 2 : },
2299 2 : rangeKeyBlock: blockWriter{
2300 2 : restartInterval: 1,
2301 2 : },
2302 2 : topLevelIndexBlock: blockWriter{
2303 2 : restartInterval: 1,
2304 2 : },
2305 2 : fragmenter: keyspan.Fragmenter{
2306 2 : Cmp: o.Comparer.Compare,
2307 2 : Format: o.Comparer.FormatKey,
2308 2 : },
2309 2 : allocatorSizeClasses: o.AllocatorSizeClasses,
2310 2 : }
2311 2 : if w.tableFormat >= TableFormatPebblev3 {
2312 2 : w.shortAttributeExtractor = o.ShortAttributeExtractor
2313 2 : w.requiredInPlaceValueBound = o.RequiredInPlaceValueBound
2314 2 : if !o.DisableValueBlocks {
2315 2 : w.valueBlockWriter = newValueBlockWriter(
2316 2 : w.dataBlockOptions.blockSize, w.dataBlockOptions.blockSizeThreshold, w.compression, w.checksumType, func(compressedSize int) {
2317 2 : w.coordination.sizeEstimate.dataBlockCompressed(compressedSize, 0)
2318 2 : })
2319 : }
2320 : }
2321 :
2322 2 : w.dataBlockBuf = newDataBlockBuf(w.restartInterval, w.checksumType)
2323 2 :
2324 2 : w.blockBuf = blockBuf{
2325 2 : checksummer: checksummer{checksumType: o.Checksum},
2326 2 : }
2327 2 :
2328 2 : w.coordination.init(o.Parallelism, w)
2329 2 :
2330 2 : if writable == nil {
2331 0 : w.err = errors.New("pebble: nil writable")
2332 0 : return w
2333 0 : }
2334 :
2335 : // Note that WriterOptions are applied in two places; the ones with a
2336 : // preApply() method are applied here. The rest are applied down below after
2337 : // default properties are set.
2338 2 : type preApply interface{ preApply() }
2339 2 : for _, opt := range extraOpts {
2340 2 : if _, ok := opt.(preApply); ok {
2341 2 : opt.writerApply(w)
2342 2 : }
2343 : }
2344 :
2345 2 : if o.FilterPolicy != nil {
2346 2 : switch o.FilterType {
2347 2 : case TableFilter:
2348 2 : w.filter = newTableFilterWriter(o.FilterPolicy)
2349 0 : default:
2350 0 : panic(fmt.Sprintf("unknown filter type: %v", o.FilterType))
2351 : }
2352 : }
2353 :
2354 2 : w.props.ComparerName = o.Comparer.Name
2355 2 : w.props.CompressionName = o.Compression.String()
2356 2 : w.props.MergerName = o.MergerName
2357 2 : w.props.PropertyCollectorNames = "[]"
2358 2 :
2359 2 : numBlockPropertyCollectors := len(o.BlockPropertyCollectors)
2360 2 : if w.tableFormat >= TableFormatPebblev4 {
2361 2 : numBlockPropertyCollectors++
2362 2 : }
2363 :
2364 2 : if numBlockPropertyCollectors > 0 {
2365 2 : if numBlockPropertyCollectors > maxPropertyCollectors {
2366 0 : w.err = errors.New("pebble: too many block property collectors")
2367 0 : return w
2368 0 : }
2369 2 : w.blockPropCollectors = make([]BlockPropertyCollector, 0, numBlockPropertyCollectors)
2370 2 : for _, constructFn := range o.BlockPropertyCollectors {
2371 2 : w.blockPropCollectors = append(w.blockPropCollectors, constructFn())
2372 2 : }
2373 2 : if w.tableFormat >= TableFormatPebblev4 {
2374 2 : w.blockPropCollectors = append(w.blockPropCollectors, &w.obsoleteCollector)
2375 2 : }
2376 :
2377 2 : var buf bytes.Buffer
2378 2 : buf.WriteString("[")
2379 2 : for i := range w.blockPropCollectors {
2380 2 : if i > 0 {
2381 2 : buf.WriteString(",")
2382 2 : }
2383 2 : buf.WriteString(w.blockPropCollectors[i].Name())
2384 : }
2385 2 : buf.WriteString("]")
2386 2 : w.props.PropertyCollectorNames = buf.String()
2387 : }
2388 :
2389 : // Apply the remaining WriterOptions that do not have a preApply() method.
2390 2 : for _, opt := range extraOpts {
2391 2 : if _, ok := opt.(preApply); ok {
2392 2 : continue
2393 : }
2394 0 : opt.writerApply(w)
2395 : }
2396 :
2397 : // Initialize the range key fragmenter and encoder.
2398 2 : w.fragmenter.Emit = w.encodeRangeKeySpan
2399 2 : w.rangeKeyEncoder.Emit = w.addRangeKey
2400 2 : return w
2401 : }
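: 
: // A minimal usage sketch (illustrative; assumes a Writable obtained from an
: // objstorage provider, and default WriterOptions beyond those shown):
: //
: //	w := NewWriter(writable, WriterOptions{TableFormat: TableFormatPebblev4})
: //	// Add keys in strictly increasing order, then finish the table.
: //	if err := w.Set([]byte("a"), []byte("1")); err != nil { /* handle */ }
: //	if err := w.Close(); err != nil { /* handle */ }
: //	meta, _ := w.Metadata()
: //	_ = meta.Size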
2402 :
2403 : // internalGetProperties is a private, internal-use-only function that takes a
2404 : // Writer and returns a pointer to its Properties, allowing direct mutation.
2405 : // It's used by internal Pebble flushes and compactions to set internal
2406 : // properties. It gets installed in private.
2407 2 : func internalGetProperties(w *Writer) *Properties {
2408 2 : return &w.props
2409 2 : }
2410 :
2411 2 : func init() {
2412 2 : private.SSTableWriterDisableKeyOrderChecks = func(i interface{}) {
2413 1 : w := i.(*Writer)
2414 1 : w.disableKeyOrderChecks = true
2415 1 : }
2416 2 : private.SSTableInternalProperties = internalGetProperties
2417 : }
|