Line data Source code
1 : // Copyright 2011 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package sstable
6 :
7 : import (
8 : "bytes"
9 : "encoding/binary"
10 : "fmt"
11 : "math"
12 : "runtime"
13 : "sort"
14 : "sync"
15 :
16 : "github.com/cespare/xxhash/v2"
17 : "github.com/cockroachdb/errors"
18 : "github.com/cockroachdb/pebble/internal/base"
19 : "github.com/cockroachdb/pebble/internal/bytealloc"
20 : "github.com/cockroachdb/pebble/internal/cache"
21 : "github.com/cockroachdb/pebble/internal/crc"
22 : "github.com/cockroachdb/pebble/internal/invariants"
23 : "github.com/cockroachdb/pebble/internal/keyspan"
24 : "github.com/cockroachdb/pebble/internal/private"
25 : "github.com/cockroachdb/pebble/internal/rangekey"
26 : "github.com/cockroachdb/pebble/objstorage"
27 : )
28 :
// encodedBHPEstimatedSize estimates the size of the encoded
// BlockHandleWithProperties: two varint-encoded uint64s (offset and length) at
// their maximum width. It would also be nice to account for the length of the
// data block properties here, but that isn't necessary since this is an
// estimate.
const encodedBHPEstimatedSize = binary.MaxVarintLen64 * 2
33 :
34 : var errWriterClosed = errors.New("pebble: writer is closed")
35 :
// WriterMetadata holds info about a finished sstable.
type WriterMetadata struct {
	// Size is the size of the written sstable in bytes.
	Size          uint64
	SmallestPoint InternalKey
	// LargestPoint, LargestRangeKey, LargestRangeDel should not be accessed
	// before Writer.Close is called, because they may only be set on
	// Writer.Close.
	LargestPoint     InternalKey
	SmallestRangeDel InternalKey
	LargestRangeDel  InternalKey
	SmallestRangeKey InternalKey
	LargestRangeKey  InternalKey
	// HasPointKeys, HasRangeDelKeys and HasRangeKeys indicate whether the
	// corresponding Smallest/Largest fields above have been set.
	HasPointKeys    bool
	HasRangeDelKeys bool
	HasRangeKeys    bool
	// SmallestSeqNum and LargestSeqNum bound the sequence numbers of all keys
	// added to the table (see updateSeqNum).
	SmallestSeqNum uint64
	LargestSeqNum  uint64
	Properties     Properties
}
55 :
56 : // SetSmallestPointKey sets the smallest point key to the given key.
57 : // NB: this method set the "absolute" smallest point key. Any existing key is
58 : // overridden.
59 1 : func (m *WriterMetadata) SetSmallestPointKey(k InternalKey) {
60 1 : m.SmallestPoint = k
61 1 : m.HasPointKeys = true
62 1 : }
63 :
64 : // SetSmallestRangeDelKey sets the smallest rangedel key to the given key.
65 : // NB: this method set the "absolute" smallest rangedel key. Any existing key is
66 : // overridden.
67 1 : func (m *WriterMetadata) SetSmallestRangeDelKey(k InternalKey) {
68 1 : m.SmallestRangeDel = k
69 1 : m.HasRangeDelKeys = true
70 1 : }
71 :
72 : // SetSmallestRangeKey sets the smallest range key to the given key.
73 : // NB: this method set the "absolute" smallest range key. Any existing key is
74 : // overridden.
75 1 : func (m *WriterMetadata) SetSmallestRangeKey(k InternalKey) {
76 1 : m.SmallestRangeKey = k
77 1 : m.HasRangeKeys = true
78 1 : }
79 :
80 : // SetLargestPointKey sets the largest point key to the given key.
81 : // NB: this method set the "absolute" largest point key. Any existing key is
82 : // overridden.
83 1 : func (m *WriterMetadata) SetLargestPointKey(k InternalKey) {
84 1 : m.LargestPoint = k
85 1 : m.HasPointKeys = true
86 1 : }
87 :
88 : // SetLargestRangeDelKey sets the largest rangedel key to the given key.
89 : // NB: this method set the "absolute" largest rangedel key. Any existing key is
90 : // overridden.
91 1 : func (m *WriterMetadata) SetLargestRangeDelKey(k InternalKey) {
92 1 : m.LargestRangeDel = k
93 1 : m.HasRangeDelKeys = true
94 1 : }
95 :
96 : // SetLargestRangeKey sets the largest range key to the given key.
97 : // NB: this method set the "absolute" largest range key. Any existing key is
98 : // overridden.
99 1 : func (m *WriterMetadata) SetLargestRangeKey(k InternalKey) {
100 1 : m.LargestRangeKey = k
101 1 : m.HasRangeKeys = true
102 1 : }
103 :
104 1 : func (m *WriterMetadata) updateSeqNum(seqNum uint64) {
105 1 : if m.SmallestSeqNum > seqNum {
106 1 : m.SmallestSeqNum = seqNum
107 1 : }
108 1 : if m.LargestSeqNum < seqNum {
109 1 : m.LargestSeqNum = seqNum
110 1 : }
111 : }
112 :
// Writer is a table writer.
type Writer struct {
	// writable is the destination to which the sstable bytes are written.
	writable objstorage.Writable
	// meta accumulates metadata (bounds, seqnums, properties) for the table.
	meta WriterMetadata
	// err is sticky: once set, subsequent operations return it.
	err error
	// cacheID and fileNum are used to remove blocks written to the sstable from
	// the cache, providing a defense in depth against bugs which cause cache
	// collisions.
	cacheID uint64
	fileNum base.DiskFileNum
	// The following fields are copied from Options.
	blockSize               int
	blockSizeThreshold      int
	indexBlockSize          int
	indexBlockSizeThreshold int
	compare                 Compare
	split                   Split
	formatKey               base.FormatKey
	compression             Compression
	separator               Separator
	successor               Successor
	tableFormat             TableFormat
	isStrictObsolete        bool
	writingToLowestLevel    bool
	cache                   *cache.Cache
	restartInterval         int
	checksumType            ChecksumType
	// disableKeyOrderChecks disables the checks that keys are added to an
	// sstable in order. It is intended for internal use only in the construction
	// of invalid sstables for testing. See tool/make_test_sstables.go.
	disableKeyOrderChecks bool
	// With two level indexes, the index/filter of a SST file is partitioned into
	// smaller blocks with an additional top-level index on them. When reading an
	// index/filter, only the top-level index is loaded into memory. The two level
	// index/filter then uses the top-level index to load on demand into the block
	// cache the partitions that are required to perform the index/filter query.
	//
	// Two level indexes are enabled automatically when there is more than one
	// index block.
	//
	// This is useful when there are very large index blocks, which generally occurs
	// with the usage of large keys. With large index blocks, the index blocks fight
	// the data blocks for block cache space and the index blocks are likely to be
	// re-read many times from the disk. The top level index, which has a much
	// smaller memory footprint, can be used to prevent the entire index block from
	// being loaded into the block cache.
	twoLevelIndex bool
	// Internal flag to allow creation of range-del-v1 format blocks. Only used
	// for testing. Note that v2 format blocks are backwards compatible with v1
	// format blocks.
	rangeDelV1Format bool
	// In-progress writers for the index, range-del, range-key and top-level
	// index blocks.
	indexBlock         *indexBlockBuf
	rangeDelBlock      blockWriter
	rangeKeyBlock      blockWriter
	topLevelIndexBlock blockWriter
	// props holds the table's properties.
	props Properties
	// blockPropCollectors are the configured per-block property collectors.
	blockPropCollectors []BlockPropertyCollector
	obsoleteCollector   obsoleteKeyBlockPropertyCollector
	blockPropsEncoder   blockPropertiesEncoder
	// filter accumulates the filter block. If populated, the filter ingests
	// either the output of w.split (i.e. a prefix extractor) if w.split is not
	// nil, or the full keys otherwise.
	filter filterWriter
	// indexPartitions accumulates finished index blocks when writing a
	// two-level index.
	indexPartitions []indexBlockAndBlockProperties

	// indexBlockAlloc is used to bulk-allocate byte slices used to store index
	// blocks in indexPartitions. These live until the index finishes.
	indexBlockAlloc []byte
	// indexSepAlloc is used to bulk-allocate index block separator slices stored
	// in indexPartitions. These live until the index finishes.
	indexSepAlloc bytealloc.A

	// To allow potentially overlapping (i.e. un-fragmented) range keys spans to
	// be added to the Writer, a keyspan.Fragmenter is used to retain the keys
	// and values, emitting fragmented, coalesced spans as appropriate. Range
	// keys must be added in order of their start user-key.
	fragmenter        keyspan.Fragmenter
	rangeKeyEncoder   rangekey.Encoder
	rangeKeysBySuffix keyspan.KeysBySuffix
	rangeKeySpan      keyspan.Span
	// rkBuf is buffer storage for range keys (usage not visible in this chunk).
	rkBuf []byte
	// dataBlockBuf consists of the state which is currently owned by and used by
	// the Writer client goroutine. This state can be handed off to other goroutines.
	dataBlockBuf *dataBlockBuf
	// blockBuf consists of the state which is owned by and used by the Writer client
	// goroutine.
	blockBuf blockBuf

	// coordination holds the state used to coordinate with the writeQueue
	// goroutine (see coordinationState).
	coordination coordinationState

	// Information (other than the byte slice) about the last point key, to
	// avoid extracting it again.
	lastPointKeyInfo pointKeyInfo

	// For value blocks.
	shortAttributeExtractor   base.ShortAttributeExtractor
	requiredInPlaceValueBound UserKeyPrefixBound
	// When w.tableFormat >= TableFormatPebblev3, valueBlockWriter is nil iff
	// WriterOptions.DisableValueBlocks was true.
	valueBlockWriter *valueBlockWriter
}
214 :
// pointKeyInfo records information about the most recently added point key so
// it does not need to be re-extracted from the key bytes.
type pointKeyInfo struct {
	// trailer is the InternalKey trailer (seqnum and kind) of the last point key.
	trailer uint64
	// Only computed when w.valueBlockWriter is not nil.
	userKeyLen int
	// prefixLen uses w.split, if not nil. Only computed when w.valueBlockWriter
	// is not nil.
	prefixLen int
	// True iff the point was marked obsolete.
	isObsolete bool
}
225 :
// coordinationState groups the state used to coordinate between the Writer
// client goroutine and the writeQueue goroutine.
type coordinationState struct {
	// parallelismEnabled records whether parallel block writes were requested.
	parallelismEnabled bool

	// writeQueue is used to write data blocks to disk. The writeQueue is primarily
	// used to maintain the order in which data blocks must be written to disk. For
	// this reason, every single data block write must be done through the writeQueue.
	writeQueue *writeQueue

	// sizeEstimate tracks the estimated on-disk size of written data blocks.
	sizeEstimate dataBlockEstimates
}
236 :
237 1 : func (c *coordinationState) init(parallelismEnabled bool, writer *Writer) {
238 1 : c.parallelismEnabled = parallelismEnabled
239 1 : // useMutex is false regardless of parallelismEnabled, because we do not do
240 1 : // parallel compression yet.
241 1 : c.sizeEstimate.useMutex = false
242 1 :
243 1 : // writeQueueSize determines the size of the write queue, or the number
244 1 : // of items which can be added to the queue without blocking. By default, we
245 1 : // use a writeQueue size of 0, since we won't be doing any block writes in
246 1 : // parallel.
247 1 : writeQueueSize := 0
248 1 : if parallelismEnabled {
249 1 : writeQueueSize = runtime.GOMAXPROCS(0)
250 1 : }
251 1 : c.writeQueue = newWriteQueue(writeQueueSize, writer)
252 : }
253 :
254 : // sizeEstimate is a general purpose helper for estimating two kinds of sizes:
255 : // A. The compressed sstable size, which is useful for deciding when to start
256 : //
257 : // a new sstable during flushes or compactions. In practice, we use this in
258 : // estimating the data size (excluding the index).
259 : //
260 : // B. The size of index blocks to decide when to start a new index block.
261 : //
262 : // There are some terminology peculiarities which are due to the origin of
263 : // sizeEstimate for use case A with parallel compression enabled (for which
264 : // the code has not been merged). Specifically this relates to the terms
265 : // "written" and "compressed".
266 : // - The notion of "written" for case A is sufficiently defined by saying that
267 : // the data block is compressed. Waiting for the actual data block write to
268 : // happen can result in unnecessary estimation, when we already know how big
269 : // it will be in compressed form. Additionally, with the forthcoming value
270 : // blocks containing older MVCC values, these compressed block will be held
271 : // in-memory until late in the sstable writing, and we do want to accurately
272 : // account for them without waiting for the actual write.
273 : // For case B, "written" means that the index entry has been fully
274 : // generated, and has been added to the uncompressed block buffer for that
275 : // index block. It does not include actually writing a potentially
276 : // compressed index block.
277 : // - The notion of "compressed" is to differentiate between a "inflight" size
278 : // and the actual size, and is handled via computing a compression ratio
279 : // observed so far (defaults to 1).
280 : // For case A, this is actual data block compression, so the "inflight" size
281 : // is uncompressed blocks (that are no longer being written to) and the
282 : // "compressed" size is after they have been compressed.
283 : // For case B the inflight size is for a key-value pair in the index for
284 : // which the value size (the encoded size of the BlockHandleWithProperties)
285 : // is not accurately known, while the compressed size is the size of that
286 : // entry when it has been added to the (in-progress) index ssblock.
287 : //
288 : // Usage: To update state, one can optionally provide an inflight write value
289 : // using addInflight (used for case B). When something is "written" the state
290 : // can be updated using either writtenWithDelta or writtenWithTotal, which
291 : // provide the actual delta size or the total size (latter must be
292 : // monotonically non-decreasing). If there were no calls to addInflight, there
293 : // isn't any real estimation happening here. So case A does not do any real
294 : // estimation. However, when we introduce parallel compression, there will be
295 : // estimation in that the client goroutine will call addInFlight and the
296 : // compression goroutines will call writtenWithDelta.
type sizeEstimate struct {
	// emptySize is the size when there is no inflight data, and numEntries is 0.
	// emptySize is constant once set, and is returned by size() when the total
	// is zero.
	emptySize uint64

	// inflightSize is the estimated size of some inflight data which hasn't
	// been written yet.
	inflightSize uint64

	// totalSize is the total size of the data which has already been written.
	totalSize uint64

	// numWrittenEntries is the total number of entries which have already been
	// written.
	numWrittenEntries uint64
	// numInflightEntries is the total number of entries which are inflight, and
	// haven't been written.
	numInflightEntries uint64

	// maxEstimatedSize stores the maximum result returned from sizeEstimate.size.
	// It ensures that values returned from subsequent calls to Writer.EstimatedSize
	// never decrease.
	maxEstimatedSize uint64

	// We assume that the entries added to the sizeEstimate can be compressed.
	// For this reason, we keep track of a compressedSize and an uncompressedSize
	// to compute a compression ratio for the inflight entries. If the entries
	// aren't being compressed, then compressedSize and uncompressedSize must be
	// equal (yielding a ratio of 1).
	compressedSize   uint64
	uncompressedSize uint64
}
329 :
// init sets the constant empty size returned when nothing has been added.
func (s *sizeEstimate) init(emptySize uint64) {
	s.emptySize = emptySize
}
333 :
334 1 : func (s *sizeEstimate) size() uint64 {
335 1 : ratio := float64(1)
336 1 : if s.uncompressedSize > 0 {
337 1 : ratio = float64(s.compressedSize) / float64(s.uncompressedSize)
338 1 : }
339 1 : estimatedInflightSize := uint64(float64(s.inflightSize) * ratio)
340 1 : total := s.totalSize + estimatedInflightSize
341 1 : if total > s.maxEstimatedSize {
342 1 : s.maxEstimatedSize = total
343 1 : } else {
344 1 : total = s.maxEstimatedSize
345 1 : }
346 :
347 1 : if total == 0 {
348 1 : return s.emptySize
349 1 : }
350 :
351 1 : return total
352 : }
353 :
354 1 : func (s *sizeEstimate) numTotalEntries() uint64 {
355 1 : return s.numWrittenEntries + s.numInflightEntries
356 1 : }
357 :
358 1 : func (s *sizeEstimate) addInflight(size int) {
359 1 : s.numInflightEntries++
360 1 : s.inflightSize += uint64(size)
361 1 : }
362 :
363 1 : func (s *sizeEstimate) writtenWithTotal(newTotalSize uint64, inflightSize int) {
364 1 : finalEntrySize := int(newTotalSize - s.totalSize)
365 1 : s.writtenWithDelta(finalEntrySize, inflightSize)
366 1 : }
367 :
368 1 : func (s *sizeEstimate) writtenWithDelta(finalEntrySize int, inflightSize int) {
369 1 : if inflightSize > 0 {
370 1 : // This entry was previously inflight, so we should decrement inflight
371 1 : // entries and update the "compression" stats for future estimation.
372 1 : s.numInflightEntries--
373 1 : s.inflightSize -= uint64(inflightSize)
374 1 : s.uncompressedSize += uint64(inflightSize)
375 1 : s.compressedSize += uint64(finalEntrySize)
376 1 : }
377 1 : s.numWrittenEntries++
378 1 : s.totalSize += uint64(finalEntrySize)
379 : }
380 :
381 1 : func (s *sizeEstimate) clear() {
382 1 : *s = sizeEstimate{emptySize: s.emptySize}
383 1 : }
384 :
385 : type indexBlockBuf struct {
386 : // block will only be accessed from the writeQueue.
387 : block blockWriter
388 :
389 : size struct {
390 : useMutex bool
391 : mu sync.Mutex
392 : estimate sizeEstimate
393 : }
394 :
395 : // restartInterval matches indexBlockBuf.block.restartInterval. We store it twice, because the `block`
396 : // must only be accessed from the writeQueue goroutine.
397 : restartInterval int
398 : }
399 :
400 1 : func (i *indexBlockBuf) clear() {
401 1 : i.block.clear()
402 1 : if i.size.useMutex {
403 1 : i.size.mu.Lock()
404 1 : defer i.size.mu.Unlock()
405 1 : }
406 1 : i.size.estimate.clear()
407 1 : i.restartInterval = 0
408 : }
409 :
// indexBlockBufPool pools indexBlockBuf values to reduce allocations;
// newIndexBlockBuf re-initializes values taken from the pool.
var indexBlockBufPool = sync.Pool{
	New: func() interface{} {
		return &indexBlockBuf{}
	},
}
415 :
416 : const indexBlockRestartInterval = 1
417 :
418 1 : func newIndexBlockBuf(useMutex bool) *indexBlockBuf {
419 1 : i := indexBlockBufPool.Get().(*indexBlockBuf)
420 1 : i.size.useMutex = useMutex
421 1 : i.restartInterval = indexBlockRestartInterval
422 1 : i.block.restartInterval = indexBlockRestartInterval
423 1 : i.size.estimate.init(emptyBlockSize)
424 1 : return i
425 1 : }
426 :
427 : func (i *indexBlockBuf) shouldFlush(
428 : sep InternalKey, valueLen, targetBlockSize, sizeThreshold int,
429 1 : ) bool {
430 1 : if i.size.useMutex {
431 1 : i.size.mu.Lock()
432 1 : defer i.size.mu.Unlock()
433 1 : }
434 :
435 1 : nEntries := i.size.estimate.numTotalEntries()
436 1 : return shouldFlush(
437 1 : sep, valueLen, i.restartInterval, int(i.size.estimate.size()),
438 1 : int(nEntries), targetBlockSize, sizeThreshold)
439 : }
440 :
441 1 : func (i *indexBlockBuf) add(key InternalKey, value []byte, inflightSize int) {
442 1 : i.block.add(key, value)
443 1 : size := i.block.estimatedSize()
444 1 : if i.size.useMutex {
445 1 : i.size.mu.Lock()
446 1 : defer i.size.mu.Unlock()
447 1 : }
448 1 : i.size.estimate.writtenWithTotal(uint64(size), inflightSize)
449 : }
450 :
451 1 : func (i *indexBlockBuf) finish() []byte {
452 1 : b := i.block.finish()
453 1 : return b
454 1 : }
455 :
456 1 : func (i *indexBlockBuf) addInflight(inflightSize int) {
457 1 : if i.size.useMutex {
458 1 : i.size.mu.Lock()
459 1 : defer i.size.mu.Unlock()
460 1 : }
461 1 : i.size.estimate.addInflight(inflightSize)
462 : }
463 :
464 1 : func (i *indexBlockBuf) estimatedSize() uint64 {
465 1 : if i.size.useMutex {
466 1 : i.size.mu.Lock()
467 1 : defer i.size.mu.Unlock()
468 1 : }
469 :
470 : // Make sure that the size estimation works as expected when parallelism
471 : // is disabled.
472 1 : if invariants.Enabled && !i.size.useMutex {
473 1 : if i.size.estimate.inflightSize != 0 {
474 0 : panic("unexpected inflight entry in index block size estimation")
475 : }
476 :
477 : // NB: The i.block should only be accessed from the writeQueue goroutine,
478 : // when parallelism is enabled. We break that invariant here, but that's
479 : // okay since parallelism is disabled.
480 1 : if i.size.estimate.size() != uint64(i.block.estimatedSize()) {
481 0 : panic("index block size estimation sans parallelism is incorrect")
482 : }
483 : }
484 1 : return i.size.estimate.size()
485 : }
486 :
// dataBlockEstimates is used for sstable size estimation. dataBlockEstimates
// can be accessed by the Writer client and compressionQueue goroutines. Fields
// should only be read/updated through the functions defined on the
// *dataBlockEstimates type.
type dataBlockEstimates struct {
	// If we don't do block compression in parallel, then we don't need to take
	// the performance hit of synchronizing using this mutex.
	useMutex bool
	mu       sync.Mutex

	// estimate tracks the written and inflight data block sizes.
	estimate sizeEstimate
}
499 :
500 : // inflightSize is the uncompressed block size estimate which has been
501 : // previously provided to addInflightDataBlock(). If addInflightDataBlock()
502 : // has not been called, this must be set to 0. compressedSize is the
503 : // compressed size of the block.
504 1 : func (d *dataBlockEstimates) dataBlockCompressed(compressedSize int, inflightSize int) {
505 1 : if d.useMutex {
506 0 : d.mu.Lock()
507 0 : defer d.mu.Unlock()
508 0 : }
509 1 : d.estimate.writtenWithDelta(compressedSize+blockTrailerLen, inflightSize)
510 : }
511 :
512 : // size is an estimated size of datablock data which has been written to disk.
513 1 : func (d *dataBlockEstimates) size() uint64 {
514 1 : if d.useMutex {
515 0 : d.mu.Lock()
516 0 : defer d.mu.Unlock()
517 0 : }
518 : // If there is no parallel compression, there should not be any inflight bytes.
519 1 : if invariants.Enabled && !d.useMutex {
520 1 : if d.estimate.inflightSize != 0 {
521 0 : panic("unexpected inflight entry in data block size estimation")
522 : }
523 : }
524 1 : return d.estimate.size()
525 : }
526 :
527 : // Avoid linter unused error.
528 : var _ = (&dataBlockEstimates{}).addInflightDataBlock
529 :
530 : // NB: unused since no parallel compression.
531 0 : func (d *dataBlockEstimates) addInflightDataBlock(size int) {
532 0 : if d.useMutex {
533 0 : d.mu.Lock()
534 0 : defer d.mu.Unlock()
535 0 : }
536 :
537 0 : d.estimate.addInflight(size)
538 : }
539 :
// writeTaskPool pools writeTask values. The compressionDone channel is
// allocated once per task (capacity 1) and reused across pool cycles.
var writeTaskPool = sync.Pool{
	New: func() interface{} {
		t := &writeTask{}
		t.compressionDone = make(chan bool, 1)
		return t
	},
}
547 :
// checksummer computes block checksums using the configured algorithm. For
// ChecksumTypeXXHash64 it lazily allocates and then reuses an xxhash digest.
type checksummer struct {
	checksumType ChecksumType
	// xxHasher is allocated on first use by checksum() and reused thereafter.
	xxHasher *xxhash.Digest
}
552 :
// checksum computes the checksum over block followed by blockType, using the
// configured algorithm. Panics on an unsupported checksum type.
func (c *checksummer) checksum(block []byte, blockType []byte) (checksum uint32) {
	// Calculate the checksum.
	switch c.checksumType {
	case ChecksumTypeCRC32c:
		checksum = crc.New(block).Update(blockType).Value()
	case ChecksumTypeXXHash64:
		// Allocate the digest on first use; reset it on subsequent calls.
		if c.xxHasher == nil {
			c.xxHasher = xxhash.New()
		} else {
			c.xxHasher.Reset()
		}
		c.xxHasher.Write(block)
		c.xxHasher.Write(blockType)
		checksum = uint32(c.xxHasher.Sum64())
	default:
		panic(errors.Newf("unsupported checksum type: %d", c.checksumType))
	}
	return checksum
}
572 :
// blockBuf holds scratch state used when checksumming and compressing blocks.
type blockBuf struct {
	// tmp is a scratch buffer, large enough to hold either footerLen bytes,
	// blockTrailerLen bytes, (5 * binary.MaxVarintLen64) bytes, and most
	// likely large enough for a block handle with properties.
	tmp [blockHandleLikelyMaxLen]byte
	// compressedBuf is the destination buffer for compression. It is re-used over the
	// lifetime of the blockBuf, avoiding the allocation of a temporary buffer for each block.
	compressedBuf []byte
	checksummer   checksummer
}
583 :
584 1 : func (b *blockBuf) clear() {
585 1 : // We can't assign b.compressedBuf[:0] to compressedBuf because snappy relies
586 1 : // on the length of the buffer, and not the capacity to determine if it needs
587 1 : // to make an allocation.
588 1 : *b = blockBuf{
589 1 : compressedBuf: b.compressedBuf, checksummer: b.checksummer,
590 1 : }
591 1 : }
592 :
// A dataBlockBuf holds all the state required to compress and write a data block to disk.
// A dataBlockBuf begins its lifecycle owned by the Writer client goroutine. The Writer
// client goroutine adds keys to the sstable, writing directly into a dataBlockBuf's blockWriter
// until the block is full. Once a dataBlockBuf's block is full, the dataBlockBuf may be passed
// to other goroutines for compression and file I/O.
type dataBlockBuf struct {
	blockBuf
	// dataBlock is the block writer for the data block currently being built.
	dataBlock blockWriter

	// uncompressed is a reference to a byte slice which is owned by the dataBlockBuf. It is the
	// next byte slice to be compressed. The uncompressed byte slice will be backed by the
	// dataBlock.buf.
	uncompressed []byte
	// compressed is a reference to a byte slice which is owned by the dataBlockBuf. It is the
	// compressed byte slice which must be written to disk. The compressed byte slice may be
	// backed by the dataBlock.buf, or the dataBlockBuf.compressedBuf, depending on whether
	// we use the result of the compression.
	compressed []byte

	// We're making calls to BlockPropertyCollectors from the Writer client goroutine. We need to
	// pass the encoded block properties over to the write queue. To prevent copies, and allocations,
	// we give each dataBlockBuf, a blockPropertiesEncoder.
	blockPropsEncoder blockPropertiesEncoder
	// dataBlockProps is set when Writer.finishDataBlockProps is called. The dataBlockProps slice is
	// a shallow copy of the internal buffer of the dataBlockBuf.blockPropsEncoder.
	dataBlockProps []byte

	// sepScratch is reusable scratch space for computing separator keys.
	sepScratch []byte
}
623 :
624 1 : func (d *dataBlockBuf) clear() {
625 1 : d.blockBuf.clear()
626 1 : d.dataBlock.clear()
627 1 :
628 1 : d.uncompressed = nil
629 1 : d.compressed = nil
630 1 : d.dataBlockProps = nil
631 1 : d.sepScratch = d.sepScratch[:0]
632 1 : }
633 :
// dataBlockBufPool pools dataBlockBuf values to reduce allocations;
// newDataBlockBuf re-initializes values taken from the pool.
var dataBlockBufPool = sync.Pool{
	New: func() interface{} {
		return &dataBlockBuf{}
	},
}
639 :
640 1 : func newDataBlockBuf(restartInterval int, checksumType ChecksumType) *dataBlockBuf {
641 1 : d := dataBlockBufPool.Get().(*dataBlockBuf)
642 1 : d.dataBlock.restartInterval = restartInterval
643 1 : d.checksummer.checksumType = checksumType
644 1 : return d
645 1 : }
646 :
// finish finalizes the data block, recording the uncompressed bytes (backed
// by dataBlock.buf) for subsequent compression.
func (d *dataBlockBuf) finish() {
	d.uncompressed = d.dataBlock.finish()
}
650 :
// compressAndChecksum compresses the finished uncompressed block and computes
// its checksum, storing the result in d.compressed.
func (d *dataBlockBuf) compressAndChecksum(c Compression) {
	d.compressed = compressAndChecksum(d.uncompressed, c, &d.blockBuf)
}
654 :
655 : func (d *dataBlockBuf) shouldFlush(
656 : key InternalKey, valueLen, targetBlockSize, sizeThreshold int,
657 1 : ) bool {
658 1 : return shouldFlush(
659 1 : key, valueLen, d.dataBlock.restartInterval, d.dataBlock.estimatedSize(),
660 1 : d.dataBlock.nEntries, targetBlockSize, sizeThreshold)
661 1 : }
662 :
// indexBlockAndBlockProperties pairs a finished, encoded index block with its
// encoded block properties and the separator key used to address it (stored
// in Writer.indexPartitions for two-level indexes).
type indexBlockAndBlockProperties struct {
	// nEntries is the number of entries in the block.
	nEntries int
	// sep is the last key added to this block, for computing a separator later.
	sep        InternalKey
	properties []byte
	// block is the encoded block produced by blockWriter.finish.
	block []byte
}
671 :
672 : // Set sets the value for the given key. The sequence number is set to 0.
673 : // Intended for use to externally construct an sstable before ingestion into a
674 : // DB. For a given Writer, the keys passed to Set must be in strictly increasing
675 : // order.
676 : //
677 : // TODO(peter): untested
678 1 : func (w *Writer) Set(key, value []byte) error {
679 1 : if w.err != nil {
680 0 : return w.err
681 0 : }
682 1 : if w.isStrictObsolete {
683 0 : return errors.Errorf("use AddWithForceObsolete")
684 0 : }
685 : // forceObsolete is false based on the assumption that no RANGEDELs in the
686 : // sstable delete the added points.
687 1 : return w.addPoint(base.MakeInternalKey(key, 0, InternalKeyKindSet), value, false)
688 : }
689 :
690 : // Delete deletes the value for the given key. The sequence number is set to
691 : // 0. Intended for use to externally construct an sstable before ingestion into
692 : // a DB.
693 : //
694 : // TODO(peter): untested
695 1 : func (w *Writer) Delete(key []byte) error {
696 1 : if w.err != nil {
697 0 : return w.err
698 0 : }
699 1 : if w.isStrictObsolete {
700 0 : return errors.Errorf("use AddWithForceObsolete")
701 0 : }
702 : // forceObsolete is false based on the assumption that no RANGEDELs in the
703 : // sstable delete the added points.
704 1 : return w.addPoint(base.MakeInternalKey(key, 0, InternalKeyKindDelete), nil, false)
705 : }
706 :
707 : // DeleteRange deletes all of the keys (and values) in the range [start,end)
708 : // (inclusive on start, exclusive on end). The sequence number is set to
709 : // 0. Intended for use to externally construct an sstable before ingestion into
710 : // a DB.
711 : //
712 : // TODO(peter): untested
713 1 : func (w *Writer) DeleteRange(start, end []byte) error {
714 1 : if w.err != nil {
715 0 : return w.err
716 0 : }
717 1 : return w.addTombstone(base.MakeInternalKey(start, 0, InternalKeyKindRangeDelete), end)
718 : }
719 :
720 : // Merge adds an action to the DB that merges the value at key with the new
721 : // value. The details of the merge are dependent upon the configured merge
722 : // operator. The sequence number is set to 0. Intended for use to externally
723 : // construct an sstable before ingestion into a DB.
724 : //
725 : // TODO(peter): untested
726 0 : func (w *Writer) Merge(key, value []byte) error {
727 0 : if w.err != nil {
728 0 : return w.err
729 0 : }
730 0 : if w.isStrictObsolete {
731 0 : return errors.Errorf("use AddWithForceObsolete")
732 0 : }
733 : // forceObsolete is false based on the assumption that no RANGEDELs in the
734 : // sstable that delete the added points. If the user configured this writer
735 : // to be strict-obsolete, addPoint will reject the addition of this MERGE.
736 0 : return w.addPoint(base.MakeInternalKey(key, 0, InternalKeyKindMerge), value, false)
737 : }
738 :
739 : // Add adds a key/value pair to the table being written. For a given Writer,
740 : // the keys passed to Add must be in increasing order. The exception to this
741 : // rule is range deletion tombstones. Range deletion tombstones need to be
742 : // added ordered by their start key, but they can be added out of order from
743 : // point entries. Additionally, range deletion tombstones must be fragmented
744 : // (i.e. by keyspan.Fragmenter).
745 1 : func (w *Writer) Add(key InternalKey, value []byte) error {
746 1 : if w.isStrictObsolete {
747 0 : return errors.Errorf("use AddWithForceObsolete")
748 0 : }
749 1 : return w.AddWithForceObsolete(key, value, false)
750 : }
751 :
752 : // AddWithForceObsolete must be used when writing a strict-obsolete sstable.
753 : //
754 : // forceObsolete indicates whether the caller has determined that this key is
755 : // obsolete even though it may be the latest point key for this userkey. This
756 : // should be set to true for keys obsoleted by RANGEDELs, and is required for
757 : // strict-obsolete sstables.
758 : //
759 : // Note that there are two properties, S1 and S2 (see comment in format.go)
760 : // that strict-obsolete ssts must satisfy. S2, due to RANGEDELs, is solely the
761 : // responsibility of the caller. S1 is solely the responsibility of the
762 : // callee.
763 1 : func (w *Writer) AddWithForceObsolete(key InternalKey, value []byte, forceObsolete bool) error {
764 1 : if w.err != nil {
765 0 : return w.err
766 0 : }
767 :
768 1 : switch key.Kind() {
769 1 : case InternalKeyKindRangeDelete:
770 1 : return w.addTombstone(key, value)
771 : case base.InternalKeyKindRangeKeyDelete,
772 : base.InternalKeyKindRangeKeySet,
773 0 : base.InternalKeyKindRangeKeyUnset:
774 0 : w.err = errors.Errorf(
775 0 : "pebble: range keys must be added via one of the RangeKey* functions")
776 0 : return w.err
777 : }
778 1 : return w.addPoint(key, value, forceObsolete)
779 : }
780 :
781 1 : func (w *Writer) makeAddPointDecisionV2(key InternalKey) error {
782 1 : prevTrailer := w.lastPointKeyInfo.trailer
783 1 : w.lastPointKeyInfo.trailer = key.Trailer
784 1 : if w.dataBlockBuf.dataBlock.nEntries == 0 {
785 1 : return nil
786 1 : }
787 1 : if !w.disableKeyOrderChecks {
788 1 : prevPointUserKey := w.dataBlockBuf.dataBlock.getCurUserKey()
789 1 : cmpUser := w.compare(prevPointUserKey, key.UserKey)
790 1 : if cmpUser > 0 || (cmpUser == 0 && prevTrailer <= key.Trailer) {
791 1 : return errors.Errorf(
792 1 : "pebble: keys must be added in strictly increasing order: %s, %s",
793 1 : InternalKey{UserKey: prevPointUserKey, Trailer: prevTrailer}.Pretty(w.formatKey),
794 1 : key.Pretty(w.formatKey))
795 1 : }
796 : }
797 1 : return nil
798 : }
799 :
800 : // REQUIRES: at least one point has been written to the Writer.
801 1 : func (w *Writer) getLastPointUserKey() []byte {
802 1 : if w.dataBlockBuf.dataBlock.nEntries == 0 {
803 0 : panic(errors.AssertionFailedf("no point keys added to writer"))
804 : }
805 1 : return w.dataBlockBuf.dataBlock.getCurUserKey()
806 : }
807 :
808 : // REQUIRES: w.tableFormat >= TableFormatPebblev3
// makeAddPointDecisionV3 records bookkeeping for the incoming point key and
// decides (a) whether the key shares a prefix with the previous point key,
// (b) whether its value should be written to a value block rather than
// in-line, and (c) whether the key is obsolete (shadowed by a newer entry for
// the same user key, or a point delete written to the lowest level). It also
// performs the strictly-increasing key order check.
func (w *Writer) makeAddPointDecisionV3(
	key InternalKey, valueLen int,
) (setHasSamePrefix bool, writeToValueBlock bool, isObsolete bool, err error) {
	// Snapshot the previous key's info before overwriting it with the
	// incoming key's info.
	prevPointKeyInfo := w.lastPointKeyInfo
	w.lastPointKeyInfo.userKeyLen = len(key.UserKey)
	w.lastPointKeyInfo.prefixLen = w.split(key.UserKey)
	w.lastPointKeyInfo.trailer = key.Trailer
	w.lastPointKeyInfo.isObsolete = false
	// First point key in the table: nothing to compare against.
	if !w.meta.HasPointKeys {
		return false, false, false, nil
	}
	keyKind := base.TrailerKind(key.Trailer)
	prevPointUserKey := w.getLastPointUserKey()
	prevPointKey := InternalKey{UserKey: prevPointUserKey, Trailer: prevPointKeyInfo.trailer}
	prevKeyKind := base.TrailerKind(prevPointKeyInfo.trailer)
	// Only SET-followed-by-SET pairs are candidates for value blocks.
	considerWriteToValueBlock := prevKeyKind == InternalKeyKindSet &&
		keyKind == InternalKeyKindSet
	if considerWriteToValueBlock && !w.requiredInPlaceValueBound.IsEmpty() {
		// Keys whose prefix falls inside [Lower, Upper) must keep their values
		// in place.
		keyPrefix := key.UserKey[:w.lastPointKeyInfo.prefixLen]
		cmpUpper := w.compare(
			w.requiredInPlaceValueBound.Upper, keyPrefix)
		if cmpUpper <= 0 {
			// Common case for CockroachDB. Make it empty since all future keys in
			// this sstable will also have cmpUpper <= 0.
			w.requiredInPlaceValueBound = UserKeyPrefixBound{}
		} else if w.compare(keyPrefix, w.requiredInPlaceValueBound.Lower) >= 0 {
			considerWriteToValueBlock = false
		}
	}
	// cmpPrefix is initialized iff considerWriteToValueBlock.
	var cmpPrefix int
	var cmpUser int
	if considerWriteToValueBlock {
		// Compare the prefixes.
		cmpPrefix = w.compare(prevPointUserKey[:prevPointKeyInfo.prefixLen],
			key.UserKey[:w.lastPointKeyInfo.prefixLen])
		cmpUser = cmpPrefix
		if cmpPrefix == 0 {
			// Need to compare suffixes to compute cmpUser.
			cmpUser = w.compare(prevPointUserKey[prevPointKeyInfo.prefixLen:],
				key.UserKey[w.lastPointKeyInfo.prefixLen:])
		}
	} else {
		cmpUser = w.compare(prevPointUserKey, key.UserKey)
	}
	// Ensure that no one adds a point key kind without considering the obsolete
	// handling for that kind.
	switch keyKind {
	case InternalKeyKindSet, InternalKeyKindSetWithDelete, InternalKeyKindMerge,
		InternalKeyKindDelete, InternalKeyKindSingleDelete, InternalKeyKindDeleteSized:
	default:
		panic(errors.AssertionFailedf("unexpected key kind %s", keyKind.String()))
	}
	// If same user key, then the current key is obsolete if any of the
	// following is true:
	// C1 The prev key was obsolete.
	// C2 The prev key was not a MERGE. When the previous key is a MERGE we must
	//    preserve SET* and MERGE since their values will be merged into the
	//    previous key. We also must preserve DEL* since there may be an older
	//    SET*/MERGE in a lower level that must not be merged with the MERGE --
	//    if we omit the DEL* that lower SET*/MERGE will become visible.
	//
	// Regardless of whether it is the same user key or not
	// C3 The current key is some kind of point delete, and we are writing to
	//    the lowest level, then it is also obsolete. The correctness of this
	//    relies on the same user key not spanning multiple sstables in a level.
	//
	// C1 ensures that for a user key there is at most one transition from
	// !obsolete to obsolete. Consider a user key k, for which the first n keys
	// are not obsolete. We consider the various value of n:
	//
	// n = 0: This happens due to forceObsolete being set by the caller, or due
	// to C3. forceObsolete must only be set due a RANGEDEL, and that RANGEDEL
	// must also delete all the lower seqnums for the same user key. C3 triggers
	// due to a point delete and that deletes all the lower seqnums for the same
	// user key.
	//
	// n = 1: This is the common case. It happens when the first key is not a
	// MERGE, or the current key is some kind of point delete.
	//
	// n > 1: This is due to a sequence of MERGE keys, potentially followed by a
	// single non-MERGE key.
	isObsoleteC1AndC2 := cmpUser == 0 &&
		(prevPointKeyInfo.isObsolete || prevKeyKind != InternalKeyKindMerge)
	isObsoleteC3 := w.writingToLowestLevel &&
		(keyKind == InternalKeyKindDelete || keyKind == InternalKeyKindSingleDelete ||
			keyKind == InternalKeyKindDeleteSized)
	isObsolete = isObsoleteC1AndC2 || isObsoleteC3
	// TODO(sumeer): storing isObsolete SET and SETWITHDEL in value blocks is
	// possible, but requires some care in documenting and checking invariants.
	// There is code that assumes nothing in value blocks because of single MVCC
	// version (those should be ok). We have to ensure setHasSamePrefix is
	// correctly initialized here etc.

	if !w.disableKeyOrderChecks &&
		(cmpUser > 0 || (cmpUser == 0 && prevPointKeyInfo.trailer <= key.Trailer)) {
		return false, false, false, errors.Errorf(
			"pebble: keys must be added in strictly increasing order: %s, %s",
			prevPointKey.Pretty(w.formatKey), key.Pretty(w.formatKey))
	}
	if !considerWriteToValueBlock {
		return false, false, isObsolete, nil
	}
	// NB: it is possible that cmpUser == 0, i.e., these two SETs have identical
	// user keys (because of an open snapshot). This should be the rare case.
	setHasSamePrefix = cmpPrefix == 0
	// Use of 0 here is somewhat arbitrary. Given the minimum 3 byte encoding of
	// valueHandle, this should be > 3. But tiny values are common in test and
	// unlikely in production, so we use 0 here for better test coverage.
	const tinyValueThreshold = 0
	// NB: setting WriterOptions.DisableValueBlocks does not disable the
	// setHasSamePrefix optimization.
	considerWriteToValueBlock = setHasSamePrefix && valueLen > tinyValueThreshold && w.valueBlockWriter != nil
	return setHasSamePrefix, considerWriteToValueBlock, isObsolete, nil
}
924 :
// addPoint adds a point key (SET/MERGE/DEL variants) to the table: it runs
// the per-format add decision, optionally writes the value to a value block,
// flushes the data block if needed, feeds the key to the filter and block
// property collectors, and updates table bounds and properties.
//
// forceObsolete marks the key obsolete regardless of the decision computed
// here; callers set it for keys obsoleted by RANGEDELs.
func (w *Writer) addPoint(key InternalKey, value []byte, forceObsolete bool) error {
	if w.isStrictObsolete && key.Kind() == InternalKeyKindMerge {
		return errors.Errorf("MERGE not supported in a strict-obsolete sstable")
	}
	var err error
	var setHasSameKeyPrefix, writeToValueBlock, addPrefixToValueStoredWithKey bool
	var isObsolete bool
	maxSharedKeyLen := len(key.UserKey)
	if w.tableFormat >= TableFormatPebblev3 {
		// maxSharedKeyLen is limited to the prefix of the preceding key. If the
		// preceding key was in a different block, then the blockWriter will
		// ignore this maxSharedKeyLen.
		maxSharedKeyLen = w.lastPointKeyInfo.prefixLen
		setHasSameKeyPrefix, writeToValueBlock, isObsolete, err =
			w.makeAddPointDecisionV3(key, len(value))
		// Only SETs carry a value prefix byte in v3+.
		addPrefixToValueStoredWithKey = base.TrailerKind(key.Trailer) == InternalKeyKindSet
	} else {
		err = w.makeAddPointDecisionV2(key)
	}
	if err != nil {
		return err
	}
	// Obsolete bits only exist in format v4+; forceObsolete has no effect on
	// older formats.
	isObsolete = w.tableFormat >= TableFormatPebblev4 && (isObsolete || forceObsolete)
	w.lastPointKeyInfo.isObsolete = isObsolete
	var valueStoredWithKey []byte
	var prefix valuePrefix
	var valueStoredWithKeyLen int
	if writeToValueBlock {
		// The value lives in a value block; only an encoded handle (plus one
		// prefix byte) is stored with the key.
		vh, err := w.valueBlockWriter.addValue(value)
		if err != nil {
			return err
		}
		n := encodeValueHandle(w.blockBuf.tmp[:], vh)
		valueStoredWithKey = w.blockBuf.tmp[:n]
		valueStoredWithKeyLen = len(valueStoredWithKey) + 1
		var attribute base.ShortAttribute
		if w.shortAttributeExtractor != nil {
			// TODO(sumeer): for compactions, it is possible that the input sstable
			// already has this value in the value section and so we have already
			// extracted the ShortAttribute. Avoid extracting it again. This will
			// require changing the Writer.Add interface.
			if attribute, err = w.shortAttributeExtractor(
				key.UserKey, w.lastPointKeyInfo.prefixLen, value); err != nil {
				return err
			}
		}
		prefix = makePrefixForValueHandle(setHasSameKeyPrefix, attribute)
	} else {
		valueStoredWithKey = value
		valueStoredWithKeyLen = len(value)
		if addPrefixToValueStoredWithKey {
			// Account for the one-byte value prefix.
			valueStoredWithKeyLen++
		}
		prefix = makePrefixForInPlaceValue(setHasSameKeyPrefix)
	}

	// Flush the current data block first if adding this entry would make it
	// too large.
	if err := w.maybeFlush(key, valueStoredWithKeyLen); err != nil {
		return err
	}

	for i := range w.blockPropCollectors {
		v := value
		if addPrefixToValueStoredWithKey {
			// Values for SET are not required to be in-place, and in the future may
			// not even be read by the compaction, so pass nil values. Block
			// property collectors in such Pebble DB's must not look at the value.
			v = nil
		}
		if err := w.blockPropCollectors[i].Add(key, v); err != nil {
			w.err = err
			return err
		}
	}
	if w.tableFormat >= TableFormatPebblev4 {
		w.obsoleteCollector.AddPoint(isObsolete)
	}

	w.maybeAddToFilter(key.UserKey)
	w.dataBlockBuf.dataBlock.addWithOptionalValuePrefix(
		key, isObsolete, valueStoredWithKey, maxSharedKeyLen, addPrefixToValueStoredWithKey, prefix,
		setHasSameKeyPrefix)

	w.meta.updateSeqNum(key.SeqNum())

	if !w.meta.HasPointKeys {
		k := w.dataBlockBuf.dataBlock.getCurKey()
		// NB: We need to ensure that SmallestPoint.UserKey is set, so we create
		// an InternalKey which is semantically identical to the key, but won't
		// have a nil UserKey. We do this, because key.UserKey could be nil, and
		// we don't want SmallestPoint.UserKey to be nil.
		//
		// todo(bananabrick): Determine if it's okay to have a nil SmallestPoint
		// .UserKey now that we don't rely on a nil UserKey to determine if the
		// key has been set or not.
		w.meta.SetSmallestPointKey(k.Clone())
	}

	// Update table properties for this entry.
	w.props.NumEntries++
	switch key.Kind() {
	case InternalKeyKindDelete, InternalKeyKindSingleDelete:
		w.props.NumDeletions++
		w.props.RawPointTombstoneKeySize += uint64(len(key.UserKey))
	case InternalKeyKindDeleteSized:
		var size uint64
		if len(value) > 0 {
			var n int
			// The value of a DELSIZED is a uvarint-encoded size estimate.
			size, n = binary.Uvarint(value)
			if n <= 0 {
				w.err = errors.Newf("%s key's value (%x) does not parse as uvarint",
					errors.Safe(key.Kind().String()), value)
				return w.err
			}
		}
		w.props.NumDeletions++
		w.props.NumSizedDeletions++
		w.props.RawPointTombstoneKeySize += uint64(len(key.UserKey))
		w.props.RawPointTombstoneValueSize += size
	case InternalKeyKindMerge:
		w.props.NumMergeOperands++
	}
	w.props.RawKeySize += uint64(key.Size())
	w.props.RawValueSize += uint64(len(value))
	return nil
}
1049 :
1050 1 : func (w *Writer) prettyTombstone(k InternalKey, value []byte) fmt.Formatter {
1051 1 : return keyspan.Span{
1052 1 : Start: k.UserKey,
1053 1 : End: value,
1054 1 : Keys: []keyspan.Key{{Trailer: k.Trailer}},
1055 1 : }.Pretty(w.formatKey)
1056 1 : }
1057 :
// addTombstone adds a range deletion to the range-del block. key is the
// tombstone's start key (kind RANGEDEL) and value is its end user key.
// Unless disableKeyOrderChecks is set, tombstones must arrive fragmented and
// ordered by start key; overlapping tombstones must have identical bounds.
func (w *Writer) addTombstone(key InternalKey, value []byte) error {
	if !w.disableKeyOrderChecks && !w.rangeDelV1Format && w.rangeDelBlock.nEntries > 0 {
		// Check that tombstones are being added in fragmented order. If the two
		// tombstones overlap, their start and end keys must be identical.
		prevKey := w.rangeDelBlock.getCurKey()
		switch c := w.compare(prevKey.UserKey, key.UserKey); {
		case c > 0:
			// New start key precedes the previous one: out of order.
			w.err = errors.Errorf("pebble: keys must be added in order: %s, %s",
				prevKey.Pretty(w.formatKey), key.Pretty(w.formatKey))
			return w.err
		case c == 0:
			// Same start key: the end keys must match exactly (aligned
			// fragments) ...
			prevValue := w.rangeDelBlock.curValue
			if w.compare(prevValue, value) != 0 {
				w.err = errors.Errorf("pebble: overlapping tombstones must be fragmented: %s vs %s",
					w.prettyTombstone(prevKey, prevValue),
					w.prettyTombstone(key, value))
				return w.err
			}
			// ... and seqnums must be strictly decreasing.
			if prevKey.SeqNum() <= key.SeqNum() {
				w.err = errors.Errorf("pebble: keys must be added in strictly increasing order: %s, %s",
					prevKey.Pretty(w.formatKey), key.Pretty(w.formatKey))
				return w.err
			}
		default:
			// Strictly increasing start keys: the previous tombstone's end key
			// must not extend past the new start key (no partial overlap).
			prevValue := w.rangeDelBlock.curValue
			if w.compare(prevValue, key.UserKey) > 0 {
				w.err = errors.Errorf("pebble: overlapping tombstones must be fragmented: %s vs %s",
					w.prettyTombstone(prevKey, prevValue),
					w.prettyTombstone(key, value))
				return w.err
			}
		}
	}

	// The sentinel trailer is reserved for synthesized boundary keys.
	if key.Trailer == InternalKeyRangeDeleteSentinel {
		w.err = errors.Errorf("pebble: cannot add range delete sentinel: %s", key.Pretty(w.formatKey))
		return w.err
	}

	w.meta.updateSeqNum(key.SeqNum())

	switch {
	case w.rangeDelV1Format:
		// Range tombstones are not fragmented in the v1 (i.e. RocksDB) range
		// deletion block format, so we need to track the largest range tombstone
		// end key as every range tombstone is added.
		//
		// Note that writing the v1 format is only supported for tests.
		if w.props.NumRangeDeletions == 0 {
			w.meta.SetSmallestRangeDelKey(key.Clone())
			w.meta.SetLargestRangeDelKey(base.MakeRangeDeleteSentinelKey(value).Clone())
		} else {
			if base.InternalCompare(w.compare, w.meta.SmallestRangeDel, key) > 0 {
				w.meta.SetSmallestRangeDelKey(key.Clone())
			}
			end := base.MakeRangeDeleteSentinelKey(value)
			if base.InternalCompare(w.compare, w.meta.LargestRangeDel, end) < 0 {
				w.meta.SetLargestRangeDelKey(end.Clone())
			}
		}

	default:
		// Range tombstones are fragmented in the v2 range deletion block format,
		// so the start key of the first range tombstone added will be the smallest
		// range tombstone key. The largest range tombstone key will be determined
		// in Writer.Close() as the end key of the last range tombstone added.
		if w.props.NumRangeDeletions == 0 {
			w.meta.SetSmallestRangeDelKey(key.Clone())
		}
	}

	// Update table properties for this tombstone.
	w.props.NumEntries++
	w.props.NumDeletions++
	w.props.NumRangeDeletions++
	w.props.RawKeySize += uint64(key.Size())
	w.props.RawValueSize += uint64(len(value))
	w.rangeDelBlock.add(key, value)
	return nil
}
1137 :
1138 : // RangeKeySet sets a range between start (inclusive) and end (exclusive) with
1139 : // the given suffix to the given value. The resulting range key is given the
1140 : // sequence number zero, with the expectation that the resulting sstable will be
1141 : // ingested.
1142 : //
1143 : // Keys must be added to the table in increasing order of start key. Spans are
1144 : // not required to be fragmented. The same suffix may not be set or unset twice
1145 : // over the same keyspan, because it would result in inconsistent state. Both
1146 : // the Set and Unset would share the zero sequence number, and a key cannot be
1147 : // both simultaneously set and unset.
1148 1 : func (w *Writer) RangeKeySet(start, end, suffix, value []byte) error {
1149 1 : return w.addRangeKeySpan(keyspan.Span{
1150 1 : Start: w.tempRangeKeyCopy(start),
1151 1 : End: w.tempRangeKeyCopy(end),
1152 1 : Keys: []keyspan.Key{
1153 1 : {
1154 1 : Trailer: base.MakeTrailer(0, base.InternalKeyKindRangeKeySet),
1155 1 : Suffix: w.tempRangeKeyCopy(suffix),
1156 1 : Value: w.tempRangeKeyCopy(value),
1157 1 : },
1158 1 : },
1159 1 : })
1160 1 : }
1161 :
1162 : // RangeKeyUnset un-sets a range between start (inclusive) and end (exclusive)
1163 : // with the given suffix. The resulting range key is given the
1164 : // sequence number zero, with the expectation that the resulting sstable will be
1165 : // ingested.
1166 : //
1167 : // Keys must be added to the table in increasing order of start key. Spans are
1168 : // not required to be fragmented. The same suffix may not be set or unset twice
1169 : // over the same keyspan, because it would result in inconsistent state. Both
1170 : // the Set and Unset would share the zero sequence number, and a key cannot be
1171 : // both simultaneously set and unset.
1172 1 : func (w *Writer) RangeKeyUnset(start, end, suffix []byte) error {
1173 1 : return w.addRangeKeySpan(keyspan.Span{
1174 1 : Start: w.tempRangeKeyCopy(start),
1175 1 : End: w.tempRangeKeyCopy(end),
1176 1 : Keys: []keyspan.Key{
1177 1 : {
1178 1 : Trailer: base.MakeTrailer(0, base.InternalKeyKindRangeKeyUnset),
1179 1 : Suffix: w.tempRangeKeyCopy(suffix),
1180 1 : },
1181 1 : },
1182 1 : })
1183 1 : }
1184 :
1185 : // RangeKeyDelete deletes a range between start (inclusive) and end (exclusive).
1186 : //
1187 : // Keys must be added to the table in increasing order of start key. Spans are
1188 : // not required to be fragmented.
1189 1 : func (w *Writer) RangeKeyDelete(start, end []byte) error {
1190 1 : return w.addRangeKeySpan(keyspan.Span{
1191 1 : Start: w.tempRangeKeyCopy(start),
1192 1 : End: w.tempRangeKeyCopy(end),
1193 1 : Keys: []keyspan.Key{
1194 1 : {Trailer: base.MakeTrailer(0, base.InternalKeyKindRangeKeyDelete)},
1195 1 : },
1196 1 : })
1197 1 : }
1198 :
1199 : // AddRangeKey adds a range key set, unset, or delete key/value pair to the
1200 : // table being written.
1201 : //
1202 : // Range keys must be supplied in strictly ascending order of start key (i.e.
1203 : // user key ascending, sequence number descending, and key type descending).
1204 : // Ranges added must also be supplied in fragmented span order - i.e. other than
1205 : // spans that are perfectly aligned (same start and end keys), spans may not
1206 : // overlap. Range keys may be added out of order relative to point keys and
1207 : // range deletions.
1208 1 : func (w *Writer) AddRangeKey(key InternalKey, value []byte) error {
1209 1 : if w.err != nil {
1210 0 : return w.err
1211 0 : }
1212 1 : return w.addRangeKey(key, value)
1213 : }
1214 :
1215 1 : func (w *Writer) addRangeKeySpan(span keyspan.Span) error {
1216 1 : if w.compare(span.Start, span.End) >= 0 {
1217 0 : return errors.Errorf(
1218 0 : "pebble: start key must be strictly less than end key",
1219 0 : )
1220 0 : }
1221 1 : if w.fragmenter.Start() != nil && w.compare(w.fragmenter.Start(), span.Start) > 0 {
1222 1 : return errors.Errorf("pebble: spans must be added in order: %s > %s",
1223 1 : w.formatKey(w.fragmenter.Start()), w.formatKey(span.Start))
1224 1 : }
1225 : // Add this span to the fragmenter.
1226 1 : w.fragmenter.Add(span)
1227 1 : return w.err
1228 : }
1229 :
// encodeRangeKeySpan sorts a fragmented range-key span's keys by suffix and
// encodes the span into the range-key block, recording any error in w.err.
func (w *Writer) encodeRangeKeySpan(span keyspan.Span) {
	// This method is the emit function of the Fragmenter.
	//
	// NB: The span should only contain range keys and be internally consistent
	// (eg, no duplicate suffixes, no additional keys after a RANGEKEYDEL).
	//
	// We use w.rangeKeysBySuffix and w.rangeKeySpan to avoid allocations.

	// Sort the keys by suffix. Iteration doesn't *currently* depend on it, but
	// we may want to in the future.
	w.rangeKeysBySuffix.Cmp = w.compare
	w.rangeKeysBySuffix.Keys = span.Keys
	sort.Sort(&w.rangeKeysBySuffix)

	w.rangeKeySpan = span
	w.rangeKeySpan.Keys = w.rangeKeysBySuffix.Keys
	// Preserve the first error encountered by the writer.
	w.err = firstError(w.err, w.rangeKeyEncoder.Encode(&w.rangeKeySpan))
}
1248 :
// addRangeKey adds a single range key (RANGEKEYSET/UNSET/DEL) to the
// range-key block after validating ordering and fragmentation against the
// previously added span, and updates table metadata and properties.
func (w *Writer) addRangeKey(key InternalKey, value []byte) error {
	if !w.disableKeyOrderChecks && w.rangeKeyBlock.nEntries > 0 {
		prevStartKey := w.rangeKeyBlock.getCurKey()
		prevEndKey, _, ok := rangekey.DecodeEndKey(prevStartKey.Kind(), w.rangeKeyBlock.curValue)
		if !ok {
			// We panic here as we should have previously decoded and validated this
			// key and value when it was first added to the range key block.
			panic(errors.Errorf("pebble: invalid end key for span: %s",
				prevStartKey.Pretty(w.formatKey)))
		}

		curStartKey := key
		curEndKey, _, ok := rangekey.DecodeEndKey(curStartKey.Kind(), value)
		if !ok {
			w.err = errors.Errorf("pebble: invalid end key for span: %s",
				curStartKey.Pretty(w.formatKey))
			return w.err
		}

		// Start keys must be strictly increasing.
		if base.InternalCompare(w.compare, prevStartKey, curStartKey) >= 0 {
			w.err = errors.Errorf(
				"pebble: range keys starts must be added in increasing order: %s, %s",
				prevStartKey.Pretty(w.formatKey), key.Pretty(w.formatKey))
			return w.err
		}

		// Start keys are increasing. If the start user keys are equal, the
		// end keys must be equal (i.e. aligned spans).
		if w.compare(prevStartKey.UserKey, curStartKey.UserKey) == 0 {
			if w.compare(prevEndKey, curEndKey) != 0 {
				w.err = errors.Errorf("pebble: overlapping range keys must be fragmented: %s, %s",
					prevStartKey.Pretty(w.formatKey),
					curStartKey.Pretty(w.formatKey))
				return w.err
			}
		} else if w.compare(prevEndKey, curStartKey.UserKey) > 0 {
			// If the start user keys are NOT equal, the spans must be disjoint (i.e.
			// no overlap).
			// NOTE: the inequality excludes zero, as we allow the end key of the
			// lower span be the same as the start key of the upper span, because
			// the range end key is considered an exclusive bound.
			w.err = errors.Errorf("pebble: overlapping range keys must be fragmented: %s, %s",
				prevStartKey.Pretty(w.formatKey),
				curStartKey.Pretty(w.formatKey))
			return w.err
		}
	}

	// TODO(travers): Add an invariant-gated check to ensure that suffix-values
	// are sorted within coalesced spans.

	// Range-keys and point-keys are intended to live in "parallel" keyspaces.
	// However, we track a single seqnum in the table metadata that spans both of
	// these keyspaces.
	// TODO(travers): Consider tracking range key seqnums separately.
	w.meta.updateSeqNum(key.SeqNum())

	// Range tombstones are fragmented, so the start key of the first range key
	// added will be the smallest. The largest range key is determined in
	// Writer.Close() as the end key of the last range key added to the block.
	if w.props.NumRangeKeys() == 0 {
		w.meta.SetSmallestRangeKey(key.Clone())
	}

	// Update block properties.
	w.props.RawRangeKeyKeySize += uint64(key.Size())
	w.props.RawRangeKeyValueSize += uint64(len(value))
	switch key.Kind() {
	case base.InternalKeyKindRangeKeyDelete:
		w.props.NumRangeKeyDels++
	case base.InternalKeyKindRangeKeySet:
		w.props.NumRangeKeySets++
	case base.InternalKeyKindRangeKeyUnset:
		w.props.NumRangeKeyUnsets++
	default:
		panic(errors.Errorf("pebble: invalid range key type: %s", key.Kind()))
	}

	// Feed the range key to all block property collectors.
	for i := range w.blockPropCollectors {
		if err := w.blockPropCollectors[i].Add(key, value); err != nil {
			return err
		}
	}

	// Add the key to the block.
	w.rangeKeyBlock.add(key, value)
	return nil
}
1338 :
1339 : // tempRangeKeyBuf returns a slice of length n from the Writer's rkBuf byte
1340 : // slice. Any byte written to the returned slice is retained for the lifetime of
1341 : // the Writer.
1342 1 : func (w *Writer) tempRangeKeyBuf(n int) []byte {
1343 1 : if cap(w.rkBuf)-len(w.rkBuf) < n {
1344 1 : size := len(w.rkBuf) + 2*n
1345 1 : if size < 2*cap(w.rkBuf) {
1346 1 : size = 2 * cap(w.rkBuf)
1347 1 : }
1348 1 : buf := make([]byte, len(w.rkBuf), size)
1349 1 : copy(buf, w.rkBuf)
1350 1 : w.rkBuf = buf
1351 : }
1352 1 : b := w.rkBuf[len(w.rkBuf) : len(w.rkBuf)+n]
1353 1 : w.rkBuf = w.rkBuf[:len(w.rkBuf)+n]
1354 1 : return b
1355 : }
1356 :
1357 : // tempRangeKeyCopy returns a copy of the provided slice, stored in the Writer's
1358 : // range key buffer.
1359 1 : func (w *Writer) tempRangeKeyCopy(k []byte) []byte {
1360 1 : if len(k) == 0 {
1361 1 : return nil
1362 1 : }
1363 1 : buf := w.tempRangeKeyBuf(len(k))
1364 1 : copy(buf, k)
1365 1 : return buf
1366 : }
1367 :
1368 1 : func (w *Writer) maybeAddToFilter(key []byte) {
1369 1 : if w.filter != nil {
1370 1 : prefix := key[:w.split(key)]
1371 1 : w.filter.addKey(prefix)
1372 1 : }
1373 : }
1374 :
// flush finishes the current data block, compresses and checksums it,
// schedules its write (possibly on the parallel write queue) together with
// its index entry, and installs a fresh data block buffer. key is the first
// key destined for the next block — or, presumably, a zero-valued key when
// there is no next key (see indexEntrySep) — and is used to compute the
// index separator. TODO(review): confirm the zero-key caller.
func (w *Writer) flush(key InternalKey) error {
	// We're finishing a data block.
	err := w.finishDataBlockProps(w.dataBlockBuf)
	if err != nil {
		return err
	}
	w.dataBlockBuf.finish()
	w.dataBlockBuf.compressAndChecksum(w.compression)
	// Since dataBlockEstimates.addInflightDataBlock was never called, the
	// inflightSize is set to 0.
	w.coordination.sizeEstimate.dataBlockCompressed(len(w.dataBlockBuf.compressed), 0)

	// Determine if the index block should be flushed. Since we're accessing the
	// dataBlockBuf.dataBlock.curKey here, we have to make sure that once we start
	// to pool the dataBlockBufs, the curKey isn't used by the Writer once the
	// dataBlockBuf is added back to a sync.Pool. In this particular case, the
	// byte slice which supports "sep" will eventually be copied when "sep" is
	// added to the index block.
	prevKey := w.dataBlockBuf.dataBlock.getCurKey()
	sep := w.indexEntrySep(prevKey, key, w.dataBlockBuf)
	// We determine that we should flush an index block from the Writer client
	// goroutine, but we actually finish the index block from the writeQueue.
	// When we determine that an index block should be flushed, we need to call
	// BlockPropertyCollector.FinishIndexBlock. But block property collector
	// calls must happen sequentially from the Writer client. Therefore, we need
	// to determine that we are going to flush the index block from the Writer
	// client.
	shouldFlushIndexBlock := supportsTwoLevelIndex(w.tableFormat) && w.indexBlock.shouldFlush(
		sep, encodedBHPEstimatedSize, w.indexBlockSize, w.indexBlockSizeThreshold,
	)

	var indexProps []byte
	var flushableIndexBlock *indexBlockBuf
	if shouldFlushIndexBlock {
		// Swap in a fresh index block buffer; the old one travels with the
		// write task to be finished on the writeQueue.
		flushableIndexBlock = w.indexBlock
		w.indexBlock = newIndexBlockBuf(w.coordination.parallelismEnabled)
		// Call BlockPropertyCollector.FinishIndexBlock, since we've decided to
		// flush the index block.
		indexProps, err = w.finishIndexBlockProps()
		if err != nil {
			return err
		}
	}

	// We've called BlockPropertyCollector.FinishDataBlock, and, if necessary,
	// BlockPropertyCollector.FinishIndexBlock. Since we've decided to finish
	// the data block, we can call
	// BlockPropertyCollector.AddPrevDataBlockToIndexBlock.
	w.addPrevDataBlockToIndexBlockProps()

	// Schedule a write.
	writeTask := writeTaskPool.Get().(*writeTask)
	// We're setting compressionDone to indicate that compression of this block
	// has already been completed.
	writeTask.compressionDone <- true
	writeTask.buf = w.dataBlockBuf
	writeTask.indexEntrySep = sep
	writeTask.currIndexBlock = w.indexBlock
	writeTask.indexInflightSize = sep.Size() + encodedBHPEstimatedSize
	writeTask.finishedIndexProps = indexProps
	writeTask.flushableIndexBlock = flushableIndexBlock

	// The writeTask corresponds to an unwritten index entry.
	w.indexBlock.addInflight(writeTask.indexInflightSize)

	// Ownership of the data block buffer moves to the write task; replace it
	// below with a fresh buffer.
	w.dataBlockBuf = nil
	if w.coordination.parallelismEnabled {
		w.coordination.writeQueue.add(writeTask)
	} else {
		err = w.coordination.writeQueue.addSync(writeTask)
	}
	w.dataBlockBuf = newDataBlockBuf(w.restartInterval, w.checksumType)

	return err
}
1450 :
1451 1 : func (w *Writer) maybeFlush(key InternalKey, valueLen int) error {
1452 1 : if !w.dataBlockBuf.shouldFlush(key, valueLen, w.blockSize, w.blockSizeThreshold) {
1453 1 : return nil
1454 1 : }
1455 :
1456 1 : err := w.flush(key)
1457 1 :
1458 1 : if err != nil {
1459 1 : w.err = err
1460 1 : return err
1461 1 : }
1462 :
1463 1 : return nil
1464 : }
1465 :
1466 : // dataBlockBuf.dataBlockProps set by this method must be encoded before any future use of the
1467 : // dataBlockBuf.blockPropsEncoder, since the properties slice will get reused by the
1468 : // blockPropsEncoder.
1469 1 : func (w *Writer) finishDataBlockProps(buf *dataBlockBuf) error {
1470 1 : if len(w.blockPropCollectors) == 0 {
1471 1 : return nil
1472 1 : }
1473 1 : var err error
1474 1 : buf.blockPropsEncoder.resetProps()
1475 1 : for i := range w.blockPropCollectors {
1476 1 : scratch := buf.blockPropsEncoder.getScratchForProp()
1477 1 : if scratch, err = w.blockPropCollectors[i].FinishDataBlock(scratch); err != nil {
1478 1 : return err
1479 1 : }
1480 1 : if len(scratch) > 0 {
1481 1 : buf.blockPropsEncoder.addProp(shortID(i), scratch)
1482 1 : }
1483 : }
1484 :
1485 1 : buf.dataBlockProps = buf.blockPropsEncoder.unsafeProps()
1486 1 : return nil
1487 : }
1488 :
1489 : // The BlockHandleWithProperties returned by this method must be encoded before any future use of
1490 : // the Writer.blockPropsEncoder, since the properties slice will get reused by the blockPropsEncoder.
1491 : // maybeAddBlockPropertiesToBlockHandle should only be called if block is being written synchronously
1492 : // with the Writer client.
1493 : func (w *Writer) maybeAddBlockPropertiesToBlockHandle(
1494 : bh BlockHandle,
1495 1 : ) (BlockHandleWithProperties, error) {
1496 1 : err := w.finishDataBlockProps(w.dataBlockBuf)
1497 1 : if err != nil {
1498 0 : return BlockHandleWithProperties{}, err
1499 0 : }
1500 1 : return BlockHandleWithProperties{BlockHandle: bh, Props: w.dataBlockBuf.dataBlockProps}, nil
1501 : }
1502 :
1503 1 : func (w *Writer) indexEntrySep(prevKey, key InternalKey, dataBlockBuf *dataBlockBuf) InternalKey {
1504 1 : // Make a rough guess that we want key-sized scratch to compute the separator.
1505 1 : if cap(dataBlockBuf.sepScratch) < key.Size() {
1506 1 : dataBlockBuf.sepScratch = make([]byte, 0, key.Size()*2)
1507 1 : }
1508 :
1509 1 : var sep InternalKey
1510 1 : if key.UserKey == nil && key.Trailer == 0 {
1511 1 : sep = prevKey.Successor(w.compare, w.successor, dataBlockBuf.sepScratch[:0])
1512 1 : } else {
1513 1 : sep = prevKey.Separator(w.compare, w.separator, dataBlockBuf.sepScratch[:0], key)
1514 1 : }
1515 1 : return sep
1516 : }
1517 :
1518 : // addIndexEntry adds an index entry for the specified key and block handle.
1519 : // addIndexEntry can be called from both the Writer client goroutine, and the
// writeQueue goroutine. If flushIndexBuf != nil, then flushIndexBuf is
// finished (flushed) using indexProps before the new entry is added to writeTo.
1522 : //
1523 : // Invariant:
1524 : // 1. addIndexEntry must not store references to the sep InternalKey, the tmp
1525 : // byte slice, bhp.Props. That is, these must be either deep copied or
1526 : // encoded.
1527 : // 2. addIndexEntry must not hold references to the flushIndexBuf, and the writeTo
1528 : // indexBlockBufs.
func (w *Writer) addIndexEntry(
	sep InternalKey,
	bhp BlockHandleWithProperties,
	tmp []byte,
	flushIndexBuf *indexBlockBuf,
	writeTo *indexBlockBuf,
	inflightSize int,
	indexProps []byte,
) error {
	if bhp.Length == 0 {
		// A valid blockHandle must be non-zero.
		// In particular, it must have a non-zero length.
		return nil
	}

	// Encode the handle into tmp. Only the encoded form is stored in the index
	// block, so neither bhp nor bhp.Props needs to outlive this call.
	encoded := encodeBlockHandleWithProperties(tmp, bhp)

	if flushIndexBuf != nil {
		if cap(w.indexPartitions) == 0 {
			w.indexPartitions = make([]indexBlockAndBlockProperties, 0, 32)
		}
		// Enable two level indexes if there is more than one index block.
		w.twoLevelIndex = true
		if err := w.finishIndexBlock(flushIndexBuf, indexProps); err != nil {
			return err
		}
	}

	writeTo.add(sep, encoded, inflightSize)
	return nil
}
1560 :
1561 1 : func (w *Writer) addPrevDataBlockToIndexBlockProps() {
1562 1 : for i := range w.blockPropCollectors {
1563 1 : w.blockPropCollectors[i].AddPrevDataBlockToIndexBlock()
1564 1 : }
1565 : }
1566 :
1567 : // addIndexEntrySync adds an index entry for the specified key and block handle.
1568 : // Writer.addIndexEntry is only called synchronously once Writer.Close is called.
1569 : // addIndexEntrySync should only be called if we're sure that index entries
1570 : // aren't being written asynchronously.
1571 : //
1572 : // Invariant:
1573 : // 1. addIndexEntrySync must not store references to the prevKey, key InternalKey's,
1574 : // the tmp byte slice. That is, these must be either deep copied or encoded.
1575 : //
1576 : // TODO: Improve coverage of this method. e.g. tests passed without the line
1577 : // `w.twoLevelIndex = true` previously.
func (w *Writer) addIndexEntrySync(
	prevKey, key InternalKey, bhp BlockHandleWithProperties, tmp []byte,
) error {
	// Compute the separator between prevKey and key, then add the entry (and
	// flush the index block first if needed) on the calling goroutine.
	return w.addIndexEntrySep(w.indexEntrySep(prevKey, key, w.dataBlockBuf), bhp, tmp)
}
1583 :
// addIndexEntrySep adds an index entry keyed by the given separator, first
// finishing (flushing) the current index block if it has grown large enough,
// which upgrades the table to a two-level index.
func (w *Writer) addIndexEntrySep(
	sep InternalKey, bhp BlockHandleWithProperties, tmp []byte,
) error {
	// Index blocks may only be flushed on formats that support two-level
	// indexes.
	shouldFlush := supportsTwoLevelIndex(
		w.tableFormat) && w.indexBlock.shouldFlush(
		sep, encodedBHPEstimatedSize, w.indexBlockSize, w.indexBlockSizeThreshold,
	)
	var flushableIndexBlock *indexBlockBuf
	var props []byte
	var err error
	if shouldFlush {
		// Swap in a fresh index block buffer; the full one is finished inside
		// addIndexEntry below and then returned to the pool.
		flushableIndexBlock = w.indexBlock
		w.indexBlock = newIndexBlockBuf(w.coordination.parallelismEnabled)
		w.twoLevelIndex = true
		// Call BlockPropertyCollector.FinishIndexBlock, since we've decided to
		// flush the index block.
		props, err = w.finishIndexBlockProps()
		if err != nil {
			return err
		}
	}

	err = w.addIndexEntry(sep, bhp, tmp, flushableIndexBlock, w.indexBlock, 0, props)
	if flushableIndexBlock != nil {
		flushableIndexBlock.clear()
		indexBlockBufPool.Put(flushableIndexBlock)
	}
	w.addPrevDataBlockToIndexBlockProps()
	return err
}
1614 :
1615 : func shouldFlush(
1616 : key InternalKey,
1617 : valueLen int,
1618 : restartInterval, estimatedBlockSize, numEntries, targetBlockSize, sizeThreshold int,
1619 1 : ) bool {
1620 1 : if numEntries == 0 {
1621 1 : return false
1622 1 : }
1623 :
1624 1 : if estimatedBlockSize >= targetBlockSize {
1625 1 : return true
1626 1 : }
1627 :
1628 : // The block is currently smaller than the target size.
1629 1 : if estimatedBlockSize <= sizeThreshold {
1630 1 : // The block is smaller than the threshold size at which we'll consider
1631 1 : // flushing it.
1632 1 : return false
1633 1 : }
1634 :
1635 1 : newSize := estimatedBlockSize + key.Size() + valueLen
1636 1 : if numEntries%restartInterval == 0 {
1637 1 : newSize += 4
1638 1 : }
1639 1 : newSize += 4 // varint for shared prefix length
1640 1 : newSize += uvarintLen(uint32(key.Size())) // varint for unshared key bytes
1641 1 : newSize += uvarintLen(uint32(valueLen)) // varint for value size
1642 1 : // Flush if the block plus the new entry is larger than the target size.
1643 1 : return newSize > targetBlockSize
1644 : }
1645 :
1646 1 : func cloneKeyWithBuf(k InternalKey, a bytealloc.A) (bytealloc.A, InternalKey) {
1647 1 : if len(k.UserKey) == 0 {
1648 0 : return a, k
1649 0 : }
1650 1 : a, keyCopy := a.Copy(k.UserKey)
1651 1 : return a, InternalKey{UserKey: keyCopy, Trailer: k.Trailer}
1652 : }
1653 :
1654 : // Invariants: The byte slice returned by finishIndexBlockProps is heap-allocated
1655 : //
1656 : // and has its own lifetime, independent of the Writer and the blockPropsEncoder,
1657 : //
1658 : // and it is safe to:
1659 : // 1. Reuse w.blockPropsEncoder without first encoding the byte slice returned.
// 2. Store the byte slice in the Writer since it is a copy and not backed by
// an underlying buffer.
1662 1 : func (w *Writer) finishIndexBlockProps() ([]byte, error) {
1663 1 : w.blockPropsEncoder.resetProps()
1664 1 : for i := range w.blockPropCollectors {
1665 1 : scratch := w.blockPropsEncoder.getScratchForProp()
1666 1 : var err error
1667 1 : if scratch, err = w.blockPropCollectors[i].FinishIndexBlock(scratch); err != nil {
1668 1 : return nil, err
1669 1 : }
1670 1 : if len(scratch) > 0 {
1671 1 : w.blockPropsEncoder.addProp(shortID(i), scratch)
1672 1 : }
1673 : }
1674 1 : return w.blockPropsEncoder.props(), nil
1675 : }
1676 :
1677 : // finishIndexBlock finishes the current index block and adds it to the top
1678 : // level index block. This is only used when two level indexes are enabled.
1679 : //
1680 : // Invariants:
// 1. The props slice passed into finishIndexBlock must not be owned by any
// other struct, since it will be stored in the Writer.indexPartitions
1683 : // slice.
1684 : // 2. None of the buffers owned by indexBuf will be shallow copied and stored elsewhere.
1685 : // That is, it must be safe to reuse indexBuf after finishIndexBlock has been called.
func (w *Writer) finishIndexBlock(indexBuf *indexBlockBuf, props []byte) error {
	part := indexBlockAndBlockProperties{
		nEntries: indexBuf.block.nEntries, properties: props,
	}
	// Deep-copy the separator key: indexBuf's buffers are reused after this
	// call, while the partition (and its sep) lives until the table is closed.
	w.indexSepAlloc, part.sep = cloneKeyWithBuf(
		indexBuf.block.getCurKey(), w.indexSepAlloc,
	)
	bk := indexBuf.finish()
	if len(w.indexBlockAlloc) < len(bk) {
		// Allocate enough bytes for approximately 16 index blocks.
		w.indexBlockAlloc = make([]byte, len(bk)*16)
	}
	// Copy the finished block out of indexBuf into the shared allocation. The
	// three-index slice caps part.block so a later append cannot clobber it.
	n := copy(w.indexBlockAlloc, bk)
	part.block = w.indexBlockAlloc[:n:n]
	w.indexBlockAlloc = w.indexBlockAlloc[n:]
	w.indexPartitions = append(w.indexPartitions, part)
	return nil
}
1704 :
// writeTwoLevelIndex writes each buffered index partition as its own block,
// then writes the top-level index block pointing at the partitions and
// returns its handle.
func (w *Writer) writeTwoLevelIndex() (BlockHandle, error) {
	props, err := w.finishIndexBlockProps()
	if err != nil {
		return BlockHandle{}, err
	}
	// Add the final unfinished index.
	if err = w.finishIndexBlock(w.indexBlock, props); err != nil {
		return BlockHandle{}, err
	}

	for i := range w.indexPartitions {
		b := &w.indexPartitions[i]
		w.props.NumDataBlocks += uint64(b.nEntries)

		data := b.block
		w.props.IndexSize += uint64(len(data))
		bh, err := w.writeBlock(data, w.compression, &w.blockBuf)
		if err != nil {
			return BlockHandle{}, err
		}
		// Each top-level entry maps the partition's separator to the handle
		// (plus properties) of the just-written partition block.
		bhp := BlockHandleWithProperties{
			BlockHandle: bh,
			Props:       b.properties,
		}
		encoded := encodeBlockHandleWithProperties(w.blockBuf.tmp[:], bhp)
		w.topLevelIndexBlock.add(b.sep, encoded)
	}

	// NB: RocksDB includes the block trailer length in the index size
	// property, though it doesn't include the trailer in the top level
	// index size property.
	w.props.IndexPartitions = uint64(len(w.indexPartitions))
	w.props.TopLevelIndexSize = uint64(w.topLevelIndexBlock.estimatedSize())
	w.props.IndexSize += w.props.TopLevelIndexSize + blockTrailerLen

	return w.writeBlock(w.topLevelIndexBlock.finish(), w.compression, &w.blockBuf)
}
1742 :
// compressAndChecksum compresses b (keeping the result only if it saves at
// least 12.5%), fills blockBuf.tmp[:5] with the block trailer prefix (block
// type byte + CRC), and returns the bytes to write. The returned slice may
// alias blockBuf.compressedBuf.
func compressAndChecksum(b []byte, compression Compression, blockBuf *blockBuf) []byte {
	// Compress the buffer, discarding the result if the improvement isn't at
	// least 12.5%.
	blockType, compressed := compressBlock(compression, b, blockBuf.compressedBuf)
	if blockType != noCompressionBlockType && cap(compressed) > cap(blockBuf.compressedBuf) {
		// Keep the larger scratch buffer for future compressions.
		blockBuf.compressedBuf = compressed[:cap(compressed)]
	}
	if len(compressed) < len(b)-len(b)/8 {
		b = compressed
	} else {
		// Compression didn't save at least 1/8th; store uncompressed.
		blockType = noCompressionBlockType
	}

	// The first trailer byte records whether/how the block is compressed.
	blockBuf.tmp[0] = byte(blockType)

	// Calculate the checksum.
	checksum := blockBuf.checksummer.checksum(b, blockBuf.tmp[:1])
	binary.LittleEndian.PutUint32(blockBuf.tmp[1:5], checksum)
	return b
}
1763 :
// writeCompressedBlock writes an already compressed-and-checksummed block,
// followed by its trailer (filled in by compressAndChecksum), to the file and
// returns the block's handle. The handle's Length excludes the trailer.
func (w *Writer) writeCompressedBlock(block []byte, blockTrailerBuf []byte) (BlockHandle, error) {
	bh := BlockHandle{Offset: w.meta.Size, Length: uint64(len(block))}

	if w.cacheID != 0 && w.fileNum != 0 {
		// Remove the block being written from the cache. This provides defense in
		// depth against bugs which cause cache collisions.
		//
		// TODO(peter): Alternatively, we could add the uncompressed value to the
		// cache.
		w.cache.Delete(w.cacheID, w.fileNum, bh.Offset)
	}

	// Write the bytes to the file.
	if err := w.writable.Write(block); err != nil {
		return BlockHandle{}, err
	}
	w.meta.Size += uint64(len(block))
	if err := w.writable.Write(blockTrailerBuf[:blockTrailerLen]); err != nil {
		return BlockHandle{}, err
	}
	w.meta.Size += blockTrailerLen

	return bh, nil
}
1788 :
1789 : // Write implements io.Writer. This is analogous to writeCompressedBlock for
1790 : // blocks that already incorporate the trailer, and don't need the callee to
1791 : // return a BlockHandle.
func (w *Writer) Write(blockWithTrailer []byte) (n int, err error) {
	offset := w.meta.Size
	if w.cacheID != 0 && w.fileNum != 0 {
		// Remove the block being written from the cache. This provides defense in
		// depth against bugs which cause cache collisions.
		//
		// TODO(peter): Alternatively, we could add the uncompressed value to the
		// cache.
		w.cache.Delete(w.cacheID, w.fileNum, offset)
	}
	// NOTE(review): w.meta.Size is bumped before the write succeeds, so on
	// error it overstates the bytes written — presumably fine because a failed
	// write poisons the writer, but confirm against callers.
	w.meta.Size += uint64(len(blockWithTrailer))
	if err := w.writable.Write(blockWithTrailer); err != nil {
		return 0, err
	}
	return len(blockWithTrailer), nil
}
1808 :
1809 : func (w *Writer) writeBlock(
1810 : b []byte, compression Compression, blockBuf *blockBuf,
1811 1 : ) (BlockHandle, error) {
1812 1 : b = compressAndChecksum(b, compression, blockBuf)
1813 1 : return w.writeCompressedBlock(b, blockBuf.tmp[:])
1814 1 : }
1815 :
1816 : // assertFormatCompatibility ensures that the features present on the table are
1817 : // compatible with the table format version.
1818 1 : func (w *Writer) assertFormatCompatibility() error {
1819 1 : // PebbleDBv1: block properties.
1820 1 : if len(w.blockPropCollectors) > 0 && w.tableFormat < TableFormatPebblev1 {
1821 1 : return errors.Newf(
1822 1 : "table format version %s is less than the minimum required version %s for block properties",
1823 1 : w.tableFormat, TableFormatPebblev1,
1824 1 : )
1825 1 : }
1826 :
1827 : // PebbleDBv2: range keys.
1828 1 : if w.props.NumRangeKeys() > 0 && w.tableFormat < TableFormatPebblev2 {
1829 1 : return errors.Newf(
1830 1 : "table format version %s is less than the minimum required version %s for range keys",
1831 1 : w.tableFormat, TableFormatPebblev2,
1832 1 : )
1833 1 : }
1834 :
1835 : // PebbleDBv3: value blocks.
1836 1 : if (w.props.NumValueBlocks > 0 || w.props.NumValuesInValueBlocks > 0 ||
1837 1 : w.props.ValueBlocksSize > 0) && w.tableFormat < TableFormatPebblev3 {
1838 0 : return errors.Newf(
1839 0 : "table format version %s is less than the minimum required version %s for value blocks",
1840 0 : w.tableFormat, TableFormatPebblev3)
1841 0 : }
1842 :
1843 : // PebbleDBv4: DELSIZED tombstones.
1844 1 : if w.props.NumSizedDeletions > 0 && w.tableFormat < TableFormatPebblev4 {
1845 0 : return errors.Newf(
1846 0 : "table format version %s is less than the minimum required version %s for sized deletion tombstones",
1847 0 : w.tableFormat, TableFormatPebblev4)
1848 0 : }
1849 1 : return nil
1850 : }
1851 :
1852 : // Close finishes writing the table and closes the underlying file that the
1853 : // table was written to.
1854 1 : func (w *Writer) Close() (err error) {
1855 1 : defer func() {
1856 1 : if w.valueBlockWriter != nil {
1857 1 : releaseValueBlockWriter(w.valueBlockWriter)
1858 1 : // Defensive code in case Close gets called again. We don't want to put
1859 1 : // the same object to a sync.Pool.
1860 1 : w.valueBlockWriter = nil
1861 1 : }
1862 1 : if w.writable != nil {
1863 1 : w.writable.Abort()
1864 1 : w.writable = nil
1865 1 : }
1866 : // Record any error in the writer (so we can exit early if Close is called
1867 : // again).
1868 1 : if err != nil {
1869 1 : w.err = err
1870 1 : }
1871 : }()
1872 :
1873 : // finish must be called before we check for an error, because finish will
1874 : // block until every single task added to the writeQueue has been processed,
1875 : // and an error could be encountered while any of those tasks are processed.
1876 1 : if err := w.coordination.writeQueue.finish(); err != nil {
1877 1 : return err
1878 1 : }
1879 :
1880 1 : if w.err != nil {
1881 1 : return w.err
1882 1 : }
1883 :
1884 : // The w.meta.LargestPointKey is only used once the Writer is closed, so it is safe to set it
1885 : // when the Writer is closed.
1886 : //
1887 : // The following invariants ensure that setting the largest key at this point of a Writer close
1888 : // is correct:
1889 : // 1. Keys must only be added to the Writer in an increasing order.
1890 : // 2. The current w.dataBlockBuf is guaranteed to have the latest key added to the Writer. This
1891 : // must be true, because a w.dataBlockBuf is only switched out when a dataBlock is flushed,
1892 : // however, if a dataBlock is flushed, then we add a key to the new w.dataBlockBuf in the
1893 : // addPoint function after the flush occurs.
1894 1 : if w.dataBlockBuf.dataBlock.nEntries >= 1 {
1895 1 : w.meta.SetLargestPointKey(w.dataBlockBuf.dataBlock.getCurKey().Clone())
1896 1 : }
1897 :
1898 : // Finish the last data block, or force an empty data block if there
1899 : // aren't any data blocks at all.
1900 1 : if w.dataBlockBuf.dataBlock.nEntries > 0 || w.indexBlock.block.nEntries == 0 {
1901 1 : bh, err := w.writeBlock(w.dataBlockBuf.dataBlock.finish(), w.compression, &w.dataBlockBuf.blockBuf)
1902 1 : if err != nil {
1903 0 : return err
1904 0 : }
1905 1 : bhp, err := w.maybeAddBlockPropertiesToBlockHandle(bh)
1906 1 : if err != nil {
1907 0 : return err
1908 0 : }
1909 1 : prevKey := w.dataBlockBuf.dataBlock.getCurKey()
1910 1 : if err := w.addIndexEntrySync(prevKey, InternalKey{}, bhp, w.dataBlockBuf.tmp[:]); err != nil {
1911 0 : return err
1912 0 : }
1913 : }
1914 1 : w.props.DataSize = w.meta.Size
1915 1 :
1916 1 : // Write the filter block.
1917 1 : var metaindex rawBlockWriter
1918 1 : metaindex.restartInterval = 1
1919 1 : if w.filter != nil {
1920 1 : b, err := w.filter.finish()
1921 1 : if err != nil {
1922 0 : return err
1923 0 : }
1924 1 : bh, err := w.writeBlock(b, NoCompression, &w.blockBuf)
1925 1 : if err != nil {
1926 0 : return err
1927 0 : }
1928 1 : n := encodeBlockHandle(w.blockBuf.tmp[:], bh)
1929 1 : metaindex.add(InternalKey{UserKey: []byte(w.filter.metaName())}, w.blockBuf.tmp[:n])
1930 1 : w.props.FilterPolicyName = w.filter.policyName()
1931 1 : w.props.FilterSize = bh.Length
1932 : }
1933 :
1934 1 : var indexBH BlockHandle
1935 1 : if w.twoLevelIndex {
1936 1 : w.props.IndexType = twoLevelIndex
1937 1 : // Write the two level index block.
1938 1 : indexBH, err = w.writeTwoLevelIndex()
1939 1 : if err != nil {
1940 0 : return err
1941 0 : }
1942 1 : } else {
1943 1 : w.props.IndexType = binarySearchIndex
1944 1 : // NB: RocksDB includes the block trailer length in the index size
1945 1 : // property, though it doesn't include the trailer in the filter size
1946 1 : // property.
1947 1 : w.props.IndexSize = uint64(w.indexBlock.estimatedSize()) + blockTrailerLen
1948 1 : w.props.NumDataBlocks = uint64(w.indexBlock.block.nEntries)
1949 1 :
1950 1 : // Write the single level index block.
1951 1 : indexBH, err = w.writeBlock(w.indexBlock.finish(), w.compression, &w.blockBuf)
1952 1 : if err != nil {
1953 0 : return err
1954 0 : }
1955 : }
1956 :
1957 : // Write the range-del block. The block handle must added to the meta index block
1958 : // after the properties block has been written. This is because the entries in the
1959 : // metaindex block must be sorted by key.
1960 1 : var rangeDelBH BlockHandle
1961 1 : if w.props.NumRangeDeletions > 0 {
1962 1 : if !w.rangeDelV1Format {
1963 1 : // Because the range tombstones are fragmented in the v2 format, the end
1964 1 : // key of the last added range tombstone will be the largest range
1965 1 : // tombstone key. Note that we need to make this into a range deletion
1966 1 : // sentinel because sstable boundaries are inclusive while the end key of
1967 1 : // a range deletion tombstone is exclusive. A Clone() is necessary as
1968 1 : // rangeDelBlock.curValue is the same slice that will get passed
1969 1 : // into w.writer, and some implementations of vfs.File mutate the
1970 1 : // slice passed into Write(). Also, w.meta will often outlive the
1971 1 : // blockWriter, and so cloning curValue allows the rangeDelBlock's
1972 1 : // internal buffer to get gc'd.
1973 1 : k := base.MakeRangeDeleteSentinelKey(w.rangeDelBlock.curValue).Clone()
1974 1 : w.meta.SetLargestRangeDelKey(k)
1975 1 : }
1976 1 : rangeDelBH, err = w.writeBlock(w.rangeDelBlock.finish(), NoCompression, &w.blockBuf)
1977 1 : if err != nil {
1978 0 : return err
1979 0 : }
1980 : }
1981 :
1982 : // Write the range-key block, flushing any remaining spans from the
1983 : // fragmenter first.
1984 1 : w.fragmenter.Finish()
1985 1 :
1986 1 : var rangeKeyBH BlockHandle
1987 1 : if w.props.NumRangeKeys() > 0 {
1988 1 : key := w.rangeKeyBlock.getCurKey()
1989 1 : kind := key.Kind()
1990 1 : endKey, _, ok := rangekey.DecodeEndKey(kind, w.rangeKeyBlock.curValue)
1991 1 : if !ok {
1992 0 : return errors.Newf("invalid end key: %s", w.rangeKeyBlock.curValue)
1993 0 : }
1994 1 : k := base.MakeExclusiveSentinelKey(kind, endKey).Clone()
1995 1 : w.meta.SetLargestRangeKey(k)
1996 1 : // TODO(travers): The lack of compression on the range key block matches the
1997 1 : // lack of compression on the range-del block. Revisit whether we want to
1998 1 : // enable compression on this block.
1999 1 : rangeKeyBH, err = w.writeBlock(w.rangeKeyBlock.finish(), NoCompression, &w.blockBuf)
2000 1 : if err != nil {
2001 0 : return err
2002 0 : }
2003 : }
2004 :
2005 1 : if w.valueBlockWriter != nil {
2006 1 : vbiHandle, vbStats, err := w.valueBlockWriter.finish(w, w.meta.Size)
2007 1 : if err != nil {
2008 0 : return err
2009 0 : }
2010 1 : w.props.NumValueBlocks = vbStats.numValueBlocks
2011 1 : w.props.NumValuesInValueBlocks = vbStats.numValuesInValueBlocks
2012 1 : w.props.ValueBlocksSize = vbStats.valueBlocksAndIndexSize
2013 1 : if vbStats.numValueBlocks > 0 {
2014 1 : n := encodeValueBlocksIndexHandle(w.blockBuf.tmp[:], vbiHandle)
2015 1 : metaindex.add(InternalKey{UserKey: []byte(metaValueIndexName)}, w.blockBuf.tmp[:n])
2016 1 : }
2017 : }
2018 :
2019 : // Add the range key block handle to the metaindex block. Note that we add the
2020 : // block handle to the metaindex block before the other meta blocks as the
2021 : // metaindex block entries must be sorted, and the range key block name sorts
2022 : // before the other block names.
2023 1 : if w.props.NumRangeKeys() > 0 {
2024 1 : n := encodeBlockHandle(w.blockBuf.tmp[:], rangeKeyBH)
2025 1 : metaindex.add(InternalKey{UserKey: []byte(metaRangeKeyName)}, w.blockBuf.tmp[:n])
2026 1 : }
2027 :
2028 1 : {
2029 1 : // Finish and record the prop collectors if props are not yet recorded.
2030 1 : // Pre-computed props might have been copied by specialized sst creators
2031 1 : // like suffix replacer or a span copier.
2032 1 : if len(w.props.UserProperties) == 0 {
2033 1 : userProps := make(map[string]string)
2034 1 : for i := range w.blockPropCollectors {
2035 1 : scratch := w.blockPropsEncoder.getScratchForProp()
2036 1 : // Place the shortID in the first byte.
2037 1 : scratch = append(scratch, byte(i))
2038 1 : buf, err := w.blockPropCollectors[i].FinishTable(scratch)
2039 1 : if err != nil {
2040 1 : return err
2041 1 : }
2042 1 : var prop string
2043 1 : if len(buf) > 0 {
2044 1 : prop = string(buf)
2045 1 : }
2046 : // NB: The property is populated in the map even if it is the
2047 : // empty string, since the presence in the map is what indicates
2048 : // that the block property collector was used when writing.
2049 1 : userProps[w.blockPropCollectors[i].Name()] = prop
2050 : }
2051 1 : if len(userProps) > 0 {
2052 1 : w.props.UserProperties = userProps
2053 1 : }
2054 : }
2055 :
2056 : // Write the properties block.
2057 1 : var raw rawBlockWriter
2058 1 : // The restart interval is set to infinity because the properties block
2059 1 : // is always read sequentially and cached in a heap located object. This
2060 1 : // reduces table size without a significant impact on performance.
2061 1 : raw.restartInterval = propertiesBlockRestartInterval
2062 1 : w.props.CompressionOptions = rocksDBCompressionOptions
2063 1 : w.props.save(w.tableFormat, &raw)
2064 1 : bh, err := w.writeBlock(raw.finish(), NoCompression, &w.blockBuf)
2065 1 : if err != nil {
2066 0 : return err
2067 0 : }
2068 1 : n := encodeBlockHandle(w.blockBuf.tmp[:], bh)
2069 1 : metaindex.add(InternalKey{UserKey: []byte(metaPropertiesName)}, w.blockBuf.tmp[:n])
2070 : }
2071 :
2072 : // Add the range deletion block handle to the metaindex block.
2073 1 : if w.props.NumRangeDeletions > 0 {
2074 1 : n := encodeBlockHandle(w.blockBuf.tmp[:], rangeDelBH)
2075 1 : // The v2 range-del block encoding is backwards compatible with the v1
2076 1 : // encoding. We add meta-index entries for both the old name and the new
2077 1 : // name so that old code can continue to find the range-del block and new
2078 1 : // code knows that the range tombstones in the block are fragmented and
2079 1 : // sorted.
2080 1 : metaindex.add(InternalKey{UserKey: []byte(metaRangeDelName)}, w.blockBuf.tmp[:n])
2081 1 : if !w.rangeDelV1Format {
2082 1 : metaindex.add(InternalKey{UserKey: []byte(metaRangeDelV2Name)}, w.blockBuf.tmp[:n])
2083 1 : }
2084 : }
2085 :
2086 : // Write the metaindex block. It might be an empty block, if the filter
2087 : // policy is nil. NoCompression is specified because a) RocksDB never
2088 : // compresses the meta-index block and b) RocksDB has some code paths which
2089 : // expect the meta-index block to not be compressed.
2090 1 : metaindexBH, err := w.writeBlock(metaindex.blockWriter.finish(), NoCompression, &w.blockBuf)
2091 1 : if err != nil {
2092 0 : return err
2093 0 : }
2094 :
2095 : // Write the table footer.
2096 1 : footer := footer{
2097 1 : format: w.tableFormat,
2098 1 : checksum: w.blockBuf.checksummer.checksumType,
2099 1 : metaindexBH: metaindexBH,
2100 1 : indexBH: indexBH,
2101 1 : }
2102 1 : encoded := footer.encode(w.blockBuf.tmp[:])
2103 1 : if err := w.writable.Write(footer.encode(w.blockBuf.tmp[:])); err != nil {
2104 0 : return err
2105 0 : }
2106 1 : w.meta.Size += uint64(len(encoded))
2107 1 : w.meta.Properties = w.props
2108 1 :
2109 1 : // Check that the features present in the table are compatible with the format
2110 1 : // configured for the table.
2111 1 : if err = w.assertFormatCompatibility(); err != nil {
2112 1 : return err
2113 1 : }
2114 :
2115 1 : if err := w.writable.Finish(); err != nil {
2116 1 : w.writable = nil
2117 1 : return err
2118 1 : }
2119 1 : w.writable = nil
2120 1 :
2121 1 : w.dataBlockBuf.clear()
2122 1 : dataBlockBufPool.Put(w.dataBlockBuf)
2123 1 : w.dataBlockBuf = nil
2124 1 : w.indexBlock.clear()
2125 1 : indexBlockBufPool.Put(w.indexBlock)
2126 1 : w.indexBlock = nil
2127 1 :
2128 1 : // Make any future calls to Set or Close return an error.
2129 1 : w.err = errWriterClosed
2130 1 : return nil
2131 : }
2132 :
2133 : // EstimatedSize returns the estimated size of the sstable being written if a
2134 : // call to Finish() was made without adding additional keys.
2135 1 : func (w *Writer) EstimatedSize() uint64 {
2136 1 : return w.coordination.sizeEstimate.size() +
2137 1 : uint64(w.dataBlockBuf.dataBlock.estimatedSize()) +
2138 1 : w.indexBlock.estimatedSize()
2139 1 : }
2140 :
2141 : // Metadata returns the metadata for the finished sstable. Only valid to call
2142 : // after the sstable has been finished.
2143 1 : func (w *Writer) Metadata() (*WriterMetadata, error) {
2144 1 : if w.writable != nil {
2145 0 : return nil, errors.New("pebble: writer is not closed")
2146 0 : }
2147 1 : return &w.meta, nil
2148 : }
2149 :
2150 : // WriterOption provide an interface to do work on Writer while it is being
2151 : // opened.
type WriterOption interface {
	// writerApply is called on the writer during opening in order to set
	// internal parameters.
	// Options additionally implementing a preApply() method are applied
	// before default properties are set; all others after (see NewWriter).
	writerApply(*Writer)
}
2157 :
2158 : // PreviousPointKeyOpt is a WriterOption that provides access to the last
2159 : // point key written to the writer while building a sstable.
type PreviousPointKeyOpt struct {
	// w is set by writerApply when this option is passed to NewWriter.
	w *Writer
}
2163 :
2164 : // UnsafeKey returns the last point key written to the writer to which this
2165 : // option was passed during creation. The returned key points directly into
2166 : // a buffer belonging to the Writer. The value's lifetime ends the next time a
2167 : // point key is added to the Writer.
2168 : // Invariant: UnsafeKey isn't and shouldn't be called after the Writer is closed.
2169 1 : func (o PreviousPointKeyOpt) UnsafeKey() base.InternalKey {
2170 1 : if o.w == nil {
2171 0 : return base.InvalidInternalKey
2172 0 : }
2173 :
2174 1 : if o.w.dataBlockBuf.dataBlock.nEntries >= 1 {
2175 1 : // o.w.dataBlockBuf.dataBlock.curKey is guaranteed to point to the last point key
2176 1 : // which was added to the Writer.
2177 1 : return o.w.dataBlockBuf.dataBlock.getCurKey()
2178 1 : }
2179 0 : return base.InternalKey{}
2180 : }
2181 :
// writerApply stores the Writer on the option so UnsafeKey can later read the
// writer's current point key.
func (o *PreviousPointKeyOpt) writerApply(w *Writer) {
	o.w = w
}
2185 :
2186 : // NewWriter returns a new table writer for the file. Closing the writer will
2187 : // close the file.
func NewWriter(writable objstorage.Writable, o WriterOptions, extraOpts ...WriterOption) *Writer {
	o = o.ensureDefaults()
	w := &Writer{
		writable: writable,
		meta: WriterMetadata{
			SmallestSeqNum: math.MaxUint64,
		},
		blockSize: o.BlockSize,
		// Thresholds are a percentage of the corresponding target block size,
		// rounded up.
		blockSizeThreshold:      (o.BlockSize*o.BlockSizeThreshold + 99) / 100,
		indexBlockSize:          o.IndexBlockSize,
		indexBlockSizeThreshold: (o.IndexBlockSize*o.BlockSizeThreshold + 99) / 100,
		compare:                 o.Comparer.Compare,
		split:                   o.Comparer.Split,
		formatKey:               o.Comparer.FormatKey,
		compression:             o.Compression,
		separator:               o.Comparer.Separator,
		successor:               o.Comparer.Successor,
		tableFormat:             o.TableFormat,
		isStrictObsolete:        o.IsStrictObsolete,
		writingToLowestLevel:    o.WritingToLowestLevel,
		cache:                   o.Cache,
		restartInterval:         o.BlockRestartInterval,
		checksumType:            o.Checksum,
		indexBlock:              newIndexBlockBuf(o.Parallelism),
		rangeDelBlock: blockWriter{
			restartInterval: 1,
		},
		rangeKeyBlock: blockWriter{
			restartInterval: 1,
		},
		topLevelIndexBlock: blockWriter{
			restartInterval: 1,
		},
		fragmenter: keyspan.Fragmenter{
			Cmp:    o.Comparer.Compare,
			Format: o.Comparer.FormatKey,
		},
	}
	// Value blocks are only available from TableFormatPebblev3 onwards.
	if w.tableFormat >= TableFormatPebblev3 {
		w.shortAttributeExtractor = o.ShortAttributeExtractor
		w.requiredInPlaceValueBound = o.RequiredInPlaceValueBound
		if !o.DisableValueBlocks {
			w.valueBlockWriter = newValueBlockWriter(
				w.blockSize, w.blockSizeThreshold, w.compression, w.checksumType, func(compressedSize int) {
					w.coordination.sizeEstimate.dataBlockCompressed(compressedSize, 0)
				})
		}
	}

	w.dataBlockBuf = newDataBlockBuf(w.restartInterval, w.checksumType)

	w.blockBuf = blockBuf{
		checksummer: checksummer{checksumType: o.Checksum},
	}

	w.coordination.init(o.Parallelism, w)

	if writable == nil {
		// Return a writer whose first use will surface the error.
		w.err = errors.New("pebble: nil writable")
		return w
	}

	// Note that WriterOptions are applied in two places; the ones with a
	// preApply() method are applied here. The rest are applied down below after
	// default properties are set.
	type preApply interface{ preApply() }
	for _, opt := range extraOpts {
		if _, ok := opt.(preApply); ok {
			opt.writerApply(w)
		}
	}

	if o.FilterPolicy != nil {
		switch o.FilterType {
		case TableFilter:
			w.filter = newTableFilterWriter(o.FilterPolicy)
		default:
			panic(fmt.Sprintf("unknown filter type: %v", o.FilterType))
		}
	}

	w.props.ComparerName = o.Comparer.Name
	w.props.CompressionName = o.Compression.String()
	w.props.MergerName = o.MergerName
	w.props.PropertyCollectorNames = "[]"
	w.props.ExternalFormatVersion = rocksDBExternalFormatVersion

	if len(o.BlockPropertyCollectors) > 0 || w.tableFormat >= TableFormatPebblev4 {
		var buf bytes.Buffer
		buf.WriteString("[")
		numBlockPropertyCollectors := len(o.BlockPropertyCollectors)
		// TableFormatPebblev4 and above always add w.obsoleteCollector on top
		// of any user-supplied collectors.
		if w.tableFormat >= TableFormatPebblev4 {
			numBlockPropertyCollectors++
		}
		// shortID is a uint8, so we cannot exceed that number of block
		// property collectors.
		if numBlockPropertyCollectors > math.MaxUint8 {
			w.err = errors.New("pebble: too many block property collectors")
			return w
		}
		if numBlockPropertyCollectors > 0 {
			w.blockPropCollectors = make([]BlockPropertyCollector, numBlockPropertyCollectors)
		}
		if len(o.BlockPropertyCollectors) > 0 {
			// The shortID assigned to a collector is the same as its index in
			// this slice.
			for i := range o.BlockPropertyCollectors {
				w.blockPropCollectors[i] = o.BlockPropertyCollectors[i]()
				if i > 0 {
					buf.WriteString(",")
				}
				buf.WriteString(w.blockPropCollectors[i].Name())
			}
		}
		if w.tableFormat >= TableFormatPebblev4 {
			if numBlockPropertyCollectors > 1 {
				buf.WriteString(",")
			}
			// The obsolete collector always occupies the last shortID.
			w.blockPropCollectors[numBlockPropertyCollectors-1] = &w.obsoleteCollector
			buf.WriteString(w.obsoleteCollector.Name())
		}
		buf.WriteString("]")
		w.props.PropertyCollectorNames = buf.String()
	}

	// Apply the remaining WriterOptions that do not have a preApply() method.
	for _, opt := range extraOpts {
		if _, ok := opt.(preApply); ok {
			continue
		}
		opt.writerApply(w)
	}

	// Initialize the range key fragmenter and encoder.
	w.fragmenter.Emit = w.encodeRangeKeySpan
	w.rangeKeyEncoder.Emit = w.addRangeKey
	return w
}
2326 :
2327 : // internalGetProperties is a private, internal-use-only function that takes a
2328 : // Writer and returns a pointer to its Properties, allowing direct mutation.
2329 : // It's used by internal Pebble flushes and compactions to set internal
2330 : // properties. It gets installed in private.
2331 1 : func internalGetProperties(w *Writer) *Properties {
2332 1 : return &w.props
2333 1 : }
2334 :
2335 1 : func init() {
2336 1 : private.SSTableWriterDisableKeyOrderChecks = func(i interface{}) {
2337 1 : w := i.(*Writer)
2338 1 : w.disableKeyOrderChecks = true
2339 1 : }
2340 1 : private.SSTableInternalProperties = internalGetProperties
2341 : }
|