Line data Source code
1 : // Copyright 2011 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package sstable
6 :
7 : import (
8 : "bytes"
9 : "encoding/binary"
10 : "fmt"
11 : "math"
12 : "runtime"
13 : "sort"
14 : "sync"
15 :
16 : "github.com/cespare/xxhash/v2"
17 : "github.com/cockroachdb/errors"
18 : "github.com/cockroachdb/pebble/internal/base"
19 : "github.com/cockroachdb/pebble/internal/bytealloc"
20 : "github.com/cockroachdb/pebble/internal/cache"
21 : "github.com/cockroachdb/pebble/internal/crc"
22 : "github.com/cockroachdb/pebble/internal/invariants"
23 : "github.com/cockroachdb/pebble/internal/keyspan"
24 : "github.com/cockroachdb/pebble/internal/private"
25 : "github.com/cockroachdb/pebble/internal/rangekey"
26 : "github.com/cockroachdb/pebble/objstorage"
27 : )
28 :
29 : // encodedBHPEstimatedSize estimates the size of the encoded BlockHandleWithProperties.
30 : // encodedBHPEstimatedSize estimates the size of the encoded BlockHandleWithProperties.
31 : // It would also be nice to account for the length of the data block properties here,
32 : const encodedBHPEstimatedSize = binary.MaxVarintLen64 * 2
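
// Editorial sketch (not part of the original source): a BlockHandle encodes as
// two uvarints (offset and length), which is why two MaxVarintLen64 values
// bound the handle portion of an index entry; the trailing property bytes are
// deliberately ignored by this estimate. The helper name is illustrative only.
func exampleMaxEncodedBlockHandleLen(offset, length uint64) int {
	var buf [encodedBHPEstimatedSize]byte
	n := binary.PutUvarint(buf[:], offset)
	n += binary.PutUvarint(buf[n:], length)
	return n // always <= encodedBHPEstimatedSize
}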
33 :
34 : var errWriterClosed = errors.New("pebble: writer is closed")
35 :
36 : // WriterMetadata holds info about a finished sstable.
37 : type WriterMetadata struct {
38 : Size uint64
39 : SmallestPoint InternalKey
40 : // LargestPoint, LargestRangeKey, LargestRangeDel should not be accessed
41 : // before Writer.Close is called, because they may only be set on
42 : // Writer.Close.
43 : LargestPoint InternalKey
44 : SmallestRangeDel InternalKey
45 : LargestRangeDel InternalKey
46 : SmallestRangeKey InternalKey
47 : LargestRangeKey InternalKey
48 : HasPointKeys bool
49 : HasRangeDelKeys bool
50 : HasRangeKeys bool
51 : SmallestSeqNum uint64
52 : LargestSeqNum uint64
53 : Properties Properties
54 : }
55 :
56 : // SetSmallestPointKey sets the smallest point key to the given key.
57 : // NB: this method sets the "absolute" smallest point key. Any existing key is
58 : // overridden.
59 2 : func (m *WriterMetadata) SetSmallestPointKey(k InternalKey) {
60 2 : m.SmallestPoint = k
61 2 : m.HasPointKeys = true
62 2 : }
63 :
64 : // SetSmallestRangeDelKey sets the smallest rangedel key to the given key.
65 : // NB: this method sets the "absolute" smallest rangedel key. Any existing key is
66 : // overridden.
67 2 : func (m *WriterMetadata) SetSmallestRangeDelKey(k InternalKey) {
68 2 : m.SmallestRangeDel = k
69 2 : m.HasRangeDelKeys = true
70 2 : }
71 :
72 : // SetSmallestRangeKey sets the smallest range key to the given key.
73 : // NB: this method sets the "absolute" smallest range key. Any existing key is
74 : // overridden.
75 2 : func (m *WriterMetadata) SetSmallestRangeKey(k InternalKey) {
76 2 : m.SmallestRangeKey = k
77 2 : m.HasRangeKeys = true
78 2 : }
79 :
80 : // SetLargestPointKey sets the largest point key to the given key.
81 : // NB: this method sets the "absolute" largest point key. Any existing key is
82 : // overridden.
83 2 : func (m *WriterMetadata) SetLargestPointKey(k InternalKey) {
84 2 : m.LargestPoint = k
85 2 : m.HasPointKeys = true
86 2 : }
87 :
88 : // SetLargestRangeDelKey sets the largest rangedel key to the given key.
89 : // NB: this method sets the "absolute" largest rangedel key. Any existing key is
90 : // overridden.
91 2 : func (m *WriterMetadata) SetLargestRangeDelKey(k InternalKey) {
92 2 : m.LargestRangeDel = k
93 2 : m.HasRangeDelKeys = true
94 2 : }
95 :
96 : // SetLargestRangeKey sets the largest range key to the given key.
97 : // NB: this method sets the "absolute" largest range key. Any existing key is
98 : // overridden.
99 2 : func (m *WriterMetadata) SetLargestRangeKey(k InternalKey) {
100 2 : m.LargestRangeKey = k
101 2 : m.HasRangeKeys = true
102 2 : }
103 :
104 2 : func (m *WriterMetadata) updateSeqNum(seqNum uint64) {
105 2 : if m.SmallestSeqNum > seqNum {
106 2 : m.SmallestSeqNum = seqNum
107 2 : }
108 2 : if m.LargestSeqNum < seqNum {
109 2 : m.LargestSeqNum = seqNum
110 2 : }
111 : }
112 :
113 : // Writer is a table writer.
114 : type Writer struct {
115 : writable objstorage.Writable
116 : meta WriterMetadata
117 : err error
118 : // cacheID and fileNum are used to remove blocks written to the sstable from
119 : // the cache, providing a defense in depth against bugs which cause cache
120 : // collisions.
121 : cacheID uint64
122 : fileNum base.DiskFileNum
123 : // The following fields are copied from Options.
124 : blockSize int
125 : blockSizeThreshold int
126 : indexBlockSize int
127 : indexBlockSizeThreshold int
128 : compare Compare
129 : split Split
130 : formatKey base.FormatKey
131 : compression Compression
132 : separator Separator
133 : successor Successor
134 : tableFormat TableFormat
135 : isStrictObsolete bool
136 : writingToLowestLevel bool
137 : cache *cache.Cache
138 : restartInterval int
139 : checksumType ChecksumType
140 : // disableKeyOrderChecks disables the checks that keys are added to an
141 : // sstable in order. It is intended for internal use only in the construction
142 : // of invalid sstables for testing. See tool/make_test_sstables.go.
143 : disableKeyOrderChecks bool
144 : // With two level indexes, the index/filter of an SST file is partitioned into
145 : // smaller blocks with an additional top-level index on them. When reading an
146 : // index/filter, only the top-level index is loaded into memory. The two level
147 : // index/filter then uses the top-level index to load on demand into the block
148 : // cache the partitions that are required to perform the index/filter query.
149 : //
150 : // Two level indexes are enabled automatically when there is more than one
151 : // index block.
152 : //
153 : // This is useful when there are very large index blocks, which generally occurs
154 : // with the usage of large keys. With large index blocks, the index blocks fight
155 : // the data blocks for block cache space and the index blocks are likely to be
156 : // re-read many times from the disk. The top level index, which has a much
157 : // smaller memory footprint, can be used to prevent the entire index block from
158 : // being loaded into the block cache.
159 : twoLevelIndex bool
160 : // Internal flag to allow creation of range-del-v1 format blocks. Only used
161 : // for testing. Note that v2 format blocks are backwards compatible with v1
162 : // format blocks.
163 : rangeDelV1Format bool
164 : indexBlock *indexBlockBuf
165 : rangeDelBlock blockWriter
166 : rangeKeyBlock blockWriter
167 : topLevelIndexBlock blockWriter
168 : props Properties
169 : propCollectors []TablePropertyCollector
170 : blockPropCollectors []BlockPropertyCollector
171 : obsoleteCollector obsoleteKeyBlockPropertyCollector
172 : blockPropsEncoder blockPropertiesEncoder
173 : // filter accumulates the filter block. If populated, the filter ingests
174 : // either the output of w.split (i.e. a prefix extractor) if w.split is not
175 : // nil, or the full keys otherwise.
176 : filter filterWriter
177 : indexPartitions []indexBlockAndBlockProperties
178 :
179 : // indexBlockAlloc is used to bulk-allocate byte slices used to store index
180 : // blocks in indexPartitions. These live until the index finishes.
181 : indexBlockAlloc []byte
182 : // indexSepAlloc is used to bulk-allocate index block separator slices stored
183 : // in indexPartitions. These live until the index finishes.
184 : indexSepAlloc bytealloc.A
185 :
186 : // To allow potentially overlapping (i.e. un-fragmented) range key spans to
187 : // be added to the Writer, a keyspan.Fragmenter is used to retain the keys
188 : // and values, emitting fragmented, coalesced spans as appropriate. Range
189 : // keys must be added in order of their start user-key.
190 : fragmenter keyspan.Fragmenter
191 : rangeKeyEncoder rangekey.Encoder
192 : rangeKeysBySuffix keyspan.KeysBySuffix
193 : rangeKeySpan keyspan.Span
194 : rkBuf []byte
195 : // dataBlockBuf consists of the state which is currently owned by and used by
196 : // the Writer client goroutine. This state can be handed off to other goroutines.
197 : dataBlockBuf *dataBlockBuf
198 : // blockBuf consists of the state which is owned by and used by the Writer client
199 : // goroutine.
200 : blockBuf blockBuf
201 :
202 : coordination coordinationState
203 :
204 : // Information (other than the byte slice) about the last point key, to
205 : // avoid extracting it again.
206 : lastPointKeyInfo pointKeyInfo
207 :
208 : // For value blocks.
209 : shortAttributeExtractor base.ShortAttributeExtractor
210 : requiredInPlaceValueBound UserKeyPrefixBound
211 : valueBlockWriter *valueBlockWriter
212 : }
213 :
214 : type pointKeyInfo struct {
215 : trailer uint64
216 : // Only computed when w.valueBlockWriter is not nil.
217 : userKeyLen int
218 : // prefixLen uses w.split, if not nil. Only computed when w.valueBlockWriter
219 : // is not nil.
220 : prefixLen int
221 : // True iff the point was marked obsolete.
222 : isObsolete bool
223 : }
224 :
225 : type coordinationState struct {
226 : parallelismEnabled bool
227 :
228 : // writeQueue is used to write data blocks to disk. The writeQueue is primarily
229 : // used to maintain the order in which data blocks must be written to disk. For
230 : // this reason, every single data block write must be done through the writeQueue.
231 : writeQueue *writeQueue
232 :
233 : sizeEstimate dataBlockEstimates
234 : }
235 :
236 2 : func (c *coordinationState) init(parallelismEnabled bool, writer *Writer) {
237 2 : c.parallelismEnabled = parallelismEnabled
238 2 : // useMutex is false regardless of parallelismEnabled, because we do not do
239 2 : // parallel compression yet.
240 2 : c.sizeEstimate.useMutex = false
241 2 :
242 2 : // writeQueueSize determines the size of the write queue, or the number
243 2 : // of items which can be added to the queue without blocking. By default, we
244 2 : // use a writeQueue size of 0, since we won't be doing any block writes in
245 2 : // parallel.
246 2 : writeQueueSize := 0
247 2 : if parallelismEnabled {
248 2 : writeQueueSize = runtime.GOMAXPROCS(0)
249 2 : }
250 2 : c.writeQueue = newWriteQueue(writeQueueSize, writer)
251 : }
252 :
253 : // sizeEstimate is a general purpose helper for estimating two kinds of sizes:
254 : // A. The compressed sstable size, which is useful for deciding when to start
255 : //
256 : // a new sstable during flushes or compactions. In practice, we use this in
257 : // estimating the data size (excluding the index).
258 : //
259 : // B. The size of index blocks to decide when to start a new index block.
260 : //
261 : // There are some terminology peculiarities which are due to the origin of
262 : // sizeEstimate for use case A with parallel compression enabled (for which
263 : // the code has not been merged). Specifically this relates to the terms
264 : // "written" and "compressed".
265 : // - The notion of "written" for case A is sufficiently defined by saying that
266 : // the data block is compressed. Waiting for the actual data block write to
267 : // happen can result in unnecessary estimation, when we already know how big
268 : // it will be in compressed form. Additionally, with the forthcoming value
269 : // blocks containing older MVCC values, these compressed blocks will be held
270 : // in memory until late in the sstable writing, and we do want to accurately
271 : // account for them without waiting for the actual write.
272 : // For case B, "written" means that the index entry has been fully
273 : // generated, and has been added to the uncompressed block buffer for that
274 : // index block. It does not include actually writing a potentially
275 : // compressed index block.
276 : // - The notion of "compressed" is to differentiate between an "inflight" size
277 : // and the actual size, and is handled via computing a compression ratio
278 : // observed so far (defaults to 1).
279 : // For case A, this is actual data block compression, so the "inflight" size
280 : // is uncompressed blocks (that are no longer being written to) and the
281 : // "compressed" size is after they have been compressed.
282 : // For case B the inflight size is for a key-value pair in the index for
283 : // which the value size (the encoded size of the BlockHandleWithProperties)
284 : // is not accurately known, while the compressed size is the size of that
285 : // entry when it has been added to the (in-progress) index block.
286 : //
287 : // Usage: To update state, one can optionally provide an inflight write value
288 : // using addInflight (used for case B). When something is "written" the state
289 : // can be updated using either writtenWithDelta or writtenWithTotal, which
290 : // provide the actual delta size or the total size (latter must be
291 : // monotonically non-decreasing). If there were no calls to addInflight, there
292 : // isn't any real estimation happening here. So case A does not do any real
293 : // estimation. However, when we introduce parallel compression, there will be
294 : // estimation in that the client goroutine will call addInFlight and the
295 : // compression goroutines will call writtenWithDelta.
296 : type sizeEstimate struct {
297 : // emptySize is the size when there is no inflight data, and numEntries is 0.
298 : // emptySize is constant once set.
299 : emptySize uint64
300 :
301 : // inflightSize is the estimated size of some inflight data which hasn't
302 : // been written yet.
303 : inflightSize uint64
304 :
305 : // totalSize is the total size of the data which has already been written.
306 : totalSize uint64
307 :
308 : // numWrittenEntries is the total number of entries which have already been
309 : // written.
310 : numWrittenEntries uint64
311 : // numInflightEntries is the total number of entries which are inflight, and
312 : // haven't been written.
313 : numInflightEntries uint64
314 :
315 : // maxEstimatedSize stores the maximum result returned from sizeEstimate.size.
316 : // It ensures that values returned from subsequent calls to Writer.EstimatedSize
317 : // never decrease.
318 : maxEstimatedSize uint64
319 :
320 : // We assume that the entries added to the sizeEstimate can be compressed.
321 : // For this reason, we keep track of a compressedSize and an uncompressedSize
322 : // to compute a compression ratio for the inflight entries. If the entries
323 : // aren't being compressed, then compressedSize and uncompressedSize must be
324 : // equal.
325 : compressedSize uint64
326 : uncompressedSize uint64
327 : }
328 :
329 2 : func (s *sizeEstimate) init(emptySize uint64) {
330 2 : s.emptySize = emptySize
331 2 : }
332 :
333 2 : func (s *sizeEstimate) size() uint64 {
334 2 : ratio := float64(1)
335 2 : if s.uncompressedSize > 0 {
336 2 : ratio = float64(s.compressedSize) / float64(s.uncompressedSize)
337 2 : }
338 2 : estimatedInflightSize := uint64(float64(s.inflightSize) * ratio)
339 2 : total := s.totalSize + estimatedInflightSize
340 2 : if total > s.maxEstimatedSize {
341 2 : s.maxEstimatedSize = total
342 2 : } else {
343 2 : total = s.maxEstimatedSize
344 2 : }
345 :
346 2 : if total == 0 {
347 2 : return s.emptySize
348 2 : }
349 :
350 2 : return total
351 : }
352 :
353 2 : func (s *sizeEstimate) numTotalEntries() uint64 {
354 2 : return s.numWrittenEntries + s.numInflightEntries
355 2 : }
356 :
357 2 : func (s *sizeEstimate) addInflight(size int) {
358 2 : s.numInflightEntries++
359 2 : s.inflightSize += uint64(size)
360 2 : }
361 :
362 2 : func (s *sizeEstimate) writtenWithTotal(newTotalSize uint64, inflightSize int) {
363 2 : finalEntrySize := int(newTotalSize - s.totalSize)
364 2 : s.writtenWithDelta(finalEntrySize, inflightSize)
365 2 : }
366 :
367 2 : func (s *sizeEstimate) writtenWithDelta(finalEntrySize int, inflightSize int) {
368 2 : if inflightSize > 0 {
369 2 : // This entry was previously inflight, so we should decrement inflight
370 2 : // entries and update the "compression" stats for future estimation.
371 2 : s.numInflightEntries--
372 2 : s.inflightSize -= uint64(inflightSize)
373 2 : s.uncompressedSize += uint64(inflightSize)
374 2 : s.compressedSize += uint64(finalEntrySize)
375 2 : }
376 2 : s.numWrittenEntries++
377 2 : s.totalSize += uint64(finalEntrySize)
378 : }
379 :
380 2 : func (s *sizeEstimate) clear() {
381 2 : *s = sizeEstimate{emptySize: s.emptySize}
382 2 : }
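
// Editorial sketch (not part of the original source): the intended
// addInflight -> writtenWithDelta flow. An inflight entry of 100 bytes that is
// later written as 40 bytes yields an observed compression ratio of 0.4, which
// is then applied to any remaining inflight bytes when computing size().
func exampleSizeEstimateFlow() uint64 {
	var s sizeEstimate
	s.init(emptyBlockSize)
	s.addInflight(100)          // first entry, estimated at 100 bytes.
	s.writtenWithDelta(40, 100) // it was actually written as 40 bytes.
	s.addInflight(100)          // second entry, still inflight...
	return s.size()             // ...estimated at 100*0.4 = 40, so 40+40 = 80.
}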
383 :
384 : type indexBlockBuf struct {
385 : // block will only be accessed from the writeQueue.
386 : block blockWriter
387 :
388 : size struct {
389 : useMutex bool
390 : mu sync.Mutex
391 : estimate sizeEstimate
392 : }
393 :
394 : // restartInterval matches indexBlockBuf.block.restartInterval. We store it twice, because the `block`
395 : // must only be accessed from the writeQueue goroutine.
396 : restartInterval int
397 : }
398 :
399 2 : func (i *indexBlockBuf) clear() {
400 2 : i.block.clear()
401 2 : if i.size.useMutex {
402 2 : i.size.mu.Lock()
403 2 : defer i.size.mu.Unlock()
404 2 : }
405 2 : i.size.estimate.clear()
406 2 : i.restartInterval = 0
407 : }
408 :
409 : var indexBlockBufPool = sync.Pool{
410 2 : New: func() interface{} {
411 2 : return &indexBlockBuf{}
412 2 : },
413 : }
414 :
415 : const indexBlockRestartInterval = 1
416 :
417 2 : func newIndexBlockBuf(useMutex bool) *indexBlockBuf {
418 2 : i := indexBlockBufPool.Get().(*indexBlockBuf)
419 2 : i.size.useMutex = useMutex
420 2 : i.restartInterval = indexBlockRestartInterval
421 2 : i.block.restartInterval = indexBlockRestartInterval
422 2 : i.size.estimate.init(emptyBlockSize)
423 2 : return i
424 2 : }
425 :
426 : func (i *indexBlockBuf) shouldFlush(
427 : sep InternalKey, valueLen, targetBlockSize, sizeThreshold int,
428 2 : ) bool {
429 2 : if i.size.useMutex {
430 2 : i.size.mu.Lock()
431 2 : defer i.size.mu.Unlock()
432 2 : }
433 :
434 2 : nEntries := i.size.estimate.numTotalEntries()
435 2 : return shouldFlush(
436 2 : sep, valueLen, i.restartInterval, int(i.size.estimate.size()),
437 2 : int(nEntries), targetBlockSize, sizeThreshold)
438 : }
439 :
440 2 : func (i *indexBlockBuf) add(key InternalKey, value []byte, inflightSize int) {
441 2 : i.block.add(key, value)
442 2 : size := i.block.estimatedSize()
443 2 : if i.size.useMutex {
444 2 : i.size.mu.Lock()
445 2 : defer i.size.mu.Unlock()
446 2 : }
447 2 : i.size.estimate.writtenWithTotal(uint64(size), inflightSize)
448 : }
449 :
450 2 : func (i *indexBlockBuf) finish() []byte {
451 2 : b := i.block.finish()
452 2 : return b
453 2 : }
454 :
455 2 : func (i *indexBlockBuf) addInflight(inflightSize int) {
456 2 : if i.size.useMutex {
457 2 : i.size.mu.Lock()
458 2 : defer i.size.mu.Unlock()
459 2 : }
460 2 : i.size.estimate.addInflight(inflightSize)
461 : }
462 :
463 2 : func (i *indexBlockBuf) estimatedSize() uint64 {
464 2 : if i.size.useMutex {
465 2 : i.size.mu.Lock()
466 2 : defer i.size.mu.Unlock()
467 2 : }
468 :
469 : // Make sure that the size estimation works as expected when parallelism
470 : // is disabled.
471 2 : if invariants.Enabled && !i.size.useMutex {
472 2 : if i.size.estimate.inflightSize != 0 {
473 0 : panic("unexpected inflight entry in index block size estimation")
474 : }
475 :
476 : // NB: The i.block should only be accessed from the writeQueue goroutine,
477 : // when parallelism is enabled. We break that invariant here, but that's
478 : // okay since parallelism is disabled.
479 2 : if i.size.estimate.size() != uint64(i.block.estimatedSize()) {
480 0 : panic("index block size estimation sans parallelism is incorrect")
481 : }
482 : }
483 2 : return i.size.estimate.size()
484 : }
485 :
486 : // dataBlockEstimates is used for sstable size estimation. It can be
487 : // accessed by the Writer client and compressionQueue goroutines. Fields
488 : // should only be read/updated through the functions defined on the
489 : // *dataBlockEstimates type.
490 : type dataBlockEstimates struct {
491 : // If we don't do block compression in parallel, then we don't need to take
492 : // the performance hit of synchronizing using this mutex.
493 : useMutex bool
494 : mu sync.Mutex
495 :
496 : estimate sizeEstimate
497 : }
498 :
499 : // inflightSize is the uncompressed block size estimate which has been
500 : // previously provided to addInflightDataBlock(). If addInflightDataBlock()
501 : // has not been called, this must be set to 0. compressedSize is the
502 : // compressed size of the block.
503 2 : func (d *dataBlockEstimates) dataBlockCompressed(compressedSize int, inflightSize int) {
504 2 : if d.useMutex {
505 0 : d.mu.Lock()
506 0 : defer d.mu.Unlock()
507 0 : }
508 2 : d.estimate.writtenWithDelta(compressedSize+blockTrailerLen, inflightSize)
509 : }
510 :
511 : // size is an estimated size of datablock data which has been written to disk.
512 2 : func (d *dataBlockEstimates) size() uint64 {
513 2 : if d.useMutex {
514 0 : d.mu.Lock()
515 0 : defer d.mu.Unlock()
516 0 : }
517 : // If there is no parallel compression, there should not be any inflight bytes.
518 2 : if invariants.Enabled && !d.useMutex {
519 2 : if d.estimate.inflightSize != 0 {
520 0 : panic("unexpected inflight entry in data block size estimation")
521 : }
522 : }
523 2 : return d.estimate.size()
524 : }
525 :
526 : // Avoid linter unused error.
527 : var _ = (&dataBlockEstimates{}).addInflightDataBlock
528 :
529 : // NB: unused since no parallel compression.
530 0 : func (d *dataBlockEstimates) addInflightDataBlock(size int) {
531 0 : if d.useMutex {
532 0 : d.mu.Lock()
533 0 : defer d.mu.Unlock()
534 0 : }
535 :
536 0 : d.estimate.addInflight(size)
537 : }
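
// Editorial sketch (not part of the original source): without parallel
// compression the writer never registers inflight data blocks, so every
// compressed block is reported with an inflightSize of 0 and the estimate is
// simply the running sum of compressed block sizes plus their trailers.
func exampleDataBlockEstimates(compressedLens []int) uint64 {
	var d dataBlockEstimates
	for _, n := range compressedLens {
		d.dataBlockCompressed(n, 0 /* inflightSize */)
	}
	return d.size()
}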
538 :
539 : var writeTaskPool = sync.Pool{
540 2 : New: func() interface{} {
541 2 : t := &writeTask{}
542 2 : t.compressionDone = make(chan bool, 1)
543 2 : return t
544 2 : },
545 : }
546 :
547 : type checksummer struct {
548 : checksumType ChecksumType
549 : xxHasher *xxhash.Digest
550 : }
551 :
552 2 : func (c *checksummer) checksum(block []byte, blockType []byte) (checksum uint32) {
553 2 : // Calculate the checksum.
554 2 : switch c.checksumType {
555 2 : case ChecksumTypeCRC32c:
556 2 : checksum = crc.New(block).Update(blockType).Value()
557 1 : case ChecksumTypeXXHash64:
558 1 : if c.xxHasher == nil {
559 1 : c.xxHasher = xxhash.New()
560 1 : } else {
561 1 : c.xxHasher.Reset()
562 1 : }
563 1 : c.xxHasher.Write(block)
564 1 : c.xxHasher.Write(blockType)
565 1 : checksum = uint32(c.xxHasher.Sum64())
566 0 : default:
567 0 : panic(errors.Newf("unsupported checksum type: %d", c.checksumType))
568 : }
569 2 : return checksum
570 : }
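
// Editorial sketch (not part of the original source): computing a block
// checksum the way the writer does when sealing a block, over the block
// contents followed by the single block-type/compression byte from the
// trailer.
func exampleBlockChecksum(block []byte, blockType byte) uint32 {
	c := checksummer{checksumType: ChecksumTypeCRC32c}
	return c.checksum(block, []byte{blockType})
}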
571 :
572 : type blockBuf struct {
573 : // tmp is a scratch buffer, large enough to hold footerLen bytes,
574 : // blockTrailerLen bytes, or (5 * binary.MaxVarintLen64) bytes, and is most
575 : // likely large enough for a block handle with properties.
576 : tmp [blockHandleLikelyMaxLen]byte
577 : // compressedBuf is the destination buffer for compression. It is re-used over the
578 : // lifetime of the blockBuf, avoiding the allocation of a temporary buffer for each block.
579 : compressedBuf []byte
580 : checksummer checksummer
581 : }
582 :
583 2 : func (b *blockBuf) clear() {
584 2 : // We can't assign b.compressedBuf[:0] to compressedBuf because snappy relies
585 2 : // on the length of the buffer, and not the capacity to determine if it needs
586 2 : // to make an allocation.
587 2 : *b = blockBuf{
588 2 : compressedBuf: b.compressedBuf, checksummer: b.checksummer,
589 2 : }
590 2 : }
591 :
592 : // A dataBlockBuf holds all the state required to compress and write a data block to disk.
593 : // A dataBlockBuf begins its lifecycle owned by the Writer client goroutine. The Writer
594 : // client goroutine adds keys to the sstable, writing directly into a dataBlockBuf's blockWriter
595 : // until the block is full. Once a dataBlockBuf's block is full, the dataBlockBuf may be passed
596 : // to other goroutines for compression and file I/O.
597 : type dataBlockBuf struct {
598 : blockBuf
599 : dataBlock blockWriter
600 :
601 : // uncompressed is a reference to a byte slice which is owned by the dataBlockBuf. It is the
602 : // next byte slice to be compressed. The uncompressed byte slice will be backed by the
603 : // dataBlock.buf.
604 : uncompressed []byte
605 : // compressed is a reference to a byte slice which is owned by the dataBlockBuf. It is the
606 : // compressed byte slice which must be written to disk. The compressed byte slice may be
607 : // backed by the dataBlock.buf, or the dataBlockBuf.compressedBuf, depending on whether
608 : // we use the result of the compression.
609 : compressed []byte
610 :
611 : // We're making calls to BlockPropertyCollectors from the Writer client goroutine. We need to
612 : // pass the encoded block properties over to the write queue. To prevent copies and allocations,
613 : // we give each dataBlockBuf a blockPropertiesEncoder.
614 : blockPropsEncoder blockPropertiesEncoder
615 : // dataBlockProps is set when Writer.finishDataBlockProps is called. The dataBlockProps slice is
616 : // a shallow copy of the internal buffer of the dataBlockBuf.blockPropsEncoder.
617 : dataBlockProps []byte
618 :
619 : // sepScratch is reusable scratch space for computing separator keys.
620 : sepScratch []byte
621 : }
622 :
623 2 : func (d *dataBlockBuf) clear() {
624 2 : d.blockBuf.clear()
625 2 : d.dataBlock.clear()
626 2 :
627 2 : d.uncompressed = nil
628 2 : d.compressed = nil
629 2 : d.dataBlockProps = nil
630 2 : d.sepScratch = d.sepScratch[:0]
631 2 : }
632 :
633 : var dataBlockBufPool = sync.Pool{
634 2 : New: func() interface{} {
635 2 : return &dataBlockBuf{}
636 2 : },
637 : }
638 :
639 2 : func newDataBlockBuf(restartInterval int, checksumType ChecksumType) *dataBlockBuf {
640 2 : d := dataBlockBufPool.Get().(*dataBlockBuf)
641 2 : d.dataBlock.restartInterval = restartInterval
642 2 : d.checksummer.checksumType = checksumType
643 2 : return d
644 2 : }
645 :
646 2 : func (d *dataBlockBuf) finish() {
647 2 : d.uncompressed = d.dataBlock.finish()
648 2 : }
649 :
650 2 : func (d *dataBlockBuf) compressAndChecksum(c Compression) {
651 2 : d.compressed = compressAndChecksum(d.uncompressed, c, &d.blockBuf)
652 2 : }
653 :
654 : func (d *dataBlockBuf) shouldFlush(
655 : key InternalKey, valueLen, targetBlockSize, sizeThreshold int,
656 2 : ) bool {
657 2 : return shouldFlush(
658 2 : key, valueLen, d.dataBlock.restartInterval, d.dataBlock.estimatedSize(),
659 2 : d.dataBlock.nEntries, targetBlockSize, sizeThreshold)
660 2 : }
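
// Editorial sketch (not part of the original source): the lifecycle of a full
// data block. The writer finishes the uncompressed block and then compresses
// and checksums it before the block is handed to the write queue (see
// Writer.flush below).
func exampleSealDataBlock(d *dataBlockBuf, compression Compression) []byte {
	d.finish()                         // d.uncompressed now references the raw block.
	d.compressAndChecksum(compression) // d.compressed is what will be written to disk.
	return d.compressed
}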
661 :
662 : type indexBlockAndBlockProperties struct {
663 : nEntries int
664 : // sep is the last key added to this block, for computing a separator later.
665 : sep InternalKey
666 : properties []byte
667 : // block is the encoded block produced by blockWriter.finish.
668 : block []byte
669 : }
670 :
671 : // Set sets the value for the given key. The sequence number is set to 0.
672 : // Intended for use to externally construct an sstable before ingestion into a
673 : // DB. For a given Writer, the keys passed to Set must be in strictly increasing
674 : // order.
675 : //
676 : // TODO(peter): untested
677 1 : func (w *Writer) Set(key, value []byte) error {
678 1 : if w.err != nil {
679 0 : return w.err
680 0 : }
681 1 : if w.isStrictObsolete {
682 0 : return errors.Errorf("use AddWithForceObsolete")
683 0 : }
684 : // forceObsolete is false based on the assumption that no RANGEDELs in the
685 : // sstable delete the added points.
686 1 : return w.addPoint(base.MakeInternalKey(key, 0, InternalKeyKindSet), value, false)
687 : }
688 :
689 : // Delete deletes the value for the given key. The sequence number is set to
690 : // 0. Intended for use to externally construct an sstable before ingestion into
691 : // a DB.
692 : //
693 : // TODO(peter): untested
694 1 : func (w *Writer) Delete(key []byte) error {
695 1 : if w.err != nil {
696 0 : return w.err
697 0 : }
698 1 : if w.isStrictObsolete {
699 0 : return errors.Errorf("use AddWithForceObsolete")
700 0 : }
701 : // forceObsolete is false based on the assumption that no RANGEDELs in the
702 : // sstable delete the added points.
703 1 : return w.addPoint(base.MakeInternalKey(key, 0, InternalKeyKindDelete), nil, false)
704 : }
705 :
706 : // DeleteRange deletes all of the keys (and values) in the range [start,end)
707 : // (inclusive on start, exclusive on end). The sequence number is set to
708 : // 0. Intended for use to externally construct an sstable before ingestion into
709 : // a DB.
710 : //
711 : // TODO(peter): untested
712 2 : func (w *Writer) DeleteRange(start, end []byte) error {
713 2 : if w.err != nil {
714 0 : return w.err
715 0 : }
716 2 : return w.addTombstone(base.MakeInternalKey(start, 0, InternalKeyKindRangeDelete), end)
717 : }
718 :
719 : // Merge adds an action to the DB that merges the value at key with the new
720 : // value. The details of the merge are dependent upon the configured merge
721 : // operator. The sequence number is set to 0. Intended for use to externally
722 : // construct an sstable before ingestion into a DB.
723 : //
724 : // TODO(peter): untested
725 1 : func (w *Writer) Merge(key, value []byte) error {
726 1 : if w.err != nil {
727 0 : return w.err
728 0 : }
729 1 : if w.isStrictObsolete {
730 0 : return errors.Errorf("use AddWithForceObsolete")
731 0 : }
732 : // forceObsolete is false based on the assumption that no RANGEDELs in the
733 : // sstable delete the added points. If the user configured this writer
734 : // to be strict-obsolete, addPoint will reject the addition of this MERGE.
735 1 : return w.addPoint(base.MakeInternalKey(key, 0, InternalKeyKindMerge), value, false)
736 : }
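
// Editorial sketch (not part of the original source): the intended ordering
// when externally constructing an sstable for ingestion. Point keys must be
// passed in strictly increasing order; range deletions may be interleaved but
// must be ordered by start key and fragmented. The function name is
// illustrative only.
func exampleExternalConstruction(w *Writer) error {
	if err := w.Set([]byte("a"), []byte("v1")); err != nil {
		return err
	}
	if err := w.Delete([]byte("b")); err != nil {
		return err
	}
	if err := w.DeleteRange([]byte("c"), []byte("d")); err != nil {
		return err
	}
	return w.Close()
}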
737 :
738 : // Add adds a key/value pair to the table being written. For a given Writer,
739 : // the keys passed to Add must be in increasing order. The exception to this
740 : // rule is range deletion tombstones. Range deletion tombstones need to be
741 : // added ordered by their start key, but they can be added out of order from
742 : // point entries. Additionally, range deletion tombstones must be fragmented
743 : // (i.e. by keyspan.Fragmenter).
744 2 : func (w *Writer) Add(key InternalKey, value []byte) error {
745 2 : if w.isStrictObsolete {
746 0 : return errors.Errorf("use AddWithForceObsolete")
747 0 : }
748 2 : return w.AddWithForceObsolete(key, value, false)
749 : }
750 :
751 : // AddWithForceObsolete must be used when writing a strict-obsolete sstable.
752 : //
753 : // forceObsolete indicates whether the caller has determined that this key is
754 : // obsolete even though it may be the latest point key for this userkey. This
755 : // should be set to true for keys obsoleted by RANGEDELs, and is required for
756 : // strict-obsolete sstables.
757 : //
758 : // Note that there are two properties, S1 and S2 (see comment in format.go)
759 : // that strict-obsolete ssts must satisfy. S2, due to RANGEDELs, is solely the
760 : // responsibility of the caller. S1 is solely the responsibility of the
761 : // callee.
762 2 : func (w *Writer) AddWithForceObsolete(key InternalKey, value []byte, forceObsolete bool) error {
763 2 : if w.err != nil {
764 0 : return w.err
765 0 : }
766 :
767 2 : switch key.Kind() {
768 2 : case InternalKeyKindRangeDelete:
769 2 : return w.addTombstone(key, value)
770 : case base.InternalKeyKindRangeKeyDelete,
771 : base.InternalKeyKindRangeKeySet,
772 0 : base.InternalKeyKindRangeKeyUnset:
773 0 : w.err = errors.Errorf(
774 0 : "pebble: range keys must be added via one of the RangeKey* functions")
775 0 : return w.err
776 : }
777 2 : return w.addPoint(key, value, forceObsolete)
778 : }
779 :
780 2 : func (w *Writer) makeAddPointDecisionV2(key InternalKey) error {
781 2 : prevTrailer := w.lastPointKeyInfo.trailer
782 2 : w.lastPointKeyInfo.trailer = key.Trailer
783 2 : if w.dataBlockBuf.dataBlock.nEntries == 0 {
784 2 : return nil
785 2 : }
786 2 : if !w.disableKeyOrderChecks {
787 2 : prevPointUserKey := w.dataBlockBuf.dataBlock.getCurUserKey()
788 2 : cmpUser := w.compare(prevPointUserKey, key.UserKey)
789 2 : if cmpUser > 0 || (cmpUser == 0 && prevTrailer <= key.Trailer) {
790 1 : return errors.Errorf(
791 1 : "pebble: keys must be added in strictly increasing order: %s, %s",
792 1 : InternalKey{UserKey: prevPointUserKey, Trailer: prevTrailer}.Pretty(w.formatKey),
793 1 : key.Pretty(w.formatKey))
794 1 : }
795 : }
796 2 : return nil
797 : }
798 :
799 : // REQUIRES: at least one point has been written to the Writer.
800 2 : func (w *Writer) getLastPointUserKey() []byte {
801 2 : if w.dataBlockBuf.dataBlock.nEntries == 0 {
802 0 : panic(errors.AssertionFailedf("no point keys added to writer"))
803 : }
804 2 : return w.dataBlockBuf.dataBlock.getCurUserKey()
805 : }
806 :
807 : func (w *Writer) makeAddPointDecisionV3(
808 : key InternalKey, valueLen int,
809 2 : ) (setHasSamePrefix bool, writeToValueBlock bool, isObsolete bool, err error) {
810 2 : prevPointKeyInfo := w.lastPointKeyInfo
811 2 : w.lastPointKeyInfo.userKeyLen = len(key.UserKey)
812 2 : w.lastPointKeyInfo.prefixLen = w.lastPointKeyInfo.userKeyLen
813 2 : if w.split != nil {
814 2 : w.lastPointKeyInfo.prefixLen = w.split(key.UserKey)
815 2 : }
816 2 : w.lastPointKeyInfo.trailer = key.Trailer
817 2 : w.lastPointKeyInfo.isObsolete = false
818 2 : if !w.meta.HasPointKeys {
819 2 : return false, false, false, nil
820 2 : }
821 2 : keyKind := base.TrailerKind(key.Trailer)
822 2 : prevPointUserKey := w.getLastPointUserKey()
823 2 : prevPointKey := InternalKey{UserKey: prevPointUserKey, Trailer: prevPointKeyInfo.trailer}
824 2 : prevKeyKind := base.TrailerKind(prevPointKeyInfo.trailer)
825 2 : considerWriteToValueBlock := prevKeyKind == InternalKeyKindSet &&
826 2 : keyKind == InternalKeyKindSet
827 2 : if considerWriteToValueBlock && !w.requiredInPlaceValueBound.IsEmpty() {
828 1 : keyPrefix := key.UserKey[:w.lastPointKeyInfo.prefixLen]
829 1 : cmpUpper := w.compare(
830 1 : w.requiredInPlaceValueBound.Upper, keyPrefix)
831 1 : if cmpUpper <= 0 {
832 1 : // Common case for CockroachDB. Make it empty since all future keys in
833 1 : // this sstable will also have cmpUpper <= 0.
834 1 : w.requiredInPlaceValueBound = UserKeyPrefixBound{}
835 1 : } else if w.compare(keyPrefix, w.requiredInPlaceValueBound.Lower) >= 0 {
836 1 : considerWriteToValueBlock = false
837 1 : }
838 : }
839 : // cmpPrefix is initialized iff considerWriteToValueBlock.
840 2 : var cmpPrefix int
841 2 : var cmpUser int
842 2 : if considerWriteToValueBlock {
843 2 : // Compare the prefixes.
844 2 : cmpPrefix = w.compare(prevPointUserKey[:prevPointKeyInfo.prefixLen],
845 2 : key.UserKey[:w.lastPointKeyInfo.prefixLen])
846 2 : cmpUser = cmpPrefix
847 2 : if cmpPrefix == 0 {
848 2 : // Need to compare suffixes to compute cmpUser.
849 2 : cmpUser = w.compare(prevPointUserKey[prevPointKeyInfo.prefixLen:],
850 2 : key.UserKey[w.lastPointKeyInfo.prefixLen:])
851 2 : }
852 2 : } else {
853 2 : cmpUser = w.compare(prevPointUserKey, key.UserKey)
854 2 : }
855 : // Ensure that no one adds a point key kind without considering the obsolete
856 : // handling for that kind.
857 2 : switch keyKind {
858 : case InternalKeyKindSet, InternalKeyKindSetWithDelete, InternalKeyKindMerge,
859 2 : InternalKeyKindDelete, InternalKeyKindSingleDelete, InternalKeyKindDeleteSized:
860 0 : default:
861 0 : panic(errors.AssertionFailedf("unexpected key kind %s", keyKind.String()))
862 : }
863 : // If same user key, then the current key is obsolete if any of the
864 : // following is true:
865 : // C1 The prev key was obsolete.
866 : // C2 The prev key was not a MERGE. When the previous key is a MERGE we must
867 : // preserve SET* and MERGE since their values will be merged into the
868 : // previous key. We also must preserve DEL* since there may be an older
869 : // SET*/MERGE in a lower level that must not be merged with the MERGE --
870 : // if we omit the DEL* that lower SET*/MERGE will become visible.
871 : //
872 : // Regardless of whether it is the same user key or not
873 : // C3 The current key is some kind of point delete, and we are writing to
874 : // the lowest level, then it is also obsolete. The correctness of this
875 : // relies on the same user key not spanning multiple sstables in a level.
876 : //
877 : // C1 ensures that for a user key there is at most one transition from
878 : // !obsolete to obsolete. Consider a user key k, for which the first n keys
879 : // are not obsolete. We consider the various values of n:
880 : //
881 : // n = 0: This happens due to forceObsolete being set by the caller, or due
882 : // to C3. forceObsolete must only be set due to a RANGEDEL, and that RANGEDEL
883 : // must also delete all the lower seqnums for the same user key. C3 triggers
884 : // due to a point delete and that deletes all the lower seqnums for the same
885 : // user key.
886 : //
887 : // n = 1: This is the common case. It happens when the first key is not a
888 : // MERGE, or the current key is some kind of point delete.
889 : //
890 : // n > 1: This is due to a sequence of MERGE keys, potentially followed by a
891 : // single non-MERGE key.
892 2 : isObsoleteC1AndC2 := cmpUser == 0 &&
893 2 : (prevPointKeyInfo.isObsolete || prevKeyKind != InternalKeyKindMerge)
894 2 : isObsoleteC3 := w.writingToLowestLevel &&
895 2 : (keyKind == InternalKeyKindDelete || keyKind == InternalKeyKindSingleDelete ||
896 2 : keyKind == InternalKeyKindDeleteSized)
897 2 : isObsolete = isObsoleteC1AndC2 || isObsoleteC3
898 2 : // TODO(sumeer): storing isObsolete SET and SETWITHDEL in value blocks is
899 2 : // possible, but requires some care in documenting and checking invariants.
900 2 : // There is code that assumes nothing in value blocks because of single MVCC
901 2 : // version (those should be ok). We have to ensure setHasSamePrefix is
902 2 : // correctly initialized here etc.
903 2 :
904 2 : if !w.disableKeyOrderChecks &&
905 2 : (cmpUser > 0 || (cmpUser == 0 && prevPointKeyInfo.trailer <= key.Trailer)) {
906 1 : return false, false, false, errors.Errorf(
907 1 : "pebble: keys must be added in strictly increasing order: %s, %s",
908 1 : prevPointKey.Pretty(w.formatKey), key.Pretty(w.formatKey))
909 1 : }
910 2 : if !considerWriteToValueBlock {
911 2 : return false, false, isObsolete, nil
912 2 : }
913 : // NB: it is possible that cmpUser == 0, i.e., these two SETs have identical
914 : // user keys (because of an open snapshot). This should be the rare case.
915 2 : setHasSamePrefix = cmpPrefix == 0
916 2 : considerWriteToValueBlock = setHasSamePrefix
917 2 : // Use of 0 here is somewhat arbitrary. Given the minimum 3 byte encoding of
918 2 : // valueHandle, this should be > 3. But tiny values are common in test and
919 2 : // unlikely in production, so we use 0 here for better test coverage.
920 2 : const tinyValueThreshold = 0
921 2 : if considerWriteToValueBlock && valueLen <= tinyValueThreshold {
922 2 : considerWriteToValueBlock = false
923 2 : }
924 2 : return setHasSamePrefix, considerWriteToValueBlock, isObsolete, nil
925 : }
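
// Editorial sketch (not part of the original source): the obsolescence rule
// described above, restated in isolation. A point is obsolete if it shares a
// user key with a newer point that is itself obsolete or is not a MERGE
// (C1/C2), or if it is a point deletion written to the lowest level (C3).
func exampleIsObsolete(
	sameUserKey, prevObsolete bool, prevKind, kind base.InternalKeyKind, lowestLevel bool,
) bool {
	c1AndC2 := sameUserKey && (prevObsolete || prevKind != InternalKeyKindMerge)
	c3 := lowestLevel && (kind == InternalKeyKindDelete ||
		kind == InternalKeyKindSingleDelete || kind == InternalKeyKindDeleteSized)
	return c1AndC2 || c3
}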
926 :
927 2 : func (w *Writer) addPoint(key InternalKey, value []byte, forceObsolete bool) error {
928 2 : if w.isStrictObsolete && key.Kind() == InternalKeyKindMerge {
929 1 : return errors.Errorf("MERGE not supported in a strict-obsolete sstable")
930 1 : }
931 2 : var err error
932 2 : var setHasSameKeyPrefix, writeToValueBlock, addPrefixToValueStoredWithKey bool
933 2 : var isObsolete bool
934 2 : maxSharedKeyLen := len(key.UserKey)
935 2 : if w.valueBlockWriter != nil {
936 2 : // maxSharedKeyLen is limited to the prefix of the preceding key. If the
937 2 : // preceding key was in a different block, then the blockWriter will
938 2 : // ignore this maxSharedKeyLen.
939 2 : maxSharedKeyLen = w.lastPointKeyInfo.prefixLen
940 2 : setHasSameKeyPrefix, writeToValueBlock, isObsolete, err =
941 2 : w.makeAddPointDecisionV3(key, len(value))
942 2 : addPrefixToValueStoredWithKey = base.TrailerKind(key.Trailer) == InternalKeyKindSet
943 2 : } else {
944 2 : err = w.makeAddPointDecisionV2(key)
945 2 : }
946 2 : if err != nil {
947 1 : return err
948 1 : }
949 2 : isObsolete = w.tableFormat >= TableFormatPebblev4 && (isObsolete || forceObsolete)
950 2 : w.lastPointKeyInfo.isObsolete = isObsolete
951 2 : var valueStoredWithKey []byte
952 2 : var prefix valuePrefix
953 2 : var valueStoredWithKeyLen int
954 2 : if writeToValueBlock {
955 2 : vh, err := w.valueBlockWriter.addValue(value)
956 2 : if err != nil {
957 0 : return err
958 0 : }
959 2 : n := encodeValueHandle(w.blockBuf.tmp[:], vh)
960 2 : valueStoredWithKey = w.blockBuf.tmp[:n]
961 2 : valueStoredWithKeyLen = len(valueStoredWithKey) + 1
962 2 : var attribute base.ShortAttribute
963 2 : if w.shortAttributeExtractor != nil {
964 1 : // TODO(sumeer): for compactions, it is possible that the input sstable
965 1 : // already has this value in the value section and so we have already
966 1 : // extracted the ShortAttribute. Avoid extracting it again. This will
967 1 : // require changing the Writer.Add interface.
968 1 : if attribute, err = w.shortAttributeExtractor(
969 1 : key.UserKey, w.lastPointKeyInfo.prefixLen, value); err != nil {
970 0 : return err
971 0 : }
972 : }
973 2 : prefix = makePrefixForValueHandle(setHasSameKeyPrefix, attribute)
974 2 : } else {
975 2 : valueStoredWithKey = value
976 2 : valueStoredWithKeyLen = len(value)
977 2 : if addPrefixToValueStoredWithKey {
978 2 : valueStoredWithKeyLen++
979 2 : }
980 2 : prefix = makePrefixForInPlaceValue(setHasSameKeyPrefix)
981 : }
982 :
983 2 : if err := w.maybeFlush(key, valueStoredWithKeyLen); err != nil {
984 1 : return err
985 1 : }
986 :
987 2 : for i := range w.propCollectors {
988 1 : if err := w.propCollectors[i].Add(key, value); err != nil {
989 1 : w.err = err
990 1 : return err
991 1 : }
992 : }
993 2 : for i := range w.blockPropCollectors {
994 2 : v := value
995 2 : if addPrefixToValueStoredWithKey {
996 2 : // Values for SET are not required to be in-place, and in the future may
997 2 : // not even be read by the compaction, so pass nil values. Block
998 2 : // property collectors in such Pebble DB's must not look at the value.
999 2 : // property collectors in such Pebble DBs must not look at the value.
1000 2 : }
1001 2 : if err := w.blockPropCollectors[i].Add(key, v); err != nil {
1002 1 : w.err = err
1003 1 : return err
1004 1 : }
1005 : }
1006 2 : if w.tableFormat >= TableFormatPebblev4 {
1007 2 : w.obsoleteCollector.AddPoint(isObsolete)
1008 2 : }
1009 :
1010 2 : w.maybeAddToFilter(key.UserKey)
1011 2 : w.dataBlockBuf.dataBlock.addWithOptionalValuePrefix(
1012 2 : key, isObsolete, valueStoredWithKey, maxSharedKeyLen, addPrefixToValueStoredWithKey, prefix,
1013 2 : setHasSameKeyPrefix)
1014 2 :
1015 2 : w.meta.updateSeqNum(key.SeqNum())
1016 2 :
1017 2 : if !w.meta.HasPointKeys {
1018 2 : k := w.dataBlockBuf.dataBlock.getCurKey()
1019 2 : // NB: We need to ensure that SmallestPoint.UserKey is set, so we create
1020 2 : // an InternalKey which is semantically identical to the key, but won't
1021 2 : // have a nil UserKey. We do this, because key.UserKey could be nil, and
1022 2 : // we don't want SmallestPoint.UserKey to be nil.
1023 2 : //
1024 2 : // todo(bananabrick): Determine if it's okay to have a nil SmallestPoint
1025 2 : // .UserKey now that we don't rely on a nil UserKey to determine if the
1026 2 : // key has been set or not.
1027 2 : w.meta.SetSmallestPointKey(k.Clone())
1028 2 : }
1029 :
1030 2 : w.props.NumEntries++
1031 2 : switch key.Kind() {
1032 2 : case InternalKeyKindDelete, InternalKeyKindSingleDelete:
1033 2 : w.props.NumDeletions++
1034 2 : w.props.RawPointTombstoneKeySize += uint64(len(key.UserKey))
1035 2 : case InternalKeyKindDeleteSized:
1036 2 : var size uint64
1037 2 : if len(value) > 0 {
1038 2 : var n int
1039 2 : size, n = binary.Uvarint(value)
1040 2 : if n <= 0 {
1041 0 : w.err = errors.Newf("%s key's value (%x) does not parse as uvarint",
1042 0 : errors.Safe(key.Kind().String()), value)
1043 0 : return w.err
1044 0 : }
1045 : }
1046 2 : w.props.NumDeletions++
1047 2 : w.props.NumSizedDeletions++
1048 2 : w.props.RawPointTombstoneKeySize += uint64(len(key.UserKey))
1049 2 : w.props.RawPointTombstoneValueSize += size
1050 2 : case InternalKeyKindMerge:
1051 2 : w.props.NumMergeOperands++
1052 : }
1053 2 : w.props.RawKeySize += uint64(key.Size())
1054 2 : w.props.RawValueSize += uint64(len(value))
1055 2 : return nil
1056 : }
1057 :
1058 1 : func (w *Writer) prettyTombstone(k InternalKey, value []byte) fmt.Formatter {
1059 1 : return keyspan.Span{
1060 1 : Start: k.UserKey,
1061 1 : End: value,
1062 1 : Keys: []keyspan.Key{{Trailer: k.Trailer}},
1063 1 : }.Pretty(w.formatKey)
1064 1 : }
1065 :
1066 2 : func (w *Writer) addTombstone(key InternalKey, value []byte) error {
1067 2 : if !w.disableKeyOrderChecks && !w.rangeDelV1Format && w.rangeDelBlock.nEntries > 0 {
1068 2 : // Check that tombstones are being added in fragmented order. If the two
1069 2 : // tombstones overlap, their start and end keys must be identical.
1070 2 : prevKey := w.rangeDelBlock.getCurKey()
1071 2 : switch c := w.compare(prevKey.UserKey, key.UserKey); {
1072 0 : case c > 0:
1073 0 : w.err = errors.Errorf("pebble: keys must be added in order: %s, %s",
1074 0 : prevKey.Pretty(w.formatKey), key.Pretty(w.formatKey))
1075 0 : return w.err
1076 2 : case c == 0:
1077 2 : prevValue := w.rangeDelBlock.curValue
1078 2 : if w.compare(prevValue, value) != 0 {
1079 1 : w.err = errors.Errorf("pebble: overlapping tombstones must be fragmented: %s vs %s",
1080 1 : w.prettyTombstone(prevKey, prevValue),
1081 1 : w.prettyTombstone(key, value))
1082 1 : return w.err
1083 1 : }
1084 2 : if prevKey.SeqNum() <= key.SeqNum() {
1085 1 : w.err = errors.Errorf("pebble: keys must be added in strictly increasing order: %s, %s",
1086 1 : prevKey.Pretty(w.formatKey), key.Pretty(w.formatKey))
1087 1 : return w.err
1088 1 : }
1089 2 : default:
1090 2 : prevValue := w.rangeDelBlock.curValue
1091 2 : if w.compare(prevValue, key.UserKey) > 0 {
1092 1 : w.err = errors.Errorf("pebble: overlapping tombstones must be fragmented: %s vs %s",
1093 1 : w.prettyTombstone(prevKey, prevValue),
1094 1 : w.prettyTombstone(key, value))
1095 1 : return w.err
1096 1 : }
1097 : }
1098 : }
1099 :
1100 2 : if key.Trailer == InternalKeyRangeDeleteSentinel {
1101 0 : w.err = errors.Errorf("pebble: cannot add range delete sentinel: %s", key.Pretty(w.formatKey))
1102 0 : return w.err
1103 0 : }
1104 :
1105 2 : for i := range w.propCollectors {
1106 1 : if err := w.propCollectors[i].Add(key, value); err != nil {
1107 1 : w.err = err
1108 1 : return err
1109 1 : }
1110 : }
1111 :
1112 2 : w.meta.updateSeqNum(key.SeqNum())
1113 2 :
1114 2 : switch {
1115 1 : case w.rangeDelV1Format:
1116 1 : // Range tombstones are not fragmented in the v1 (i.e. RocksDB) range
1117 1 : // deletion block format, so we need to track the largest range tombstone
1118 1 : // end key as every range tombstone is added.
1119 1 : //
1120 1 : // Note that writing the v1 format is only supported for tests.
1121 1 : if w.props.NumRangeDeletions == 0 {
1122 1 : w.meta.SetSmallestRangeDelKey(key.Clone())
1123 1 : w.meta.SetLargestRangeDelKey(base.MakeRangeDeleteSentinelKey(value).Clone())
1124 1 : } else {
1125 1 : if base.InternalCompare(w.compare, w.meta.SmallestRangeDel, key) > 0 {
1126 1 : w.meta.SetSmallestRangeDelKey(key.Clone())
1127 1 : }
1128 1 : end := base.MakeRangeDeleteSentinelKey(value)
1129 1 : if base.InternalCompare(w.compare, w.meta.LargestRangeDel, end) < 0 {
1130 1 : w.meta.SetLargestRangeDelKey(end.Clone())
1131 1 : }
1132 : }
1133 :
1134 2 : default:
1135 2 : // Range tombstones are fragmented in the v2 range deletion block format,
1136 2 : // so the start key of the first range tombstone added will be the smallest
1137 2 : // range tombstone key. The largest range tombstone key will be determined
1138 2 : // in Writer.Close() as the end key of the last range tombstone added.
1139 2 : if w.props.NumRangeDeletions == 0 {
1140 2 : w.meta.SetSmallestRangeDelKey(key.Clone())
1141 2 : }
1142 : }
1143 :
1144 2 : w.props.NumEntries++
1145 2 : w.props.NumDeletions++
1146 2 : w.props.NumRangeDeletions++
1147 2 : w.props.RawKeySize += uint64(key.Size())
1148 2 : w.props.RawValueSize += uint64(len(value))
1149 2 : w.rangeDelBlock.add(key, value)
1150 2 : return nil
1151 : }
1152 :
1153 : // RangeKeySet sets a range between start (inclusive) and end (exclusive) with
1154 : // the given suffix to the given value. The resulting range key is given the
1155 : // sequence number zero, with the expectation that the resulting sstable will be
1156 : // ingested.
1157 : //
1158 : // Keys must be added to the table in increasing order of start key. Spans are
1159 : // not required to be fragmented. The same suffix may not be set or unset twice
1160 : // over the same keyspan, because it would result in inconsistent state. Both
1161 : // the Set and Unset would share the zero sequence number, and a key cannot be
1162 : // both simultaneously set and unset.
1163 1 : func (w *Writer) RangeKeySet(start, end, suffix, value []byte) error {
1164 1 : return w.addRangeKeySpan(keyspan.Span{
1165 1 : Start: w.tempRangeKeyCopy(start),
1166 1 : End: w.tempRangeKeyCopy(end),
1167 1 : Keys: []keyspan.Key{
1168 1 : {
1169 1 : Trailer: base.MakeTrailer(0, base.InternalKeyKindRangeKeySet),
1170 1 : Suffix: w.tempRangeKeyCopy(suffix),
1171 1 : Value: w.tempRangeKeyCopy(value),
1172 1 : },
1173 1 : },
1174 1 : })
1175 1 : }
1176 :
1177 : // RangeKeyUnset un-sets a range between start (inclusive) and end (exclusive)
1178 : // with the given suffix. The resulting range key is given the
1179 : // sequence number zero, with the expectation that the resulting sstable will be
1180 : // ingested.
1181 : //
1182 : // Keys must be added to the table in increasing order of start key. Spans are
1183 : // not required to be fragmented. The same suffix may not be set or unset twice
1184 : // over the same keyspan, because it would result in inconsistent state. Both
1185 : // the Set and Unset would share the zero sequence number, and a key cannot be
1186 : // both simultaneously set and unset.
1187 1 : func (w *Writer) RangeKeyUnset(start, end, suffix []byte) error {
1188 1 : return w.addRangeKeySpan(keyspan.Span{
1189 1 : Start: w.tempRangeKeyCopy(start),
1190 1 : End: w.tempRangeKeyCopy(end),
1191 1 : Keys: []keyspan.Key{
1192 1 : {
1193 1 : Trailer: base.MakeTrailer(0, base.InternalKeyKindRangeKeyUnset),
1194 1 : Suffix: w.tempRangeKeyCopy(suffix),
1195 1 : },
1196 1 : },
1197 1 : })
1198 1 : }
1199 :
1200 : // RangeKeyDelete deletes a range between start (inclusive) and end (exclusive).
1201 : //
1202 : // Keys must be added to the table in increasing order of start key. Spans are
1203 : // not required to be fragmented.
1204 2 : func (w *Writer) RangeKeyDelete(start, end []byte) error {
1205 2 : return w.addRangeKeySpan(keyspan.Span{
1206 2 : Start: w.tempRangeKeyCopy(start),
1207 2 : End: w.tempRangeKeyCopy(end),
1208 2 : Keys: []keyspan.Key{
1209 2 : {Trailer: base.MakeTrailer(0, base.InternalKeyKindRangeKeyDelete)},
1210 2 : },
1211 2 : })
1212 2 : }
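
// Editorial sketch (not part of the original source): adding range keys in
// increasing order of start key. Spans need not be pre-fragmented; the
// Writer's fragmenter coalesces and fragments them. The keys, suffixes and
// values below are arbitrary illustrative choices.
func exampleRangeKeys(w *Writer) error {
	if err := w.RangeKeySet([]byte("a"), []byte("c"), []byte("@5"), []byte("v")); err != nil {
		return err
	}
	if err := w.RangeKeyUnset([]byte("b"), []byte("d"), []byte("@3")); err != nil {
		return err
	}
	return w.RangeKeyDelete([]byte("d"), []byte("e"))
}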
1213 :
1214 : // AddRangeKey adds a range key set, unset, or delete key/value pair to the
1215 : // table being written.
1216 : //
1217 : // Range keys must be supplied in strictly ascending order of start key (i.e.
1218 : // user key ascending, sequence number descending, and key type descending).
1219 : // Ranges added must also be supplied in fragmented span order - i.e. other than
1220 : // spans that are perfectly aligned (same start and end keys), spans may not
1221 : // overlap. Range keys may be added out of order relative to point keys and
1222 : // range deletions.
1223 2 : func (w *Writer) AddRangeKey(key InternalKey, value []byte) error {
1224 2 : if w.err != nil {
1225 0 : return w.err
1226 0 : }
1227 2 : return w.addRangeKey(key, value)
1228 : }
1229 :
1230 2 : func (w *Writer) addRangeKeySpan(span keyspan.Span) error {
1231 2 : if w.compare(span.Start, span.End) >= 0 {
1232 0 : return errors.Errorf(
1233 0 : "pebble: start key must be strictly less than end key",
1234 0 : )
1235 0 : }
1236 2 : if w.fragmenter.Start() != nil && w.compare(w.fragmenter.Start(), span.Start) > 0 {
1237 1 : return errors.Errorf("pebble: spans must be added in order: %s > %s",
1238 1 : w.formatKey(w.fragmenter.Start()), w.formatKey(span.Start))
1239 1 : }
1240 : // Add this span to the fragmenter.
1241 2 : w.fragmenter.Add(span)
1242 2 : return w.err
1243 : }
1244 :
1245 2 : func (w *Writer) encodeRangeKeySpan(span keyspan.Span) {
1246 2 : // This method is the emit function of the Fragmenter.
1247 2 : //
1248 2 : // NB: The span should only contain range keys and be internally consistent
1249 2 : // (eg, no duplicate suffixes, no additional keys after a RANGEKEYDEL).
1250 2 : //
1251 2 : // We use w.rangeKeysBySuffix and w.rangeKeySpan to avoid allocations.
1252 2 :
1253 2 : // Sort the keys by suffix. Iteration doesn't *currently* depend on it, but
1254 2 : // we may want to in the future.
1255 2 : w.rangeKeysBySuffix.Cmp = w.compare
1256 2 : w.rangeKeysBySuffix.Keys = span.Keys
1257 2 : sort.Sort(&w.rangeKeysBySuffix)
1258 2 :
1259 2 : w.rangeKeySpan = span
1260 2 : w.rangeKeySpan.Keys = w.rangeKeysBySuffix.Keys
1261 2 : w.err = firstError(w.err, w.rangeKeyEncoder.Encode(&w.rangeKeySpan))
1262 2 : }
1263 :
1264 2 : func (w *Writer) addRangeKey(key InternalKey, value []byte) error {
1265 2 : if !w.disableKeyOrderChecks && w.rangeKeyBlock.nEntries > 0 {
1266 2 : prevStartKey := w.rangeKeyBlock.getCurKey()
1267 2 : prevEndKey, _, ok := rangekey.DecodeEndKey(prevStartKey.Kind(), w.rangeKeyBlock.curValue)
1268 2 : if !ok {
1269 0 : // We panic here as we should have previously decoded and validated this
1270 0 : // key and value when it was first added to the range key block.
1271 0 : panic(errors.Errorf("pebble: invalid end key for span: %s",
1272 0 : prevStartKey.Pretty(w.formatKey)))
1273 : }
1274 :
1275 2 : curStartKey := key
1276 2 : curEndKey, _, ok := rangekey.DecodeEndKey(curStartKey.Kind(), value)
1277 2 : if !ok {
1278 0 : w.err = errors.Errorf("pebble: invalid end key for span: %s",
1279 0 : curStartKey.Pretty(w.formatKey))
1280 0 : return w.err
1281 0 : }
1282 :
1283 : // Start keys must be strictly increasing.
1284 2 : if base.InternalCompare(w.compare, prevStartKey, curStartKey) >= 0 {
1285 1 : w.err = errors.Errorf(
1286 1 : "pebble: range key starts must be added in increasing order: %s, %s",
1287 1 : prevStartKey.Pretty(w.formatKey), key.Pretty(w.formatKey))
1288 1 : return w.err
1289 1 : }
1290 :
1291 : // Start keys are increasing. If the start user keys are equal, the
1292 : // end keys must be equal (i.e. aligned spans).
1293 2 : if w.compare(prevStartKey.UserKey, curStartKey.UserKey) == 0 {
1294 2 : if w.compare(prevEndKey, curEndKey) != 0 {
1295 1 : w.err = errors.Errorf("pebble: overlapping range keys must be fragmented: %s, %s",
1296 1 : prevStartKey.Pretty(w.formatKey),
1297 1 : curStartKey.Pretty(w.formatKey))
1298 1 : return w.err
1299 1 : }
1300 2 : } else if w.compare(prevEndKey, curStartKey.UserKey) > 0 {
1301 1 : // If the start user keys are NOT equal, the spans must be disjoint (i.e.
1302 1 : // no overlap).
1303 1 : // NOTE: the inequality excludes zero, as we allow the end key of the
1304 1 : // lower span to be the same as the start key of the upper span, because
1305 1 : // the range end key is considered an exclusive bound.
1306 1 : w.err = errors.Errorf("pebble: overlapping range keys must be fragmented: %s, %s",
1307 1 : prevStartKey.Pretty(w.formatKey),
1308 1 : curStartKey.Pretty(w.formatKey))
1309 1 : return w.err
1310 1 : }
1311 : }
1312 :
1313 : // TODO(travers): Add an invariant-gated check to ensure that suffix-values
1314 : // are sorted within coalesced spans.
1315 :
1316 : // Range-keys and point-keys are intended to live in "parallel" keyspaces.
1317 : // However, we track a single seqnum in the table metadata that spans both of
1318 : // these keyspaces.
1319 : // TODO(travers): Consider tracking range key seqnums separately.
1320 2 : w.meta.updateSeqNum(key.SeqNum())
1321 2 :
1322 2 : // Range keys are fragmented, so the start key of the first range key
1323 2 : // added will be the smallest. The largest range key is determined in
1324 2 : // Writer.Close() as the end key of the last range key added to the block.
1325 2 : if w.props.NumRangeKeys() == 0 {
1326 2 : w.meta.SetSmallestRangeKey(key.Clone())
1327 2 : }
1328 :
1329 : // Update block properties.
1330 2 : w.props.RawRangeKeyKeySize += uint64(key.Size())
1331 2 : w.props.RawRangeKeyValueSize += uint64(len(value))
1332 2 : switch key.Kind() {
1333 2 : case base.InternalKeyKindRangeKeyDelete:
1334 2 : w.props.NumRangeKeyDels++
1335 2 : case base.InternalKeyKindRangeKeySet:
1336 2 : w.props.NumRangeKeySets++
1337 2 : case base.InternalKeyKindRangeKeyUnset:
1338 2 : w.props.NumRangeKeyUnsets++
1339 0 : default:
1340 0 : panic(errors.Errorf("pebble: invalid range key type: %s", key.Kind()))
1341 : }
1342 :
1343 2 : for i := range w.blockPropCollectors {
1344 2 : if err := w.blockPropCollectors[i].Add(key, value); err != nil {
1345 0 : return err
1346 0 : }
1347 : }
1348 :
1349 : // Add the key to the block.
1350 2 : w.rangeKeyBlock.add(key, value)
1351 2 : return nil
1352 : }
1353 :
1354 : // tempRangeKeyBuf returns a slice of length n from the Writer's rkBuf byte
1355 : // slice. Any byte written to the returned slice is retained for the lifetime of
1356 : // the Writer.
1357 2 : func (w *Writer) tempRangeKeyBuf(n int) []byte {
1358 2 : if cap(w.rkBuf)-len(w.rkBuf) < n {
1359 2 : size := len(w.rkBuf) + 2*n
1360 2 : if size < 2*cap(w.rkBuf) {
1361 2 : size = 2 * cap(w.rkBuf)
1362 2 : }
1363 2 : buf := make([]byte, len(w.rkBuf), size)
1364 2 : copy(buf, w.rkBuf)
1365 2 : w.rkBuf = buf
1366 : }
1367 2 : b := w.rkBuf[len(w.rkBuf) : len(w.rkBuf)+n]
1368 2 : w.rkBuf = w.rkBuf[:len(w.rkBuf)+n]
1369 2 : return b
1370 : }
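// A worked example of the growth policy above (illustrative numbers): a
// 100-byte request against a buffer with len 40 and cap 64 does not fit in the
// remaining 24 bytes of capacity, so the backing array grows to
// max(40+2*100, 2*64) = 240 bytes, the existing 40 bytes are copied over, and
// the caller receives the slice covering bytes [40, 140).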
1371 :
1372 : // tempRangeKeyCopy returns a copy of the provided slice, stored in the Writer's
1373 : // range key buffer.
1374 2 : func (w *Writer) tempRangeKeyCopy(k []byte) []byte {
1375 2 : if len(k) == 0 {
1376 1 : return nil
1377 1 : }
1378 2 : buf := w.tempRangeKeyBuf(len(k))
1379 2 : copy(buf, k)
1380 2 : return buf
1381 : }
1382 :
1383 2 : func (w *Writer) maybeAddToFilter(key []byte) {
1384 2 : if w.filter != nil {
1385 2 : if w.split != nil {
1386 2 : prefix := key[:w.split(key)]
1387 2 : w.filter.addKey(prefix)
1388 2 : } else {
1389 1 : w.filter.addKey(key)
1390 1 : }
1391 : }
1392 : }
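// As an illustration (hypothetical key and split function, not part of this
// package): if w.split("user42@ts9") returns 6, only the prefix "user42" is
// added to the filter, so a single prefix bloom entry covers every suffixed
// version of that user key. With no split function the whole key is added and
// the filter only supports whole-key lookups.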
1393 :
1394 2 : func (w *Writer) flush(key InternalKey) error {
1395 2 : // We're finishing a data block.
1396 2 : err := w.finishDataBlockProps(w.dataBlockBuf)
1397 2 : if err != nil {
1398 1 : return err
1399 1 : }
1400 2 : w.dataBlockBuf.finish()
1401 2 : w.dataBlockBuf.compressAndChecksum(w.compression)
1402 2 : // Since dataBlockEstimates.addInflightDataBlock was never called, the
1403 2 : // inflightSize is set to 0.
1404 2 : w.coordination.sizeEstimate.dataBlockCompressed(len(w.dataBlockBuf.compressed), 0)
1405 2 :
1406 2 : // Determine if the index block should be flushed. Since we're accessing the
1407 2 : // dataBlockBuf.dataBlock.curKey here, we have to make sure that once we start
1408 2 : // to pool the dataBlockBufs, the curKey isn't used by the Writer once the
1409 2 : // dataBlockBuf is added back to a sync.Pool. In this particular case, the
1410 2 : // byte slice which supports "sep" will eventually be copied when "sep" is
1411 2 : // added to the index block.
1412 2 : prevKey := w.dataBlockBuf.dataBlock.getCurKey()
1413 2 : sep := w.indexEntrySep(prevKey, key, w.dataBlockBuf)
1414 2 : // We determine that we should flush an index block from the Writer client
1415 2 : // goroutine, but we actually finish the index block from the writeQueue.
1416 2 : // When we determine that an index block should be flushed, we need to call
1417 2 : // BlockPropertyCollector.FinishIndexBlock. But block property collector
1418 2 : // calls must happen sequentially from the Writer client. Therefore, we need
1419 2 : // to determine that we are going to flush the index block from the Writer
1420 2 : // client.
1421 2 : shouldFlushIndexBlock := supportsTwoLevelIndex(w.tableFormat) && w.indexBlock.shouldFlush(
1422 2 : sep, encodedBHPEstimatedSize, w.indexBlockSize, w.indexBlockSizeThreshold,
1423 2 : )
1424 2 :
1425 2 : var indexProps []byte
1426 2 : var flushableIndexBlock *indexBlockBuf
1427 2 : if shouldFlushIndexBlock {
1428 2 : flushableIndexBlock = w.indexBlock
1429 2 : w.indexBlock = newIndexBlockBuf(w.coordination.parallelismEnabled)
1430 2 : // Call BlockPropertyCollector.FinishIndexBlock, since we've decided to
1431 2 : // flush the index block.
1432 2 : indexProps, err = w.finishIndexBlockProps()
1433 2 : if err != nil {
1434 1 : return err
1435 1 : }
1436 : }
1437 :
1438 : // We've called BlockPropertyCollector.FinishDataBlock, and, if necessary,
1439 : // BlockPropertyCollector.FinishIndexBlock. Since we've decided to finish
1440 : // the data block, we can call
1441 : // BlockPropertyCollector.AddPrevDataBlockToIndexBlock.
1442 2 : w.addPrevDataBlockToIndexBlockProps()
1443 2 :
1444 2 : // Schedule a write.
1445 2 : writeTask := writeTaskPool.Get().(*writeTask)
1446 2 : // We're setting compressionDone to indicate that compression of this block
1447 2 : // has already been completed.
1448 2 : writeTask.compressionDone <- true
1449 2 : writeTask.buf = w.dataBlockBuf
1450 2 : writeTask.indexEntrySep = sep
1451 2 : writeTask.currIndexBlock = w.indexBlock
1452 2 : writeTask.indexInflightSize = sep.Size() + encodedBHPEstimatedSize
1453 2 : writeTask.finishedIndexProps = indexProps
1454 2 : writeTask.flushableIndexBlock = flushableIndexBlock
1455 2 :
1456 2 : // The writeTask corresponds to an unwritten index entry.
1457 2 : w.indexBlock.addInflight(writeTask.indexInflightSize)
1458 2 :
1459 2 : w.dataBlockBuf = nil
1460 2 : if w.coordination.parallelismEnabled {
1461 2 : w.coordination.writeQueue.add(writeTask)
1462 2 : } else {
1463 2 : err = w.coordination.writeQueue.addSync(writeTask)
1464 2 : }
1465 2 : w.dataBlockBuf = newDataBlockBuf(w.restartInterval, w.checksumType)
1466 2 :
1467 2 : return err
1468 : }
1469 :
1470 2 : func (w *Writer) maybeFlush(key InternalKey, valueLen int) error {
1471 2 : if !w.dataBlockBuf.shouldFlush(key, valueLen, w.blockSize, w.blockSizeThreshold) {
1472 2 : return nil
1473 2 : }
1474 :
1475 2 : err := w.flush(key)
1476 2 :
1477 2 : if err != nil {
1478 1 : w.err = err
1479 1 : return err
1480 1 : }
1481 :
1482 2 : return nil
1483 : }
1484 :
1485 : // dataBlockBuf.dataBlockProps set by this method must be encoded before any future use of the
1486 : // dataBlockBuf.blockPropsEncoder, since the properties slice will get reused by the
1487 : // blockPropsEncoder.
1488 2 : func (w *Writer) finishDataBlockProps(buf *dataBlockBuf) error {
1489 2 : if len(w.blockPropCollectors) == 0 {
1490 2 : return nil
1491 2 : }
1492 2 : var err error
1493 2 : buf.blockPropsEncoder.resetProps()
1494 2 : for i := range w.blockPropCollectors {
1495 2 : scratch := buf.blockPropsEncoder.getScratchForProp()
1496 2 : if scratch, err = w.blockPropCollectors[i].FinishDataBlock(scratch); err != nil {
1497 1 : return err
1498 1 : }
1499 2 : if len(scratch) > 0 {
1500 2 : buf.blockPropsEncoder.addProp(shortID(i), scratch)
1501 2 : }
1502 : }
1503 :
1504 2 : buf.dataBlockProps = buf.blockPropsEncoder.unsafeProps()
1505 2 : return nil
1506 : }
1507 :
1508 : // The BlockHandleWithProperties returned by this method must be encoded before any future use of
1509 : // the Writer.blockPropsEncoder, since the properties slice will get reused by the blockPropsEncoder.
1510 : // maybeAddBlockPropertiesToBlockHandle should only be called if block is being written synchronously
1511 : // with the Writer client.
1512 : func (w *Writer) maybeAddBlockPropertiesToBlockHandle(
1513 : bh BlockHandle,
1514 2 : ) (BlockHandleWithProperties, error) {
1515 2 : err := w.finishDataBlockProps(w.dataBlockBuf)
1516 2 : if err != nil {
1517 0 : return BlockHandleWithProperties{}, err
1518 0 : }
1519 2 : return BlockHandleWithProperties{BlockHandle: bh, Props: w.dataBlockBuf.dataBlockProps}, nil
1520 : }
1521 :
1522 2 : func (w *Writer) indexEntrySep(prevKey, key InternalKey, dataBlockBuf *dataBlockBuf) InternalKey {
1523 2 : // Make a rough guess that we want key-sized scratch to compute the separator.
1524 2 : if cap(dataBlockBuf.sepScratch) < key.Size() {
1525 2 : dataBlockBuf.sepScratch = make([]byte, 0, key.Size()*2)
1526 2 : }
1527 :
1528 2 : var sep InternalKey
1529 2 : if key.UserKey == nil && key.Trailer == 0 {
1530 2 : sep = prevKey.Successor(w.compare, w.successor, dataBlockBuf.sepScratch[:0])
1531 2 : } else {
1532 2 : sep = prevKey.Separator(w.compare, w.separator, dataBlockBuf.sepScratch[:0], key)
1533 2 : }
1534 2 : return sep
1535 : }
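// For example (illustrative keys, assuming a LevelDB-style default comparer):
// between a finished block whose last key is "blue" and a following key
// "green", the comparer may return the single byte "c" as the separator; it is
// >= every key in the finished block and < every key that follows, which keeps
// index entries short. When key is zeroed (the final data block), the successor
// of prevKey is used instead.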
1536 :
1537 : // addIndexEntry adds an index entry for the specified key and block handle.
1538 : // addIndexEntry can be called from both the Writer client goroutine, and the
1539 : // writeQueue goroutine. If flushIndexBuf != nil, then indexProps must also be
1540 : // provided, as they're used when the index block is finished.
1541 : //
1542 : // Invariant:
1543 : // 1. addIndexEntry must not store references to the sep InternalKey, the tmp
1544 : // byte slice, bhp.Props. That is, these must be either deep copied or
1545 : // encoded.
1546 : // 2. addIndexEntry must not hold references to the flushIndexBuf, and the writeTo
1547 : // indexBlockBufs.
1548 : func (w *Writer) addIndexEntry(
1549 : sep InternalKey,
1550 : bhp BlockHandleWithProperties,
1551 : tmp []byte,
1552 : flushIndexBuf *indexBlockBuf,
1553 : writeTo *indexBlockBuf,
1554 : inflightSize int,
1555 : indexProps []byte,
1556 2 : ) error {
1557 2 : if bhp.Length == 0 {
1558 0 : // A valid blockHandle must be non-zero.
1559 0 : // In particular, it must have a non-zero length.
1560 0 : return nil
1561 0 : }
1562 :
1563 2 : encoded := encodeBlockHandleWithProperties(tmp, bhp)
1564 2 :
1565 2 : if flushIndexBuf != nil {
1566 2 : if cap(w.indexPartitions) == 0 {
1567 2 : w.indexPartitions = make([]indexBlockAndBlockProperties, 0, 32)
1568 2 : }
1569 : // Enable two level indexes if there is more than one index block.
1570 2 : w.twoLevelIndex = true
1571 2 : if err := w.finishIndexBlock(flushIndexBuf, indexProps); err != nil {
1572 0 : return err
1573 0 : }
1574 : }
1575 :
1576 2 : writeTo.add(sep, encoded, inflightSize)
1577 2 : return nil
1578 : }
1579 :
1580 2 : func (w *Writer) addPrevDataBlockToIndexBlockProps() {
1581 2 : for i := range w.blockPropCollectors {
1582 2 : w.blockPropCollectors[i].AddPrevDataBlockToIndexBlock()
1583 2 : }
1584 : }
1585 :
1586 : // addIndexEntrySync adds an index entry for the specified key and block handle.
1587 : // addIndexEntrySync is only called synchronously, once Writer.Close is called.
1588 : // addIndexEntrySync should only be called if we're sure that index entries
1589 : // aren't being written asynchronously.
1590 : //
1591 : // Invariant:
1592 : // 1. addIndexEntrySync must not store references to the prevKey, key InternalKey's,
1593 : // the tmp byte slice. That is, these must be either deep copied or encoded.
1594 : func (w *Writer) addIndexEntrySync(
1595 : prevKey, key InternalKey, bhp BlockHandleWithProperties, tmp []byte,
1596 2 : ) error {
1597 2 : sep := w.indexEntrySep(prevKey, key, w.dataBlockBuf)
1598 2 : shouldFlush := supportsTwoLevelIndex(
1599 2 : w.tableFormat) && w.indexBlock.shouldFlush(
1600 2 : sep, encodedBHPEstimatedSize, w.indexBlockSize, w.indexBlockSizeThreshold,
1601 2 : )
1602 2 : var flushableIndexBlock *indexBlockBuf
1603 2 : var props []byte
1604 2 : var err error
1605 2 : if shouldFlush {
1606 2 : flushableIndexBlock = w.indexBlock
1607 2 : w.indexBlock = newIndexBlockBuf(w.coordination.parallelismEnabled)
1608 2 :
1609 2 : // Call BlockPropertyCollector.FinishIndexBlock, since we've decided to
1610 2 : // flush the index block.
1611 2 : props, err = w.finishIndexBlockProps()
1612 2 : if err != nil {
1613 0 : return err
1614 0 : }
1615 : }
1616 :
1617 2 : err = w.addIndexEntry(sep, bhp, tmp, flushableIndexBlock, w.indexBlock, 0, props)
1618 2 : if flushableIndexBlock != nil {
1619 2 : flushableIndexBlock.clear()
1620 2 : indexBlockBufPool.Put(flushableIndexBlock)
1621 2 : }
1622 2 : w.addPrevDataBlockToIndexBlockProps()
1623 2 : return err
1624 : }
1625 :
1626 : func shouldFlush(
1627 : key InternalKey,
1628 : valueLen int,
1629 : restartInterval, estimatedBlockSize, numEntries, targetBlockSize, sizeThreshold int,
1630 2 : ) bool {
1631 2 : if numEntries == 0 {
1632 2 : return false
1633 2 : }
1634 :
1635 2 : if estimatedBlockSize >= targetBlockSize {
1636 2 : return true
1637 2 : }
1638 :
1639 : // The block is currently smaller than the target size.
1640 2 : if estimatedBlockSize <= sizeThreshold {
1641 2 : // The block is smaller than the threshold size at which we'll consider
1642 2 : // flushing it.
1643 2 : return false
1644 2 : }
1645 :
1646 2 : newSize := estimatedBlockSize + key.Size() + valueLen
1647 2 : if numEntries%restartInterval == 0 {
1648 2 : newSize += 4
1649 2 : }
1650 2 : newSize += 4 // varint for shared prefix length
1651 2 : newSize += uvarintLen(uint32(key.Size())) // varint for unshared key bytes
1652 2 : newSize += uvarintLen(uint32(valueLen)) // varint for value size
1653 2 : // Flush if the block plus the new entry is larger than the target size.
1654 2 : return newSize > targetBlockSize
1655 : }
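// shouldFlushExample is an illustrative, hedged sketch (the function and its
// numbers are invented for exposition, not part of the writer): it walks the
// arithmetic above with a 4096-byte target block size, a ~90% size threshold,
// a block currently estimated at 3800 bytes, and an incoming entry with a
// 20-byte encoded key and a 100-byte value. newSize works out to
// 3800+20+100+4+1+1 = 3926 <= 4096, so the entry is admitted and the block is
// not flushed yet.
func shouldFlushExample() bool {
	key := InternalKey{UserKey: make([]byte, 12)} // key.Size() == 12+8 == 20
	return shouldFlush(
		key,
		100,  // valueLen
		16,   // restartInterval
		3800, // estimatedBlockSize
		57,   // numEntries (not a multiple of restartInterval)
		4096, // targetBlockSize
		3687, // sizeThreshold (~90% of the target)
	) // returns false
}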
1656 :
1657 2 : func cloneKeyWithBuf(k InternalKey, a bytealloc.A) (bytealloc.A, InternalKey) {
1658 2 : if len(k.UserKey) == 0 {
1659 0 : return a, k
1660 0 : }
1661 2 : a, keyCopy := a.Copy(k.UserKey)
1662 2 : return a, InternalKey{UserKey: keyCopy, Trailer: k.Trailer}
1663 : }
1664 :
1665 : // Invariants: The byte slice returned by finishIndexBlockProps is
1666 : // heap-allocated and has its own lifetime, independent of the Writer and the
1667 : // blockPropsEncoder, and it is safe to:
1668 : //
1669 : // 1. Reuse w.blockPropsEncoder without first encoding the byte slice
1670 : //    returned.
1671 : // 2. Store the byte slice in the Writer since it is a copy and not backed by
1672 : //    an underlying buffer.
1673 2 : func (w *Writer) finishIndexBlockProps() ([]byte, error) {
1674 2 : w.blockPropsEncoder.resetProps()
1675 2 : for i := range w.blockPropCollectors {
1676 2 : scratch := w.blockPropsEncoder.getScratchForProp()
1677 2 : var err error
1678 2 : if scratch, err = w.blockPropCollectors[i].FinishIndexBlock(scratch); err != nil {
1679 1 : return nil, err
1680 1 : }
1681 2 : if len(scratch) > 0 {
1682 2 : w.blockPropsEncoder.addProp(shortID(i), scratch)
1683 2 : }
1684 : }
1685 2 : return w.blockPropsEncoder.props(), nil
1686 : }
1687 :
1688 : // finishIndexBlock finishes the current index block and adds it to the top
1689 : // level index block. This is only used when two level indexes are enabled.
1690 : //
1691 : // Invariants:
1692 : // 1. The props slice passed into finishIndexBlock must not be
1693 : // owned by any other struct, since it will be stored in the Writer.indexPartitions
1694 : // slice.
1695 : // 2. None of the buffers owned by indexBuf will be shallow copied and stored elsewhere.
1696 : // That is, it must be safe to reuse indexBuf after finishIndexBlock has been called.
1697 2 : func (w *Writer) finishIndexBlock(indexBuf *indexBlockBuf, props []byte) error {
1698 2 : part := indexBlockAndBlockProperties{
1699 2 : nEntries: indexBuf.block.nEntries, properties: props,
1700 2 : }
1701 2 : w.indexSepAlloc, part.sep = cloneKeyWithBuf(
1702 2 : indexBuf.block.getCurKey(), w.indexSepAlloc,
1703 2 : )
1704 2 : bk := indexBuf.finish()
1705 2 : if len(w.indexBlockAlloc) < len(bk) {
1706 2 : // Allocate enough bytes for approximately 16 index blocks.
1707 2 : w.indexBlockAlloc = make([]byte, len(bk)*16)
1708 2 : }
1709 2 : n := copy(w.indexBlockAlloc, bk)
1710 2 : part.block = w.indexBlockAlloc[:n:n]
1711 2 : w.indexBlockAlloc = w.indexBlockAlloc[n:]
1712 2 : w.indexPartitions = append(w.indexPartitions, part)
1713 2 : return nil
1714 : }
1715 :
1716 2 : func (w *Writer) writeTwoLevelIndex() (BlockHandle, error) {
1717 2 : props, err := w.finishIndexBlockProps()
1718 2 : if err != nil {
1719 0 : return BlockHandle{}, err
1720 0 : }
1721 : // Add the final unfinished index.
1722 2 : if err = w.finishIndexBlock(w.indexBlock, props); err != nil {
1723 0 : return BlockHandle{}, err
1724 0 : }
1725 :
1726 2 : for i := range w.indexPartitions {
1727 2 : b := &w.indexPartitions[i]
1728 2 : w.props.NumDataBlocks += uint64(b.nEntries)
1729 2 :
1730 2 : data := b.block
1731 2 : w.props.IndexSize += uint64(len(data))
1732 2 : bh, err := w.writeBlock(data, w.compression, &w.blockBuf)
1733 2 : if err != nil {
1734 0 : return BlockHandle{}, err
1735 0 : }
1736 2 : bhp := BlockHandleWithProperties{
1737 2 : BlockHandle: bh,
1738 2 : Props: b.properties,
1739 2 : }
1740 2 : encoded := encodeBlockHandleWithProperties(w.blockBuf.tmp[:], bhp)
1741 2 : w.topLevelIndexBlock.add(b.sep, encoded)
1742 : }
1743 :
1744 : // NB: RocksDB includes the block trailer length in the index size
1745 : // property, though it doesn't include the trailer in the top level
1746 : // index size property.
1747 2 : w.props.IndexPartitions = uint64(len(w.indexPartitions))
1748 2 : w.props.TopLevelIndexSize = uint64(w.topLevelIndexBlock.estimatedSize())
1749 2 : w.props.IndexSize += w.props.TopLevelIndexSize + blockTrailerLen
1750 2 :
1751 2 : return w.writeBlock(w.topLevelIndexBlock.finish(), w.compression, &w.blockBuf)
1752 : }
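// The resulting index layout, as a sketch: the top-level index block holds one
// entry per finished index partition, keyed by that partition's separator and
// pointing at the partition's block handle plus its aggregated block
// properties; each partition in turn holds one entry per data block it covers,
// pointing at the data block's handle plus that block's encoded properties.
//
//	top-level index:  sep_0 -> {partition_0 handle, props}, sep_1 -> {partition_1 handle, props}, ...
//	partition_i:      sep -> {data block handle, props}, ...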
1753 :
1754 2 : func compressAndChecksum(b []byte, compression Compression, blockBuf *blockBuf) []byte {
1755 2 : // Compress the buffer, discarding the result if the improvement isn't at
1756 2 : // least 12.5%.
1757 2 : blockType, compressed := compressBlock(compression, b, blockBuf.compressedBuf)
1758 2 : if blockType != noCompressionBlockType && cap(compressed) > cap(blockBuf.compressedBuf) {
1759 2 : blockBuf.compressedBuf = compressed[:cap(compressed)]
1760 2 : }
1761 2 : if len(compressed) < len(b)-len(b)/8 {
1762 2 : b = compressed
1763 2 : } else {
1764 2 : blockType = noCompressionBlockType
1765 2 : }
1766 :
1767 2 : blockBuf.tmp[0] = byte(blockType)
1768 2 :
1769 2 : // Calculate the checksum.
1770 2 : checksum := blockBuf.checksummer.checksum(b, blockBuf.tmp[:1])
1771 2 : binary.LittleEndian.PutUint32(blockBuf.tmp[1:5], checksum)
1772 2 : return b
1773 : }
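// verifyBlockTrailer is an illustrative, hedged sketch of the reader-side
// counterpart of compressAndChecksum (the real read path lives elsewhere in
// this package): the 5-byte trailer written above is byte 0 = block type
// (compression marker) followed by a 4-byte little-endian checksum computed
// over the block contents plus that type byte, so verification recomputes the
// checksum the same way and compares it with the stored value.
func verifyBlockTrailer(blockData, trailer []byte, c *checksummer) bool {
	stored := binary.LittleEndian.Uint32(trailer[1:5])
	computed := c.checksum(blockData, trailer[:1])
	return stored == computed
}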
1774 :
1775 2 : func (w *Writer) writeCompressedBlock(block []byte, blockTrailerBuf []byte) (BlockHandle, error) {
1776 2 : bh := BlockHandle{Offset: w.meta.Size, Length: uint64(len(block))}
1777 2 :
1778 2 : if w.cacheID != 0 && w.fileNum.FileNum() != 0 {
1779 2 : // Remove the block being written from the cache. This provides defense in
1780 2 : // depth against bugs which cause cache collisions.
1781 2 : //
1782 2 : // TODO(peter): Alternatively, we could add the uncompressed value to the
1783 2 : // cache.
1784 2 : w.cache.Delete(w.cacheID, w.fileNum, bh.Offset)
1785 2 : }
1786 :
1787 : // Write the bytes to the file.
1788 2 : if err := w.writable.Write(block); err != nil {
1789 0 : return BlockHandle{}, err
1790 0 : }
1791 2 : w.meta.Size += uint64(len(block))
1792 2 : if err := w.writable.Write(blockTrailerBuf[:blockTrailerLen]); err != nil {
1793 0 : return BlockHandle{}, err
1794 0 : }
1795 2 : w.meta.Size += blockTrailerLen
1796 2 :
1797 2 : return bh, nil
1798 : }
1799 :
1800 : // Write implements io.Writer. This is analogous to writeCompressedBlock for
1801 : // blocks that already incorporate the trailer, and don't need the callee to
1802 : // return a BlockHandle.
1803 2 : func (w *Writer) Write(blockWithTrailer []byte) (n int, err error) {
1804 2 : offset := w.meta.Size
1805 2 : if w.cacheID != 0 && w.fileNum.FileNum() != 0 {
1806 2 : // Remove the block being written from the cache. This provides defense in
1807 2 : // depth against bugs which cause cache collisions.
1808 2 : //
1809 2 : // TODO(peter): Alternatively, we could add the uncompressed value to the
1810 2 : // cache.
1811 2 : w.cache.Delete(w.cacheID, w.fileNum, offset)
1812 2 : }
1813 2 : w.meta.Size += uint64(len(blockWithTrailer))
1814 2 : if err := w.writable.Write(blockWithTrailer); err != nil {
1815 0 : return 0, err
1816 0 : }
1817 2 : return len(blockWithTrailer), nil
1818 : }
1819 :
1820 : func (w *Writer) writeBlock(
1821 : b []byte, compression Compression, blockBuf *blockBuf,
1822 2 : ) (BlockHandle, error) {
1823 2 : b = compressAndChecksum(b, compression, blockBuf)
1824 2 : return w.writeCompressedBlock(b, blockBuf.tmp[:])
1825 2 : }
1826 :
1827 : // assertFormatCompatibility ensures that the features present on the table are
1828 : // compatible with the table format version.
1829 2 : func (w *Writer) assertFormatCompatibility() error {
1830 2 : // PebbleDBv1: block properties.
1831 2 : if len(w.blockPropCollectors) > 0 && w.tableFormat < TableFormatPebblev1 {
1832 1 : return errors.Newf(
1833 1 : "table format version %s is less than the minimum required version %s for block properties",
1834 1 : w.tableFormat, TableFormatPebblev1,
1835 1 : )
1836 1 : }
1837 :
1838 : // PebbleDBv2: range keys.
1839 2 : if w.props.NumRangeKeys() > 0 && w.tableFormat < TableFormatPebblev2 {
1840 1 : return errors.Newf(
1841 1 : "table format version %s is less than the minimum required version %s for range keys",
1842 1 : w.tableFormat, TableFormatPebblev2,
1843 1 : )
1844 1 : }
1845 :
1846 : // PebbleDBv3: value blocks.
1847 2 : if (w.props.NumValueBlocks > 0 || w.props.NumValuesInValueBlocks > 0 ||
1848 2 : w.props.ValueBlocksSize > 0) && w.tableFormat < TableFormatPebblev3 {
1849 0 : return errors.Newf(
1850 0 : "table format version %s is less than the minimum required version %s for value blocks",
1851 0 : w.tableFormat, TableFormatPebblev3)
1852 0 : }
1853 :
1854 : // PebbleDBv4: DELSIZED tombstones.
1855 2 : if w.props.NumSizedDeletions > 0 && w.tableFormat < TableFormatPebblev4 {
1856 0 : return errors.Newf(
1857 0 : "table format version %s is less than the minimum required version %s for sized deletion tombstones",
1858 0 : w.tableFormat, TableFormatPebblev4)
1859 0 : }
1860 2 : return nil
1861 : }
1862 :
1863 : // Close finishes writing the table and closes the underlying file that the
1864 : // table was written to.
1865 2 : func (w *Writer) Close() (err error) {
1866 2 : defer func() {
1867 2 : if w.valueBlockWriter != nil {
1868 2 : releaseValueBlockWriter(w.valueBlockWriter)
1869 2 : // Defensive code in case Close gets called again. We don't want to put
1870 2 : // the same object to a sync.Pool.
1871 2 : w.valueBlockWriter = nil
1872 2 : }
1873 2 : if w.writable != nil {
1874 1 : w.writable.Abort()
1875 1 : w.writable = nil
1876 1 : }
1877 : // Record any error in the writer (so we can exit early if Close is called
1878 : // again).
1879 2 : if err != nil {
1880 1 : w.err = err
1881 1 : }
1882 : }()
1883 :
1884 : // finish must be called before we check for an error, because finish will
1885 : // block until every single task added to the writeQueue has been processed,
1886 : // and an error could be encountered while any of those tasks are processed.
1887 2 : if err := w.coordination.writeQueue.finish(); err != nil {
1888 1 : return err
1889 1 : }
1890 :
1891 2 : if w.err != nil {
1892 1 : return w.err
1893 1 : }
1894 :
1895 : // The w.meta.LargestPointKey is only used once the Writer is closed, so it is safe to set it
1896 : // when the Writer is closed.
1897 : //
1898 : // The following invariants ensure that setting the largest key at this point of a Writer close
1899 : // is correct:
1900 : // 1. Keys must only be added to the Writer in an increasing order.
1901 : // 2. The current w.dataBlockBuf is guaranteed to have the latest key added to the Writer. This
1902 : // must be true, because a w.dataBlockBuf is only switched out when a dataBlock is flushed,
1903 : // however, if a dataBlock is flushed, then we add a key to the new w.dataBlockBuf in the
1904 : // addPoint function after the flush occurs.
1905 2 : if w.dataBlockBuf.dataBlock.nEntries >= 1 {
1906 2 : w.meta.SetLargestPointKey(w.dataBlockBuf.dataBlock.getCurKey().Clone())
1907 2 : }
1908 :
1909 : // Finish the last data block, or force an empty data block if there
1910 : // aren't any data blocks at all.
1911 2 : if w.dataBlockBuf.dataBlock.nEntries > 0 || w.indexBlock.block.nEntries == 0 {
1912 2 : bh, err := w.writeBlock(w.dataBlockBuf.dataBlock.finish(), w.compression, &w.dataBlockBuf.blockBuf)
1913 2 : if err != nil {
1914 0 : return err
1915 0 : }
1916 2 : bhp, err := w.maybeAddBlockPropertiesToBlockHandle(bh)
1917 2 : if err != nil {
1918 0 : return err
1919 0 : }
1920 2 : prevKey := w.dataBlockBuf.dataBlock.getCurKey()
1921 2 : if err := w.addIndexEntrySync(prevKey, InternalKey{}, bhp, w.dataBlockBuf.tmp[:]); err != nil {
1922 0 : return err
1923 0 : }
1924 : }
1925 2 : w.props.DataSize = w.meta.Size
1926 2 :
1927 2 : // Write the filter block.
1928 2 : var metaindex rawBlockWriter
1929 2 : metaindex.restartInterval = 1
1930 2 : if w.filter != nil {
1931 2 : b, err := w.filter.finish()
1932 2 : if err != nil {
1933 0 : return err
1934 0 : }
1935 2 : bh, err := w.writeBlock(b, NoCompression, &w.blockBuf)
1936 2 : if err != nil {
1937 0 : return err
1938 0 : }
1939 2 : n := encodeBlockHandle(w.blockBuf.tmp[:], bh)
1940 2 : metaindex.add(InternalKey{UserKey: []byte(w.filter.metaName())}, w.blockBuf.tmp[:n])
1941 2 : w.props.FilterPolicyName = w.filter.policyName()
1942 2 : w.props.FilterSize = bh.Length
1943 : }
1944 :
1945 2 : var indexBH BlockHandle
1946 2 : if w.twoLevelIndex {
1947 2 : w.props.IndexType = twoLevelIndex
1948 2 : // Write the two level index block.
1949 2 : indexBH, err = w.writeTwoLevelIndex()
1950 2 : if err != nil {
1951 0 : return err
1952 0 : }
1953 2 : } else {
1954 2 : w.props.IndexType = binarySearchIndex
1955 2 : // NB: RocksDB includes the block trailer length in the index size
1956 2 : // property, though it doesn't include the trailer in the filter size
1957 2 : // property.
1958 2 : w.props.IndexSize = uint64(w.indexBlock.estimatedSize()) + blockTrailerLen
1959 2 : w.props.NumDataBlocks = uint64(w.indexBlock.block.nEntries)
1960 2 :
1961 2 : // Write the single level index block.
1962 2 : indexBH, err = w.writeBlock(w.indexBlock.finish(), w.compression, &w.blockBuf)
1963 2 : if err != nil {
1964 0 : return err
1965 0 : }
1966 : }
1967 :
1968 : // Write the range-del block. The block handle must be added to the meta index block
1969 : // after the properties block has been written. This is because the entries in the
1970 : // metaindex block must be sorted by key.
1971 2 : var rangeDelBH BlockHandle
1972 2 : if w.props.NumRangeDeletions > 0 {
1973 2 : if !w.rangeDelV1Format {
1974 2 : // Because the range tombstones are fragmented in the v2 format, the end
1975 2 : // key of the last added range tombstone will be the largest range
1976 2 : // tombstone key. Note that we need to make this into a range deletion
1977 2 : // sentinel because sstable boundaries are inclusive while the end key of
1978 2 : // a range deletion tombstone is exclusive. A Clone() is necessary as
1979 2 : // rangeDelBlock.curValue is the same slice that will get passed
1980 2 : // into w.writer, and some implementations of vfs.File mutate the
1981 2 : // slice passed into Write(). Also, w.meta will often outlive the
1982 2 : // blockWriter, and so cloning curValue allows the rangeDelBlock's
1983 2 : // internal buffer to get gc'd.
1984 2 : k := base.MakeRangeDeleteSentinelKey(w.rangeDelBlock.curValue).Clone()
1985 2 : w.meta.SetLargestRangeDelKey(k)
1986 2 : }
1987 2 : rangeDelBH, err = w.writeBlock(w.rangeDelBlock.finish(), NoCompression, &w.blockBuf)
1988 2 : if err != nil {
1989 0 : return err
1990 0 : }
1991 : }
1992 :
1993 : // Write the range-key block, flushing any remaining spans from the
1994 : // fragmenter first.
1995 2 : w.fragmenter.Finish()
1996 2 :
1997 2 : var rangeKeyBH BlockHandle
1998 2 : if w.props.NumRangeKeys() > 0 {
1999 2 : key := w.rangeKeyBlock.getCurKey()
2000 2 : kind := key.Kind()
2001 2 : endKey, _, ok := rangekey.DecodeEndKey(kind, w.rangeKeyBlock.curValue)
2002 2 : if !ok {
2003 0 : return errors.Newf("invalid end key: %s", w.rangeKeyBlock.curValue)
2004 0 : }
2005 2 : k := base.MakeExclusiveSentinelKey(kind, endKey).Clone()
2006 2 : w.meta.SetLargestRangeKey(k)
2007 2 : // TODO(travers): The lack of compression on the range key block matches the
2008 2 : // lack of compression on the range-del block. Revisit whether we want to
2009 2 : // enable compression on this block.
2010 2 : rangeKeyBH, err = w.writeBlock(w.rangeKeyBlock.finish(), NoCompression, &w.blockBuf)
2011 2 : if err != nil {
2012 0 : return err
2013 0 : }
2014 : }
2015 :
2016 2 : if w.valueBlockWriter != nil {
2017 2 : vbiHandle, vbStats, err := w.valueBlockWriter.finish(w, w.meta.Size)
2018 2 : if err != nil {
2019 0 : return err
2020 0 : }
2021 2 : w.props.NumValueBlocks = vbStats.numValueBlocks
2022 2 : w.props.NumValuesInValueBlocks = vbStats.numValuesInValueBlocks
2023 2 : w.props.ValueBlocksSize = vbStats.valueBlocksAndIndexSize
2024 2 : if vbStats.numValueBlocks > 0 {
2025 2 : n := encodeValueBlocksIndexHandle(w.blockBuf.tmp[:], vbiHandle)
2026 2 : metaindex.add(InternalKey{UserKey: []byte(metaValueIndexName)}, w.blockBuf.tmp[:n])
2027 2 : }
2028 : }
2029 :
2030 : // Add the range key block handle to the metaindex block. Note that we add the
2031 : // block handle to the metaindex block before the other meta blocks as the
2032 : // metaindex block entries must be sorted, and the range key block name sorts
2033 : // before the other block names.
2034 2 : if w.props.NumRangeKeys() > 0 {
2035 2 : n := encodeBlockHandle(w.blockBuf.tmp[:], rangeKeyBH)
2036 2 : metaindex.add(InternalKey{UserKey: []byte(metaRangeKeyName)}, w.blockBuf.tmp[:n])
2037 2 : }
2038 :
2039 2 : {
2040 2 : userProps := make(map[string]string)
2041 2 : for i := range w.propCollectors {
2042 1 : if err := w.propCollectors[i].Finish(userProps); err != nil {
2043 1 : return err
2044 1 : }
2045 : }
2046 2 : for i := range w.blockPropCollectors {
2047 2 : scratch := w.blockPropsEncoder.getScratchForProp()
2048 2 : // Place the shortID in the first byte.
2049 2 : scratch = append(scratch, byte(i))
2050 2 : buf, err := w.blockPropCollectors[i].FinishTable(scratch)
2051 2 : if err != nil {
2052 1 : return err
2053 1 : }
2054 2 : var prop string
2055 2 : if len(buf) > 0 {
2056 2 : prop = string(buf)
2057 2 : }
2058 : // NB: The property is populated in the map even if it is the
2059 : // empty string, since the presence in the map is what indicates
2060 : // that the block property collector was used when writing.
2061 2 : userProps[w.blockPropCollectors[i].Name()] = prop
2062 : }
2063 2 : if len(userProps) > 0 {
2064 2 : w.props.UserProperties = userProps
2065 2 : }
2066 :
2067 : // Write the properties block.
2068 2 : var raw rawBlockWriter
2069 2 : // The restart interval is set to infinity because the properties block
2070 2 : // is always read sequentially and cached in a heap located object. This
2071 2 : // reduces table size without a significant impact on performance.
2072 2 : raw.restartInterval = propertiesBlockRestartInterval
2073 2 : w.props.CompressionOptions = rocksDBCompressionOptions
2074 2 : w.props.save(w.tableFormat, &raw)
2075 2 : bh, err := w.writeBlock(raw.finish(), NoCompression, &w.blockBuf)
2076 2 : if err != nil {
2077 0 : return err
2078 0 : }
2079 2 : n := encodeBlockHandle(w.blockBuf.tmp[:], bh)
2080 2 : metaindex.add(InternalKey{UserKey: []byte(metaPropertiesName)}, w.blockBuf.tmp[:n])
2081 : }
2082 :
2083 : // Add the range deletion block handle to the metaindex block.
2084 2 : if w.props.NumRangeDeletions > 0 {
2085 2 : n := encodeBlockHandle(w.blockBuf.tmp[:], rangeDelBH)
2086 2 : // The v2 range-del block encoding is backwards compatible with the v1
2087 2 : // encoding. We add meta-index entries for both the old name and the new
2088 2 : // name so that old code can continue to find the range-del block and new
2089 2 : // code knows that the range tombstones in the block are fragmented and
2090 2 : // sorted.
2091 2 : metaindex.add(InternalKey{UserKey: []byte(metaRangeDelName)}, w.blockBuf.tmp[:n])
2092 2 : if !w.rangeDelV1Format {
2093 2 : metaindex.add(InternalKey{UserKey: []byte(metaRangeDelV2Name)}, w.blockBuf.tmp[:n])
2094 2 : }
2095 : }
2096 :
2097 : // Write the metaindex block. It might be an empty block, if the filter
2098 : // policy is nil. NoCompression is specified because a) RocksDB never
2099 : // compresses the meta-index block and b) RocksDB has some code paths which
2100 : // expect the meta-index block to not be compressed.
2101 2 : metaindexBH, err := w.writeBlock(metaindex.blockWriter.finish(), NoCompression, &w.blockBuf)
2102 2 : if err != nil {
2103 0 : return err
2104 0 : }
2105 :
2106 : // Write the table footer.
2107 2 : footer := footer{
2108 2 : format: w.tableFormat,
2109 2 : checksum: w.blockBuf.checksummer.checksumType,
2110 2 : metaindexBH: metaindexBH,
2111 2 : indexBH: indexBH,
2112 2 : }
2113 2 : encoded := footer.encode(w.blockBuf.tmp[:])
2114 2 : if err := w.writable.Write(footer.encode(w.blockBuf.tmp[:])); err != nil {
2115 0 : return err
2116 0 : }
2117 2 : w.meta.Size += uint64(len(encoded))
2118 2 : w.meta.Properties = w.props
2119 2 :
2120 2 : // Check that the features present in the table are compatible with the format
2121 2 : // configured for the table.
2122 2 : if err = w.assertFormatCompatibility(); err != nil {
2123 1 : return err
2124 1 : }
2125 :
2126 2 : if err := w.writable.Finish(); err != nil {
2127 1 : w.writable = nil
2128 1 : return err
2129 1 : }
2130 2 : w.writable = nil
2131 2 :
2132 2 : w.dataBlockBuf.clear()
2133 2 : dataBlockBufPool.Put(w.dataBlockBuf)
2134 2 : w.dataBlockBuf = nil
2135 2 : w.indexBlock.clear()
2136 2 : indexBlockBufPool.Put(w.indexBlock)
2137 2 : w.indexBlock = nil
2138 2 :
2139 2 : // Make any future calls to Set or Close return an error.
2140 2 : w.err = errWriterClosed
2141 2 : return nil
2142 : }
2143 :
2144 : // EstimatedSize returns the estimated size of the sstable being written if a
2145 : // call to Close() was made without adding additional keys.
2146 2 : func (w *Writer) EstimatedSize() uint64 {
2147 2 : return w.coordination.sizeEstimate.size() +
2148 2 : uint64(w.dataBlockBuf.dataBlock.estimatedSize()) +
2149 2 : w.indexBlock.estimatedSize()
2150 2 : }
2151 :
2152 : // Metadata returns the metadata for the finished sstable. Only valid to call
2153 : // after the sstable has been finished.
2154 2 : func (w *Writer) Metadata() (*WriterMetadata, error) {
2155 2 : if w.writable != nil {
2156 0 : return nil, errors.New("pebble: writer is not closed")
2157 0 : }
2158 2 : return &w.meta, nil
2159 : }
2160 :
2161 : // WriterOption provide an interface to do work on Writer while it is being
2162 : // opened.
2163 : type WriterOption interface {
2164 : // writerApply is called on the writer during opening in order to set
2165 : // internal parameters.
2166 : writerApply(*Writer)
2167 : }
2168 :
2169 : // PreviousPointKeyOpt is a WriterOption that provides access to the last
2170 : // point key written to the writer while building a sstable.
2171 : type PreviousPointKeyOpt struct {
2172 : w *Writer
2173 : }
2174 :
2175 : // UnsafeKey returns the last point key written to the writer to which this
2176 : // option was passed during creation. The returned key points directly into
2177 : // a buffer belonging to the Writer. The value's lifetime ends the next time a
2178 : // point key is added to the Writer.
2179 : // Invariant: UnsafeKey isn't and shouldn't be called after the Writer is closed.
2180 2 : func (o PreviousPointKeyOpt) UnsafeKey() base.InternalKey {
2181 2 : if o.w == nil {
2182 0 : return base.InvalidInternalKey
2183 0 : }
2184 :
2185 2 : if o.w.dataBlockBuf.dataBlock.nEntries >= 1 {
2186 2 : // o.w.dataBlockBuf.dataBlock.curKey is guaranteed to point to the last point key
2187 2 : // which was added to the Writer.
2188 2 : return o.w.dataBlockBuf.dataBlock.getCurKey()
2189 2 : }
2190 0 : return base.InternalKey{}
2191 : }
2192 :
2193 2 : func (o *PreviousPointKeyOpt) writerApply(w *Writer) {
2194 2 : o.w = w
2195 2 : }
2196 :
2197 : // NewWriter returns a new table writer for the file. Closing the writer will
2198 : // close the file.
2199 2 : func NewWriter(writable objstorage.Writable, o WriterOptions, extraOpts ...WriterOption) *Writer {
2200 2 : o = o.ensureDefaults()
2201 2 : w := &Writer{
2202 2 : writable: writable,
2203 2 : meta: WriterMetadata{
2204 2 : SmallestSeqNum: math.MaxUint64,
2205 2 : },
2206 2 : blockSize: o.BlockSize,
2207 2 : blockSizeThreshold: (o.BlockSize*o.BlockSizeThreshold + 99) / 100,
2208 2 : indexBlockSize: o.IndexBlockSize,
2209 2 : indexBlockSizeThreshold: (o.IndexBlockSize*o.BlockSizeThreshold + 99) / 100,
2210 2 : compare: o.Comparer.Compare,
2211 2 : split: o.Comparer.Split,
2212 2 : formatKey: o.Comparer.FormatKey,
2213 2 : compression: o.Compression,
2214 2 : separator: o.Comparer.Separator,
2215 2 : successor: o.Comparer.Successor,
2216 2 : tableFormat: o.TableFormat,
2217 2 : isStrictObsolete: o.IsStrictObsolete,
2218 2 : writingToLowestLevel: o.WritingToLowestLevel,
2219 2 : cache: o.Cache,
2220 2 : restartInterval: o.BlockRestartInterval,
2221 2 : checksumType: o.Checksum,
2222 2 : indexBlock: newIndexBlockBuf(o.Parallelism),
2223 2 : rangeDelBlock: blockWriter{
2224 2 : restartInterval: 1,
2225 2 : },
2226 2 : rangeKeyBlock: blockWriter{
2227 2 : restartInterval: 1,
2228 2 : },
2229 2 : topLevelIndexBlock: blockWriter{
2230 2 : restartInterval: 1,
2231 2 : },
2232 2 : fragmenter: keyspan.Fragmenter{
2233 2 : Cmp: o.Comparer.Compare,
2234 2 : Format: o.Comparer.FormatKey,
2235 2 : },
2236 2 : }
2237 2 : if w.tableFormat >= TableFormatPebblev3 {
2238 2 : w.shortAttributeExtractor = o.ShortAttributeExtractor
2239 2 : w.requiredInPlaceValueBound = o.RequiredInPlaceValueBound
2240 2 : w.valueBlockWriter = newValueBlockWriter(
2241 2 : w.blockSize, w.blockSizeThreshold, w.compression, w.checksumType, func(compressedSize int) {
2242 2 : w.coordination.sizeEstimate.dataBlockCompressed(compressedSize, 0)
2243 2 : })
2244 : }
2245 :
2246 2 : w.dataBlockBuf = newDataBlockBuf(w.restartInterval, w.checksumType)
2247 2 :
2248 2 : w.blockBuf = blockBuf{
2249 2 : checksummer: checksummer{checksumType: o.Checksum},
2250 2 : }
2251 2 :
2252 2 : w.coordination.init(o.Parallelism, w)
2253 2 :
2254 2 : if writable == nil {
2255 0 : w.err = errors.New("pebble: nil writable")
2256 0 : return w
2257 0 : }
2258 :
2259 : // Note that WriterOptions are applied in two places; the ones with a
2260 : // preApply() method are applied here. The rest are applied down below after
2261 : // default properties are set.
2262 2 : type preApply interface{ preApply() }
2263 2 : for _, opt := range extraOpts {
2264 2 : if _, ok := opt.(preApply); ok {
2265 2 : opt.writerApply(w)
2266 2 : }
2267 : }
2268 :
2269 2 : w.props.PrefixExtractorName = "nullptr"
2270 2 : if o.FilterPolicy != nil {
2271 2 : switch o.FilterType {
2272 2 : case TableFilter:
2273 2 : w.filter = newTableFilterWriter(o.FilterPolicy)
2274 2 : if w.split != nil {
2275 2 : w.props.PrefixExtractorName = o.Comparer.Name
2276 2 : w.props.PrefixFiltering = true
2277 2 : } else {
2278 1 : w.props.WholeKeyFiltering = true
2279 1 : }
2280 0 : default:
2281 0 : panic(fmt.Sprintf("unknown filter type: %v", o.FilterType))
2282 : }
2283 : }
2284 :
2285 2 : w.props.ComparerName = o.Comparer.Name
2286 2 : w.props.CompressionName = o.Compression.String()
2287 2 : w.props.MergerName = o.MergerName
2288 2 : w.props.PropertyCollectorNames = "[]"
2289 2 : w.props.ExternalFormatVersion = rocksDBExternalFormatVersion
2290 2 :
2291 2 : if len(o.TablePropertyCollectors) > 0 || len(o.BlockPropertyCollectors) > 0 ||
2292 2 : w.tableFormat >= TableFormatPebblev4 {
2293 2 : var buf bytes.Buffer
2294 2 : buf.WriteString("[")
2295 2 : if len(o.TablePropertyCollectors) > 0 {
2296 1 : w.propCollectors = make([]TablePropertyCollector, len(o.TablePropertyCollectors))
2297 1 : for i := range o.TablePropertyCollectors {
2298 1 : w.propCollectors[i] = o.TablePropertyCollectors[i]()
2299 1 : if i > 0 {
2300 1 : buf.WriteString(",")
2301 1 : }
2302 1 : buf.WriteString(w.propCollectors[i].Name())
2303 : }
2304 : }
2305 2 : numBlockPropertyCollectors := len(o.BlockPropertyCollectors)
2306 2 : if w.tableFormat >= TableFormatPebblev4 {
2307 2 : numBlockPropertyCollectors++
2308 2 : }
2309 : // shortID is a uint8, so we cannot exceed that number of block
2310 : // property collectors.
2311 2 : if numBlockPropertyCollectors > math.MaxUint8 {
2312 0 : w.err = errors.New("pebble: too many block property collectors")
2313 0 : return w
2314 0 : }
2315 2 : if numBlockPropertyCollectors > 0 {
2316 2 : w.blockPropCollectors = make([]BlockPropertyCollector, numBlockPropertyCollectors)
2317 2 : }
2318 2 : if len(o.BlockPropertyCollectors) > 0 {
2319 2 : // The shortID assigned to a collector is the same as its index in
2320 2 : // this slice.
2321 2 : for i := range o.BlockPropertyCollectors {
2322 2 : w.blockPropCollectors[i] = o.BlockPropertyCollectors[i]()
2323 2 : if i > 0 || len(o.TablePropertyCollectors) > 0 {
2324 1 : buf.WriteString(",")
2325 1 : }
2326 2 : buf.WriteString(w.blockPropCollectors[i].Name())
2327 : }
2328 : }
2329 2 : if w.tableFormat >= TableFormatPebblev4 {
2330 2 : if numBlockPropertyCollectors > 1 || len(o.TablePropertyCollectors) > 0 {
2331 2 : buf.WriteString(",")
2332 2 : }
2333 2 : w.blockPropCollectors[numBlockPropertyCollectors-1] = &w.obsoleteCollector
2334 2 : buf.WriteString(w.obsoleteCollector.Name())
2335 : }
2336 2 : buf.WriteString("]")
2337 2 : w.props.PropertyCollectorNames = buf.String()
2338 : }
2339 :
2340 : // Apply the remaining WriterOptions that do not have a preApply() method.
2341 2 : for _, opt := range extraOpts {
2342 2 : if _, ok := opt.(preApply); ok {
2343 2 : continue
2344 : }
2345 2 : opt.writerApply(w)
2346 : }
2347 :
2348 : // Initialize the range key fragmenter and encoder.
2349 2 : w.fragmenter.Emit = w.encodeRangeKeySpan
2350 2 : w.rangeKeyEncoder.Emit = w.addRangeKey
2351 2 : return w
2352 : }
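// buildExampleTable is a minimal usage sketch (illustrative only; the function
// name and key/value contents are invented, and the objstorage.Writable is
// assumed to be supplied by the caller): construct a Writer, add point keys in
// strictly increasing order, Close the table, and read back the metadata.
func buildExampleTable(writable objstorage.Writable) (*WriterMetadata, error) {
	w := NewWriter(writable, WriterOptions{
		Comparer:    base.DefaultComparer,
		TableFormat: TableFormatPebblev3,
	})
	if err := w.Set([]byte("a"), []byte("value-a")); err != nil {
		return nil, err
	}
	if err := w.Set([]byte("b"), []byte("value-b")); err != nil {
		return nil, err
	}
	if err := w.Close(); err != nil {
		return nil, err
	}
	return w.Metadata()
}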
2353 :
2354 : // internalGetProperties is a private, internal-use-only function that takes a
2355 : // Writer and returns a pointer to its Properties, allowing direct mutation.
2356 : // It's used by internal Pebble flushes and compactions to set internal
2357 : // properties. It gets installed in private.
2358 2 : func internalGetProperties(w *Writer) *Properties {
2359 2 : return &w.props
2360 2 : }
2361 :
2362 2 : func init() {
2363 2 : private.SSTableWriterDisableKeyOrderChecks = func(i interface{}) {
2364 1 : w := i.(*Writer)
2365 1 : w.disableKeyOrderChecks = true
2366 1 : }
2367 2 : private.SSTableInternalProperties = internalGetProperties
2368 : }
2369 :
2370 : type obsoleteKeyBlockPropertyCollector struct {
2371 : blockIsNonObsolete bool
2372 : indexIsNonObsolete bool
2373 : tableIsNonObsolete bool
2374 : }
2375 :
2376 2 : func encodeNonObsolete(isNonObsolete bool, buf []byte) []byte {
2377 2 : if isNonObsolete {
2378 2 : return buf
2379 2 : }
2380 2 : return append(buf, 't')
2381 : }
2382 :
2383 2 : func (o *obsoleteKeyBlockPropertyCollector) Name() string {
2384 2 : return "obsolete-key"
2385 2 : }
2386 :
2387 2 : func (o *obsoleteKeyBlockPropertyCollector) Add(key InternalKey, value []byte) error {
2388 2 : // Ignore.
2389 2 : return nil
2390 2 : }
2391 :
2392 2 : func (o *obsoleteKeyBlockPropertyCollector) AddPoint(isObsolete bool) {
2393 2 : o.blockIsNonObsolete = o.blockIsNonObsolete || !isObsolete
2394 2 : }
2395 :
2396 2 : func (o *obsoleteKeyBlockPropertyCollector) FinishDataBlock(buf []byte) ([]byte, error) {
2397 2 : o.tableIsNonObsolete = o.tableIsNonObsolete || o.blockIsNonObsolete
2398 2 : return encodeNonObsolete(o.blockIsNonObsolete, buf), nil
2399 2 : }
2400 :
2401 2 : func (o *obsoleteKeyBlockPropertyCollector) AddPrevDataBlockToIndexBlock() {
2402 2 : o.indexIsNonObsolete = o.indexIsNonObsolete || o.blockIsNonObsolete
2403 2 : o.blockIsNonObsolete = false
2404 2 : }
2405 :
2406 2 : func (o *obsoleteKeyBlockPropertyCollector) FinishIndexBlock(buf []byte) ([]byte, error) {
2407 2 : indexIsNonObsolete := o.indexIsNonObsolete
2408 2 : o.indexIsNonObsolete = false
2409 2 : return encodeNonObsolete(indexIsNonObsolete, buf), nil
2410 2 : }
2411 :
2412 2 : func (o *obsoleteKeyBlockPropertyCollector) FinishTable(buf []byte) ([]byte, error) {
2413 2 : return encodeNonObsolete(o.tableIsNonObsolete, buf), nil
2414 2 : }
2415 :
2416 : func (o *obsoleteKeyBlockPropertyCollector) UpdateKeySuffixes(
2417 : oldProp []byte, oldSuffix, newSuffix []byte,
2418 1 : ) error {
2419 1 : _, err := propToIsObsolete(oldProp)
2420 1 : if err != nil {
2421 0 : return err
2422 0 : }
2423 : // Suffix rewriting currently loses the obsolete bit.
2424 1 : o.blockIsNonObsolete = true
2425 1 : return nil
2426 : }
2427 :
2428 : // NB: obsoleteKeyBlockPropertyFilter is stateless. This aspect of the filter
2429 : // is used in table_cache.go for in-place modification of a filters slice.
2430 : type obsoleteKeyBlockPropertyFilter struct{}
2431 :
2432 2 : func (o obsoleteKeyBlockPropertyFilter) Name() string {
2433 2 : return "obsolete-key"
2434 2 : }
2435 :
2436 : // Intersects returns true if the set represented by prop intersects with
2437 : // the set in the filter.
2438 2 : func (o obsoleteKeyBlockPropertyFilter) Intersects(prop []byte) (bool, error) {
2439 2 : return propToIsObsolete(prop)
2440 2 : }
2441 :
2442 2 : func propToIsObsolete(prop []byte) (bool, error) {
2443 2 : if len(prop) == 0 {
2444 2 : return true, nil
2445 2 : }
2446 2 : if len(prop) > 1 || prop[0] != 't' {
2447 0 : return false, errors.Errorf("unexpected property %x", prop)
2448 0 : }
2449 2 : return false, nil
2450 : }
|