Line data Source code
1 : // Copyright 2011 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package sstable
6 :
7 : import (
8 : "bytes"
9 : "encoding/binary"
10 : "fmt"
11 : "math"
12 : "runtime"
13 : "slices"
14 : "sort"
15 : "sync"
16 :
17 : "github.com/cockroachdb/errors"
18 : "github.com/cockroachdb/pebble/internal/base"
19 : "github.com/cockroachdb/pebble/internal/bytealloc"
20 : "github.com/cockroachdb/pebble/internal/cache"
21 : "github.com/cockroachdb/pebble/internal/invariants"
22 : "github.com/cockroachdb/pebble/internal/keyspan"
23 : "github.com/cockroachdb/pebble/internal/rangedel"
24 : "github.com/cockroachdb/pebble/internal/rangekey"
25 : "github.com/cockroachdb/pebble/objstorage"
26 : "github.com/cockroachdb/pebble/sstable/block"
27 : "github.com/cockroachdb/pebble/sstable/rowblk"
28 : )
29 :
30 : // encodedBHPEstimatedSize estimates the size of the encoded BlockHandleWithProperties.
31 : // It would also be nice to account for the length of the data block properties here,
32 : // but that isn't necessary since this is only an estimate.
33 : const encodedBHPEstimatedSize = binary.MaxVarintLen64 * 2
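// A minimal sketch of why two varints bound the handle portion of an index
// entry: a block handle stores its offset and length as uvarints, and a
// uvarint never exceeds binary.MaxVarintLen64 bytes. The function name and the
// offset/length values are hypothetical, and the sketch assumes this file's
// existing imports.
func exampleEncodedHandleBound() int {
	var tmp [encodedBHPEstimatedSize]byte
	n := binary.PutUvarint(tmp[:], 1<<40) // block offset
	n += binary.PutUvarint(tmp[n:], 4096) // block length
	return n // always <= encodedBHPEstimatedSize
}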
34 :
35 : var errWriterClosed = errors.New("pebble: writer is closed")
36 :
37 : // WriterMetadata holds info about a finished sstable.
38 : type WriterMetadata struct {
39 : Size uint64
40 : SmallestPoint InternalKey
41 : // LargestPoint, LargestRangeKey, LargestRangeDel should not be accessed
42 : // before Writer.Close is called, because they may only be set on
43 : // Writer.Close.
44 : LargestPoint InternalKey
45 : SmallestRangeDel InternalKey
46 : LargestRangeDel InternalKey
47 : SmallestRangeKey InternalKey
48 : LargestRangeKey InternalKey
49 : HasPointKeys bool
50 : HasRangeDelKeys bool
51 : HasRangeKeys bool
52 : SmallestSeqNum base.SeqNum
53 : LargestSeqNum base.SeqNum
54 : Properties Properties
55 : }
56 :
57 : // SetSmallestPointKey sets the smallest point key to the given key.
58 : // NB: this method sets the "absolute" smallest point key. Any existing key is
59 : // overridden.
60 1 : func (m *WriterMetadata) SetSmallestPointKey(k InternalKey) {
61 1 : m.SmallestPoint = k
62 1 : m.HasPointKeys = true
63 1 : }
64 :
65 : // SetSmallestRangeDelKey sets the smallest rangedel key to the given key.
66 : // NB: this method sets the "absolute" smallest rangedel key. Any existing key is
67 : // overridden.
68 1 : func (m *WriterMetadata) SetSmallestRangeDelKey(k InternalKey) {
69 1 : m.SmallestRangeDel = k
70 1 : m.HasRangeDelKeys = true
71 1 : }
72 :
73 : // SetSmallestRangeKey sets the smallest range key to the given key.
74 : // NB: this method sets the "absolute" smallest range key. Any existing key is
75 : // overridden.
76 1 : func (m *WriterMetadata) SetSmallestRangeKey(k InternalKey) {
77 1 : m.SmallestRangeKey = k
78 1 : m.HasRangeKeys = true
79 1 : }
80 :
81 : // SetLargestPointKey sets the largest point key to the given key.
82 : // NB: this method sets the "absolute" largest point key. Any existing key is
83 : // overridden.
84 1 : func (m *WriterMetadata) SetLargestPointKey(k InternalKey) {
85 1 : m.LargestPoint = k
86 1 : m.HasPointKeys = true
87 1 : }
88 :
89 : // SetLargestRangeDelKey sets the largest rangedel key to the given key.
90 : // NB: this method sets the "absolute" largest rangedel key. Any existing key is
91 : // overridden.
92 1 : func (m *WriterMetadata) SetLargestRangeDelKey(k InternalKey) {
93 1 : m.LargestRangeDel = k
94 1 : m.HasRangeDelKeys = true
95 1 : }
96 :
97 : // SetLargestRangeKey sets the largest range key to the given key.
98 : // NB: this method sets the "absolute" largest range key. Any existing key is
99 : // overridden.
100 1 : func (m *WriterMetadata) SetLargestRangeKey(k InternalKey) {
101 1 : m.LargestRangeKey = k
102 1 : m.HasRangeKeys = true
103 1 : }
104 :
105 1 : func (m *WriterMetadata) updateSeqNum(seqNum base.SeqNum) {
106 1 : if m.SmallestSeqNum > seqNum {
107 1 : m.SmallestSeqNum = seqNum
108 1 : }
109 1 : if m.LargestSeqNum < seqNum {
110 1 : m.LargestSeqNum = seqNum
111 1 : }
112 : }
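// A minimal sketch of how updateSeqNum maintains the seqnum bounds, assuming
// SmallestSeqNum was initialized to the maximum seqnum (as the Writer does on
// construction) so that the first observed seqnum becomes both bounds. The
// function name and the seqnums are hypothetical.
func exampleSeqNumBounds() WriterMetadata {
	m := WriterMetadata{SmallestSeqNum: math.MaxUint64}
	m.updateSeqNum(7) // SmallestSeqNum=7, LargestSeqNum=7
	m.updateSeqNum(3) // SmallestSeqNum=3, LargestSeqNum=7
	m.updateSeqNum(9) // SmallestSeqNum=3, LargestSeqNum=9
	return m
}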
113 :
114 : // flushDecisionOptions holds parameters to inform the sstable block flushing
115 : // heuristics.
116 : type flushDecisionOptions struct {
117 : blockSize int
118 : blockSizeThreshold int
119 : // sizeClassAwareThreshold takes precedence over blockSizeThreshold when the
120 : // Writer is aware of the allocator's size classes.
121 : sizeClassAwareThreshold int
122 : }
123 :
124 : // Writer is a table writer.
125 : type Writer struct {
126 : layout layoutWriter
127 : meta WriterMetadata
128 : err error
129 : // dataBlockOptions and indexBlockOptions are used to configure the sstable
130 : // block flush heuristics.
131 : dataBlockOptions flushDecisionOptions
132 : indexBlockOptions flushDecisionOptions
133 : // The following fields are copied from Options.
134 : compare Compare
135 : split Split
136 : formatKey base.FormatKey
137 : compression Compression
138 : separator Separator
139 : successor Successor
140 : tableFormat TableFormat
141 : isStrictObsolete bool
142 : writingToLowestLevel bool
143 : restartInterval int
144 : checksumType block.ChecksumType
145 : // disableKeyOrderChecks disables the checks that keys are added to an
146 : // sstable in order. It is intended for internal use only in the construction
147 : // of invalid sstables for testing. See tool/make_test_sstables.go.
148 : disableKeyOrderChecks bool
149 : // With two level indexes, the index/filter of an SST file is partitioned into
150 : // smaller blocks with an additional top-level index on them. When reading an
151 : // index/filter, only the top-level index is loaded into memory. The two level
152 : // index/filter then uses the top-level index to load on demand into the block
153 : // cache the partitions that are required to perform the index/filter query.
154 : //
155 : // Two level indexes are enabled automatically when there is more than one
156 : // index block.
157 : //
158 : // This is useful when there are very large index blocks, which generally occurs
159 : // with the usage of large keys. With large index blocks, the index blocks fight
160 : // the data blocks for block cache space and the index blocks are likely to be
161 : // re-read many times from the disk. The top level index, which has a much
162 : // smaller memory footprint, can be used to prevent the entire index block from
163 : // being loaded into the block cache.
164 : twoLevelIndex bool
165 : indexBlock *indexBlockBuf
166 : rangeDelBlock rowblk.Writer
167 : rangeKeyBlock rowblk.Writer
168 : topLevelIndexBlock rowblk.Writer
169 : props Properties
170 : blockPropCollectors []BlockPropertyCollector
171 : obsoleteCollector obsoleteKeyBlockPropertyCollector
172 : blockPropsEncoder blockPropertiesEncoder
173 : // filter accumulates the filter block. If populated, the filter ingests
174 : // either the output of w.split (i.e. a prefix extractor) if w.split is not
175 : // nil, or the full keys otherwise.
176 : filter filterWriter
177 : indexPartitions []indexBlockAndBlockProperties
178 :
179 : // indexBlockAlloc is used to bulk-allocate byte slices used to store index
180 : // blocks in indexPartitions. These live until the index finishes.
181 : indexBlockAlloc []byte
182 : // indexSepAlloc is used to bulk-allocate index block separator slices stored
183 : // in indexPartitions. These live until the index finishes.
184 : indexSepAlloc bytealloc.A
185 :
186 : // To allow potentially overlapping (i.e. un-fragmented) range key spans to
187 : // be added to the Writer, a keyspan.Fragmenter is used to retain the keys
188 : // and values, emitting fragmented, coalesced spans as appropriate. Range
189 : // keys must be added in order of their start user-key.
190 : fragmenter keyspan.Fragmenter
191 : rangeKeyEncoder rangekey.Encoder
192 : rangeKeysBySuffix keyspan.KeysBySuffix
193 : rangeKeySpan keyspan.Span
194 : rkBuf []byte
195 : // dataBlockBuf consists of the state which is currently owned by and used by
196 : // the Writer client goroutine. This state can be handed off to other goroutines.
197 : dataBlockBuf *dataBlockBuf
198 : // blockBuf consists of the state which is owned by and used by the Writer client
199 : // goroutine.
200 : blockBuf blockBuf
201 :
202 : coordination coordinationState
203 :
204 : // Information (other than the byte slice) about the last point key, to
205 : // avoid extracting it again.
206 : lastPointKeyInfo pointKeyInfo
207 :
208 : // For value blocks.
209 : shortAttributeExtractor base.ShortAttributeExtractor
210 : requiredInPlaceValueBound UserKeyPrefixBound
211 : // When w.tableFormat >= TableFormatPebblev3, valueBlockWriter is nil iff
212 : // WriterOptions.DisableValueBlocks was true.
213 : valueBlockWriter *valueBlockWriter
214 :
215 : allocatorSizeClasses []int
216 : }
217 :
218 : type pointKeyInfo struct {
219 : trailer base.InternalKeyTrailer
220 : // Only computed when w.valueBlockWriter is not nil.
221 : userKeyLen int
222 : // prefixLen uses w.split, if not nil. Only computed when w.valueBlockWriter
223 : // is not nil.
224 : prefixLen int
225 : // True iff the point was marked obsolete.
226 : isObsolete bool
227 : }
228 :
229 : type coordinationState struct {
230 : parallelismEnabled bool
231 :
232 : // writeQueue is used to write data blocks to disk. The writeQueue is primarily
233 : // used to maintain the order in which data blocks must be written to disk. For
234 : // this reason, every single data block write must be done through the writeQueue.
235 : writeQueue *writeQueue
236 :
237 : sizeEstimate dataBlockEstimates
238 : }
239 :
240 1 : func (c *coordinationState) init(parallelismEnabled bool, writer *Writer) {
241 1 : c.parallelismEnabled = parallelismEnabled
242 1 : // useMutex is false regardless of parallelismEnabled, because we do not do
243 1 : // parallel compression yet.
244 1 : c.sizeEstimate.useMutex = false
245 1 :
246 1 : // writeQueueSize determines the size of the write queue, or the number
247 1 : // of items which can be added to the queue without blocking. By default, we
248 1 : // use a writeQueue size of 0, since we won't be doing any block writes in
249 1 : // parallel.
250 1 : writeQueueSize := 0
251 1 : if parallelismEnabled {
252 1 : writeQueueSize = runtime.GOMAXPROCS(0)
253 1 : }
254 1 : c.writeQueue = newWriteQueue(writeQueueSize, writer)
255 : }
256 :
257 : // sizeEstimate is a general purpose helper for estimating two kinds of sizes:
258 : // A. The compressed sstable size, which is useful for deciding when to start
259 : //
260 : // a new sstable during flushes or compactions. In practice, we use this in
261 : // estimating the data size (excluding the index).
262 : //
263 : // B. The size of index blocks to decide when to start a new index block.
264 : //
265 : // There are some terminology peculiarities which are due to the origin of
266 : // sizeEstimate for use case A with parallel compression enabled (for which
267 : // the code has not been merged). Specifically this relates to the terms
268 : // "written" and "compressed".
269 : // - The notion of "written" for case A is sufficiently defined by saying that
270 : // the data block is compressed. Waiting for the actual data block write to
271 : // happen can result in unnecessary estimation, when we already know how big
272 : // it will be in compressed form. Additionally, with the forthcoming value
273 : // blocks containing older MVCC values, these compressed blocks will be held
274 : // in-memory until late in the sstable writing, and we do want to accurately
275 : // account for them without waiting for the actual write.
276 : // For case B, "written" means that the index entry has been fully
277 : // generated, and has been added to the uncompressed block buffer for that
278 : // index block. It does not include actually writing a potentially
279 : // compressed index block.
280 : // - The notion of "compressed" is to differentiate between an "inflight" size
281 : // and the actual size, and is handled via computing a compression ratio
282 : // observed so far (defaults to 1).
283 : // For case A, this is actual data block compression, so the "inflight" size
284 : // is uncompressed blocks (that are no longer being written to) and the
285 : // "compressed" size is after they have been compressed.
286 : // For case B the inflight size is for a key-value pair in the index for
287 : // which the value size (the encoded size of the BlockHandleWithProperties)
288 : // is not accurately known, while the compressed size is the size of that
289 : // entry when it has been added to the (in-progress) index block.
290 : //
291 : // Usage: To update state, one can optionally provide an inflight write value
292 : // using addInflight (used for case B). When something is "written" the state
293 : // can be updated using either writtenWithDelta or writtenWithTotal, which
294 : // provide the actual delta size or the total size (latter must be
295 : // monotonically non-decreasing). If there were no calls to addInflight, there
296 : // isn't any real estimation happening here. So case A does not do any real
297 : // estimation. However, when we introduce parallel compression, there will be
298 : // estimation in that the client goroutine will call addInFlight and the
299 : // compression goroutines will call writtenWithDelta.
300 : type sizeEstimate struct {
301 : // emptySize is the size when there is no inflight data, and numEntries is 0.
302 : // emptySize is constant once set.
303 : emptySize uint64
304 :
305 : // inflightSize is the estimated size of some inflight data which hasn't
306 : // been written yet.
307 : inflightSize uint64
308 :
309 : // totalSize is the total size of the data which has already been written.
310 : totalSize uint64
311 :
312 : // numWrittenEntries is the total number of entries which have already been
313 : // written.
314 : numWrittenEntries uint64
315 : // numInflightEntries is the total number of entries which are inflight, and
316 : // haven't been written.
317 : numInflightEntries uint64
318 :
319 : // maxEstimatedSize stores the maximum result returned from sizeEstimate.size.
320 : // It ensures that values returned from subsequent calls to Writer.EstimatedSize
321 : // never decrease.
322 : maxEstimatedSize uint64
323 :
324 : // We assume that the entries added to the sizeEstimate can be compressed.
325 : // For this reason, we keep track of a compressedSize and an uncompressedSize
326 : // to compute a compression ratio for the inflight entries. If the entries
327 : // aren't being compressed, then compressedSize and uncompressedSize must be
328 : // equal.
329 : compressedSize uint64
330 : uncompressedSize uint64
331 : }
332 :
333 1 : func (s *sizeEstimate) init(emptySize uint64) {
334 1 : s.emptySize = emptySize
335 1 : }
336 :
337 1 : func (s *sizeEstimate) size() uint64 {
338 1 : ratio := float64(1)
339 1 : if s.uncompressedSize > 0 {
340 1 : ratio = float64(s.compressedSize) / float64(s.uncompressedSize)
341 1 : }
342 1 : estimatedInflightSize := uint64(float64(s.inflightSize) * ratio)
343 1 : total := s.totalSize + estimatedInflightSize
344 1 : if total > s.maxEstimatedSize {
345 1 : s.maxEstimatedSize = total
346 1 : } else {
347 1 : total = s.maxEstimatedSize
348 1 : }
349 :
350 1 : if total == 0 {
351 1 : return s.emptySize
352 1 : }
353 :
354 1 : return total
355 : }
356 :
357 1 : func (s *sizeEstimate) numTotalEntries() uint64 {
358 1 : return s.numWrittenEntries + s.numInflightEntries
359 1 : }
360 :
361 1 : func (s *sizeEstimate) addInflight(size int) {
362 1 : s.numInflightEntries++
363 1 : s.inflightSize += uint64(size)
364 1 : }
365 :
366 1 : func (s *sizeEstimate) writtenWithTotal(newTotalSize uint64, inflightSize int) {
367 1 : finalEntrySize := int(newTotalSize - s.totalSize)
368 1 : s.writtenWithDelta(finalEntrySize, inflightSize)
369 1 : }
370 :
371 1 : func (s *sizeEstimate) writtenWithDelta(finalEntrySize int, inflightSize int) {
372 1 : if inflightSize > 0 {
373 1 : // This entry was previously inflight, so we should decrement inflight
374 1 : // entries and update the "compression" stats for future estimation.
375 1 : s.numInflightEntries--
376 1 : s.inflightSize -= uint64(inflightSize)
377 1 : s.uncompressedSize += uint64(inflightSize)
378 1 : s.compressedSize += uint64(finalEntrySize)
379 1 : }
380 1 : s.numWrittenEntries++
381 1 : s.totalSize += uint64(finalEntrySize)
382 : }
383 :
384 1 : func (s *sizeEstimate) clear() {
385 1 : *s = sizeEstimate{emptySize: s.emptySize}
386 1 : }
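// A minimal sketch of the sizeEstimate flow described above: an entry is first
// registered inflight with its uncompressed size, and once "written" the
// observed compressed size updates the ratio used to scale any remaining
// inflight bytes. The function name and the sizes are hypothetical.
func exampleSizeEstimateFlow() uint64 {
	var est sizeEstimate
	est.init(rowblk.EmptySize) // size reported while nothing has been added
	est.addInflight(1000)      // an uncompressed block is inflight
	// The block compressed down to 400 bytes: the inflight accounting is
	// unwound and a 400/1000 compression ratio is remembered.
	est.writtenWithDelta(400, 1000)
	est.addInflight(500) // a second inflight block, estimated at 500*0.4 bytes
	return est.size()    // 400 written + 200 estimated inflight = 600
}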
387 :
388 : type indexBlockBuf struct {
389 : // block will only be accessed from the writeQueue.
390 : block rowblk.Writer
391 :
392 : size struct {
393 : useMutex bool
394 : mu sync.Mutex
395 : estimate sizeEstimate
396 : }
397 :
398 : // restartInterval matches indexBlockBuf.block.restartInterval. We store it twice, because the `block`
399 : // must only be accessed from the writeQueue goroutine.
400 : restartInterval int
401 : }
402 :
403 1 : func (i *indexBlockBuf) clear() {
404 1 : i.block.Reset()
405 1 : if i.size.useMutex {
406 1 : i.size.mu.Lock()
407 1 : defer i.size.mu.Unlock()
408 1 : }
409 1 : i.size.estimate.clear()
410 1 : i.restartInterval = 0
411 : }
412 :
413 : var indexBlockBufPool = sync.Pool{
414 1 : New: func() interface{} {
415 1 : return &indexBlockBuf{}
416 1 : },
417 : }
418 :
419 : const indexBlockRestartInterval = 1
420 :
421 1 : func newIndexBlockBuf(useMutex bool) *indexBlockBuf {
422 1 : i := indexBlockBufPool.Get().(*indexBlockBuf)
423 1 : i.size.useMutex = useMutex
424 1 : i.restartInterval = indexBlockRestartInterval
425 1 : i.block.RestartInterval = indexBlockRestartInterval
426 1 : i.size.estimate.init(rowblk.EmptySize)
427 1 : return i
428 1 : }
429 :
430 : func (i *indexBlockBuf) shouldFlush(
431 : sep InternalKey, valueLen int, flushOptions flushDecisionOptions, sizeClassHints []int,
432 1 : ) bool {
433 1 : if i.size.useMutex {
434 1 : i.size.mu.Lock()
435 1 : defer i.size.mu.Unlock()
436 1 : }
437 :
438 1 : nEntries := i.size.estimate.numTotalEntries()
439 1 : return shouldFlushWithHints(
440 1 : sep.Size(), valueLen, i.restartInterval, int(i.size.estimate.size()),
441 1 : int(nEntries), flushOptions, sizeClassHints)
442 : }
443 :
444 1 : func (i *indexBlockBuf) add(key InternalKey, value []byte, inflightSize int) {
445 1 : i.block.Add(key, value)
446 1 : size := i.block.EstimatedSize()
447 1 : if i.size.useMutex {
448 1 : i.size.mu.Lock()
449 1 : defer i.size.mu.Unlock()
450 1 : }
451 1 : i.size.estimate.writtenWithTotal(uint64(size), inflightSize)
452 : }
453 :
454 1 : func (i *indexBlockBuf) finish() []byte {
455 1 : b := i.block.Finish()
456 1 : return b
457 1 : }
458 :
459 1 : func (i *indexBlockBuf) addInflight(inflightSize int) {
460 1 : if i.size.useMutex {
461 1 : i.size.mu.Lock()
462 1 : defer i.size.mu.Unlock()
463 1 : }
464 1 : i.size.estimate.addInflight(inflightSize)
465 : }
466 :
467 1 : func (i *indexBlockBuf) estimatedSize() uint64 {
468 1 : if i.size.useMutex {
469 1 : i.size.mu.Lock()
470 1 : defer i.size.mu.Unlock()
471 1 : }
472 :
473 : // Make sure that the size estimation works as expected when parallelism
474 : // is disabled.
475 1 : if invariants.Enabled && !i.size.useMutex {
476 1 : if i.size.estimate.inflightSize != 0 {
477 0 : panic("unexpected inflight entry in index block size estimation")
478 : }
479 :
480 : // NB: The i.block should only be accessed from the writeQueue goroutine,
481 : // when parallelism is enabled. We break that invariant here, but that's
482 : // okay since parallelism is disabled.
483 1 : if i.size.estimate.size() != uint64(i.block.EstimatedSize()) {
484 0 : panic("index block size estimation sans parallelism is incorrect")
485 : }
486 : }
487 1 : return i.size.estimate.size()
488 : }
489 :
490 : // dataBlockEstimates is used for sstable size estimation. It can be
491 : // accessed by the Writer client and compressionQueue goroutines. Fields
492 : // should only be read/updated through the functions defined on the
493 : // *dataBlockEstimates type.
494 : type dataBlockEstimates struct {
495 : // If we don't do block compression in parallel, then we don't need to take
496 : // the performance hit of synchronizing using this mutex.
497 : useMutex bool
498 : mu sync.Mutex
499 :
500 : estimate sizeEstimate
501 : }
502 :
503 : // dataBlockCompressed updates the size estimate once a data block has been
504 : // compressed. inflightSize is the uncompressed block size estimate which was
505 : // previously provided to addInflightDataBlock(); if addInflightDataBlock() has
506 : // not been called, it must be 0. compressedSize is the compressed block size.
507 1 : func (d *dataBlockEstimates) dataBlockCompressed(compressedSize int, inflightSize int) {
508 1 : if d.useMutex {
509 0 : d.mu.Lock()
510 0 : defer d.mu.Unlock()
511 0 : }
512 1 : d.estimate.writtenWithDelta(compressedSize+block.TrailerLen, inflightSize)
513 : }
514 :
515 : // size is an estimated size of datablock data which has been written to disk.
516 1 : func (d *dataBlockEstimates) size() uint64 {
517 1 : if d.useMutex {
518 0 : d.mu.Lock()
519 0 : defer d.mu.Unlock()
520 0 : }
521 : // If there is no parallel compression, there should not be any inflight bytes.
522 1 : if invariants.Enabled && !d.useMutex {
523 1 : if d.estimate.inflightSize != 0 {
524 0 : panic("unexpected inflight entry in data block size estimation")
525 : }
526 : }
527 1 : return d.estimate.size()
528 : }
529 :
530 : // Avoid linter unused error.
531 : var _ = (&dataBlockEstimates{}).addInflightDataBlock
532 :
533 : // NB: unused since no parallel compression.
534 0 : func (d *dataBlockEstimates) addInflightDataBlock(size int) {
535 0 : if d.useMutex {
536 0 : d.mu.Lock()
537 0 : defer d.mu.Unlock()
538 0 : }
539 :
540 0 : d.estimate.addInflight(size)
541 : }
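// A minimal sketch of how the two dataBlockEstimates entry points would fit
// together once blocks are compressed in parallel: the uncompressed size passed
// to addInflightDataBlock is echoed back to dataBlockCompressed so the inflight
// accounting can be unwound. The function name and the sizes are hypothetical;
// today the Writer skips the inflight step and passes inflightSize=0 (see
// Writer.flush below).
func exampleDataBlockEstimates() uint64 {
	d := dataBlockEstimates{useMutex: true}
	d.addInflightDataBlock(4096)      // uncompressed block handed off
	d.dataBlockCompressed(1024, 4096) // later, its compressed size is known
	return d.size()                   // 1024 + block.TrailerLen
}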
542 :
543 : var writeTaskPool = sync.Pool{
544 1 : New: func() interface{} {
545 1 : t := &writeTask{}
546 1 : t.compressionDone = make(chan bool, 1)
547 1 : return t
548 1 : },
549 : }
550 :
551 : type blockBuf struct {
552 : // tmp is a scratch buffer, large enough to hold footerLen bytes,
553 : // blockTrailerLen bytes, or (5 * binary.MaxVarintLen64) bytes, and most
554 : // likely large enough for a block handle with properties.
555 : tmp [blockHandleLikelyMaxLen]byte
556 : // compressedBuf is the destination buffer for compression. It is re-used over the
557 : // lifetime of the blockBuf, avoiding the allocation of a temporary buffer for each block.
558 : compressedBuf []byte
559 : checksummer block.Checksummer
560 : }
561 :
562 1 : func (b *blockBuf) clear() {
563 1 : // We can't assign b.compressedBuf[:0] to compressedBuf because snappy relies
564 1 : // on the length of the buffer, and not the capacity to determine if it needs
565 1 : // to make an allocation.
566 1 : *b = blockBuf{
567 1 : compressedBuf: b.compressedBuf, checksummer: b.checksummer,
568 1 : }
569 1 : }
570 :
571 : // A dataBlockBuf holds all the state required to compress and write a data block to disk.
572 : // A dataBlockBuf begins its lifecycle owned by the Writer client goroutine. The Writer
573 : // client goroutine adds keys to the sstable, writing directly into a dataBlockBuf's blockWriter
574 : // until the block is full. Once a dataBlockBuf's block is full, the dataBlockBuf may be passed
575 : // to other goroutines for compression and file I/O.
576 : type dataBlockBuf struct {
577 : blockBuf
578 : dataBlock rowblk.Writer
579 :
580 : // uncompressed is a reference to a byte slice which is owned by the dataBlockBuf. It is the
581 : // next byte slice to be compressed. The uncompressed byte slice will be backed by the
582 : // dataBlock.buf.
583 : uncompressed []byte
584 : // compressed is a reference to a byte slice which is owned by the dataBlockBuf. It is the
585 : // compressed byte slice which must be written to disk. The compressed byte slice may be
586 : // backed by the dataBlock.buf, or the dataBlockBuf.compressedBuf, depending on whether
587 : // we use the result of the compression.
588 : compressed []byte
589 : // trailer is the block trailer encoding the compression type and checksum.
590 : trailer block.Trailer
591 :
592 : // We're making calls to BlockPropertyCollectors from the Writer client goroutine. We need to
593 : // pass the encoded block properties over to the write queue. To prevent copies and allocations,
594 : // we give each dataBlockBuf a blockPropertiesEncoder.
595 : blockPropsEncoder blockPropertiesEncoder
596 : // dataBlockProps is set when Writer.finishDataBlockProps is called. The dataBlockProps slice is
597 : // a shallow copy of the internal buffer of the dataBlockBuf.blockPropsEncoder.
598 : dataBlockProps []byte
599 :
600 : // sepScratch is reusable scratch space for computing separator keys.
601 : sepScratch []byte
602 : }
603 :
604 1 : func (d *dataBlockBuf) clear() {
605 1 : d.blockBuf.clear()
606 1 : d.dataBlock.Reset()
607 1 :
608 1 : d.uncompressed = nil
609 1 : d.compressed = nil
610 1 : d.dataBlockProps = nil
611 1 : d.sepScratch = d.sepScratch[:0]
612 1 : }
613 :
614 : var dataBlockBufPool = sync.Pool{
615 1 : New: func() interface{} {
616 1 : return &dataBlockBuf{}
617 1 : },
618 : }
619 :
620 1 : func newDataBlockBuf(restartInterval int, checksumType block.ChecksumType) *dataBlockBuf {
621 1 : d := dataBlockBufPool.Get().(*dataBlockBuf)
622 1 : d.dataBlock.RestartInterval = restartInterval
623 1 : d.checksummer.Type = checksumType
624 1 : return d
625 1 : }
626 :
627 1 : func (d *dataBlockBuf) finish() {
628 1 : d.uncompressed = d.dataBlock.Finish()
629 1 : }
630 :
631 1 : func (d *dataBlockBuf) compressAndChecksum(c Compression) {
632 1 : d.compressed, d.trailer = compressAndChecksum(d.uncompressed, c, &d.blockBuf)
633 1 : }
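// A minimal sketch of the dataBlockBuf handoff described above, mirroring what
// Writer.flush does later in this file: the client goroutine serializes and
// compresses the block, after which the buffer is ready for the write queue.
// The function name is hypothetical and the write-queue hookup is elided.
func exampleDataBlockHandoff(d *dataBlockBuf, c Compression) []byte {
	d.finish()               // serialize the row block into d.uncompressed
	d.compressAndChecksum(c) // fill d.compressed and d.trailer
	return d.compressed      // ready to be written by the writeQueue
}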
634 :
635 : func (d *dataBlockBuf) shouldFlush(
636 : key InternalKey, valueLen int, flushOptions flushDecisionOptions, sizeClassHints []int,
637 1 : ) bool {
638 1 : return shouldFlushWithHints(
639 1 : key.Size(), valueLen, d.dataBlock.RestartInterval, d.dataBlock.EstimatedSize(),
640 1 : d.dataBlock.EntryCount(), flushOptions, sizeClassHints)
641 1 : }
642 :
643 : type indexBlockAndBlockProperties struct {
644 : nEntries int
645 : // sep is the last key added to this block, for computing a separator later.
646 : sep InternalKey
647 : properties []byte
648 : // block is the encoded block produced by blockWriter.finish.
649 : block []byte
650 : }
651 :
652 : // Set sets the value for the given key. The sequence number is set to 0.
653 : // Intended for use to externally construct an sstable before ingestion into a
654 : // DB. For a given Writer, the keys passed to Set must be in strictly increasing
655 : // order.
656 : //
657 : // TODO(peter): untested
658 1 : func (w *Writer) Set(key, value []byte) error {
659 1 : if w.err != nil {
660 0 : return w.err
661 0 : }
662 1 : if w.isStrictObsolete {
663 0 : return errors.Errorf("use AddWithForceObsolete")
664 0 : }
665 : // forceObsolete is false based on the assumption that no RANGEDELs in the
666 : // sstable delete the added points.
667 1 : return w.addPoint(base.MakeInternalKey(key, 0, InternalKeyKindSet), value, false)
668 : }
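// A minimal sketch of externally constructing an sstable with Set, per the
// ordering requirement above: user keys must be passed in strictly increasing
// order and are all assigned sequence number zero. The function name and keys
// are hypothetical.
func exampleExternalConstruction(w *Writer) error {
	if err := w.Set([]byte("apple"), []byte("red")); err != nil {
		return err
	}
	if err := w.Set([]byte("banana"), []byte("yellow")); err != nil {
		return err
	}
	// A repeated or out-of-order user key here would fail the order check in
	// makeAddPointDecisionV2/V3.
	return w.Set([]byte("cherry"), []byte("red"))
}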
669 :
670 : // Delete deletes the value for the given key. The sequence number is set to
671 : // 0. Intended for use to externally construct an sstable before ingestion into
672 : // a DB.
673 : //
674 : // TODO(peter): untested
675 1 : func (w *Writer) Delete(key []byte) error {
676 1 : if w.err != nil {
677 0 : return w.err
678 0 : }
679 1 : if w.isStrictObsolete {
680 0 : return errors.Errorf("use AddWithForceObsolete")
681 0 : }
682 : // forceObsolete is false based on the assumption that no RANGEDELs in the
683 : // sstable delete the added points.
684 1 : return w.addPoint(base.MakeInternalKey(key, 0, InternalKeyKindDelete), nil, false)
685 : }
686 :
687 : // DeleteRange deletes all of the keys (and values) in the range [start,end)
688 : // (inclusive on start, exclusive on end). The sequence number is set to
689 : // 0. Intended for use to externally construct an sstable before ingestion into
690 : // a DB.
691 : //
692 : // TODO(peter): untested
693 1 : func (w *Writer) DeleteRange(start, end []byte) error {
694 1 : if w.err != nil {
695 0 : return w.err
696 0 : }
697 1 : return w.addTombstone(base.MakeInternalKey(start, 0, InternalKeyKindRangeDelete), end)
698 : }
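// A minimal sketch of adding range deletions with DeleteRange, per the
// requirements above and the checks in addTombstone below: tombstones must be
// added ordered by start key, and overlapping tombstones must already be
// fragmented. The function name and keys are hypothetical.
func exampleRangeDeletes(w *Writer) error {
	if err := w.DeleteRange([]byte("a"), []byte("c")); err != nil { // deletes [a, c)
		return err
	}
	// [c, f) does not overlap [a, c), since the end key is exclusive.
	return w.DeleteRange([]byte("c"), []byte("f"))
}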
699 :
700 : // Merge adds an action to the DB that merges the value at key with the new
701 : // value. The details of the merge are dependent upon the configured merge
702 : // operator. The sequence number is set to 0. Intended for use to externally
703 : // construct an sstable before ingestion into a DB.
704 : //
705 : // TODO(peter): untested
706 0 : func (w *Writer) Merge(key, value []byte) error {
707 0 : if w.err != nil {
708 0 : return w.err
709 0 : }
710 0 : if w.isStrictObsolete {
711 0 : return errors.Errorf("use AddWithForceObsolete")
712 0 : }
713 : // forceObsolete is false based on the assumption that no RANGEDELs in the
714 : // sstable delete the added points. If the user configured this writer
715 : // to be strict-obsolete, addPoint will reject the addition of this MERGE.
716 0 : return w.addPoint(base.MakeInternalKey(key, 0, InternalKeyKindMerge), value, false)
717 : }
718 :
719 : // Add adds a key/value pair to the table being written. For a given Writer,
720 : // the keys passed to Add must be in increasing order. The exception to this
721 : // rule is range deletion tombstones. Range deletion tombstones need to be
722 : // added ordered by their start key, but they can be added out of order from
723 : // point entries. Additionally, range deletion tombstones must be fragmented
724 : // (i.e. by keyspan.Fragmenter).
725 1 : func (w *Writer) Add(key InternalKey, value []byte) error {
726 1 : if w.isStrictObsolete {
727 0 : return errors.Errorf("use AddWithForceObsolete")
728 0 : }
729 1 : return w.AddWithForceObsolete(key, value, false)
730 : }
731 :
732 : // AddWithForceObsolete must be used when writing a strict-obsolete sstable.
733 : //
734 : // forceObsolete indicates whether the caller has determined that this key is
735 : // obsolete even though it may be the latest point key for this userkey. This
736 : // should be set to true for keys obsoleted by RANGEDELs, and is required for
737 : // strict-obsolete sstables.
738 : //
739 : // Note that there are two properties, S1 and S2 (see comment in format.go)
740 : // that strict-obsolete ssts must satisfy. S2, due to RANGEDELs, is solely the
741 : // responsibility of the caller. S1 is solely the responsibility of the
742 : // callee.
743 1 : func (w *Writer) AddWithForceObsolete(key InternalKey, value []byte, forceObsolete bool) error {
744 1 : if w.err != nil {
745 0 : return w.err
746 0 : }
747 :
748 1 : switch key.Kind() {
749 1 : case InternalKeyKindRangeDelete:
750 1 : return w.addTombstone(key, value)
751 : case base.InternalKeyKindRangeKeyDelete,
752 : base.InternalKeyKindRangeKeySet,
753 0 : base.InternalKeyKindRangeKeyUnset:
754 0 : w.err = errors.Errorf(
755 0 : "pebble: range keys must be added via one of the RangeKey* functions")
756 0 : return w.err
757 : }
758 1 : return w.addPoint(key, value, forceObsolete)
759 : }
760 :
761 1 : func (w *Writer) makeAddPointDecisionV2(key InternalKey) error {
762 1 : prevTrailer := w.lastPointKeyInfo.trailer
763 1 : w.lastPointKeyInfo.trailer = key.Trailer
764 1 : if w.dataBlockBuf.dataBlock.EntryCount() == 0 {
765 1 : return nil
766 1 : }
767 1 : if !w.disableKeyOrderChecks {
768 1 : prevPointUserKey := w.dataBlockBuf.dataBlock.CurUserKey()
769 1 : cmpUser := w.compare(prevPointUserKey, key.UserKey)
770 1 : if cmpUser > 0 || (cmpUser == 0 && prevTrailer <= key.Trailer) {
771 1 : return errors.Errorf(
772 1 : "pebble: keys must be added in strictly increasing order: %s, %s",
773 1 : InternalKey{UserKey: prevPointUserKey, Trailer: prevTrailer}.Pretty(w.formatKey),
774 1 : key.Pretty(w.formatKey))
775 1 : }
776 : }
777 1 : return nil
778 : }
779 :
780 : // REQUIRES: at least one point has been written to the Writer.
781 1 : func (w *Writer) getLastPointUserKey() []byte {
782 1 : if w.dataBlockBuf.dataBlock.EntryCount() == 0 {
783 0 : panic(errors.AssertionFailedf("no point keys added to writer"))
784 : }
785 1 : return w.dataBlockBuf.dataBlock.CurUserKey()
786 : }
787 :
788 : // REQUIRES: w.tableFormat >= TableFormatPebblev3
789 : func (w *Writer) makeAddPointDecisionV3(
790 : key InternalKey, valueLen int,
791 1 : ) (setHasSamePrefix bool, writeToValueBlock bool, isObsolete bool, err error) {
792 1 : prevPointKeyInfo := w.lastPointKeyInfo
793 1 : w.lastPointKeyInfo.userKeyLen = len(key.UserKey)
794 1 : w.lastPointKeyInfo.prefixLen = w.split(key.UserKey)
795 1 : w.lastPointKeyInfo.trailer = key.Trailer
796 1 : w.lastPointKeyInfo.isObsolete = false
797 1 : if !w.meta.HasPointKeys {
798 1 : return false, false, false, nil
799 1 : }
800 1 : keyKind := key.Trailer.Kind()
801 1 : prevPointUserKey := w.getLastPointUserKey()
802 1 : prevPointKey := InternalKey{UserKey: prevPointUserKey, Trailer: prevPointKeyInfo.trailer}
803 1 : prevKeyKind := prevPointKeyInfo.trailer.Kind()
804 1 : considerWriteToValueBlock := prevKeyKind == InternalKeyKindSet &&
805 1 : keyKind == InternalKeyKindSet
806 1 : if considerWriteToValueBlock && !w.requiredInPlaceValueBound.IsEmpty() {
807 1 : keyPrefix := key.UserKey[:w.lastPointKeyInfo.prefixLen]
808 1 : cmpUpper := w.compare(
809 1 : w.requiredInPlaceValueBound.Upper, keyPrefix)
810 1 : if cmpUpper <= 0 {
811 1 : // Common case for CockroachDB. Make it empty since all future keys in
812 1 : // this sstable will also have cmpUpper <= 0.
813 1 : w.requiredInPlaceValueBound = UserKeyPrefixBound{}
814 1 : } else if w.compare(keyPrefix, w.requiredInPlaceValueBound.Lower) >= 0 {
815 1 : considerWriteToValueBlock = false
816 1 : }
817 : }
818 : // cmpPrefix is initialized iff considerWriteToValueBlock.
819 1 : var cmpPrefix int
820 1 : var cmpUser int
821 1 : if considerWriteToValueBlock {
822 1 : // Compare the prefixes.
823 1 : cmpPrefix = w.compare(prevPointUserKey[:prevPointKeyInfo.prefixLen],
824 1 : key.UserKey[:w.lastPointKeyInfo.prefixLen])
825 1 : cmpUser = cmpPrefix
826 1 : if cmpPrefix == 0 {
827 1 : // Need to compare suffixes to compute cmpUser.
828 1 : cmpUser = w.compare(prevPointUserKey[prevPointKeyInfo.prefixLen:],
829 1 : key.UserKey[w.lastPointKeyInfo.prefixLen:])
830 1 : }
831 1 : } else {
832 1 : cmpUser = w.compare(prevPointUserKey, key.UserKey)
833 1 : }
834 : // Ensure that no one adds a point key kind without considering the obsolete
835 : // handling for that kind.
836 1 : switch keyKind {
837 : case InternalKeyKindSet, InternalKeyKindSetWithDelete, InternalKeyKindMerge,
838 1 : InternalKeyKindDelete, InternalKeyKindSingleDelete, InternalKeyKindDeleteSized:
839 0 : default:
840 0 : panic(errors.AssertionFailedf("unexpected key kind %s", keyKind.String()))
841 : }
842 : // If same user key, then the current key is obsolete if any of the
843 : // following is true:
844 : // C1 The prev key was obsolete.
845 : // C2 The prev key was not a MERGE. When the previous key is a MERGE we must
846 : // preserve SET* and MERGE since their values will be merged into the
847 : // previous key. We also must preserve DEL* since there may be an older
848 : // SET*/MERGE in a lower level that must not be merged with the MERGE --
849 : // if we omit the DEL* that lower SET*/MERGE will become visible.
850 : //
851 : // Regardless of whether it is the same user key or not
852 : // C3 If the current key is some kind of point delete and we are writing to
853 : // the lowest level, then it is also obsolete. The correctness of this
854 : // relies on the same user key not spanning multiple sstables in a level.
855 : //
856 : // C1 ensures that for a user key there is at most one transition from
857 : // !obsolete to obsolete. Consider a user key k, for which the first n keys
858 : // are not obsolete. We consider the various values of n:
859 : //
860 : // n = 0: This happens due to forceObsolete being set by the caller, or due
861 : // to C3. forceObsolete must only be set due to a RANGEDEL, and that RANGEDEL
862 : // must also delete all the lower seqnums for the same user key. C3 triggers
863 : // due to a point delete and that deletes all the lower seqnums for the same
864 : // user key.
865 : //
866 : // n = 1: This is the common case. It happens when the first key is not a
867 : // MERGE, or the current key is some kind of point delete.
868 : //
869 : // n > 1: This is due to a sequence of MERGE keys, potentially followed by a
870 : // single non-MERGE key.
871 1 : isObsoleteC1AndC2 := cmpUser == 0 &&
872 1 : (prevPointKeyInfo.isObsolete || prevKeyKind != InternalKeyKindMerge)
873 1 : isObsoleteC3 := w.writingToLowestLevel &&
874 1 : (keyKind == InternalKeyKindDelete || keyKind == InternalKeyKindSingleDelete ||
875 1 : keyKind == InternalKeyKindDeleteSized)
876 1 : isObsolete = isObsoleteC1AndC2 || isObsoleteC3
877 1 : // TODO(sumeer): storing isObsolete SET and SETWITHDEL in value blocks is
878 1 : // possible, but requires some care in documenting and checking invariants.
879 1 : // There is code that assumes nothing in value blocks because of single MVCC
880 1 : // version (those should be ok). We have to ensure setHasSamePrefix is
881 1 : // correctly initialized here etc.
882 1 :
883 1 : if !w.disableKeyOrderChecks &&
884 1 : (cmpUser > 0 || (cmpUser == 0 && prevPointKeyInfo.trailer <= key.Trailer)) {
885 1 : return false, false, false, errors.Errorf(
886 1 : "pebble: keys must be added in strictly increasing order: %s, %s",
887 1 : prevPointKey.Pretty(w.formatKey), key.Pretty(w.formatKey))
888 1 : }
889 1 : if !considerWriteToValueBlock {
890 1 : return false, false, isObsolete, nil
891 1 : }
892 : // NB: it is possible that cmpUser == 0, i.e., these two SETs have identical
893 : // user keys (because of an open snapshot). This should be the rare case.
894 1 : setHasSamePrefix = cmpPrefix == 0
895 1 : // Use of 0 here is somewhat arbitrary. Given the minimum 3 byte encoding of
896 1 : // valueHandle, this should be > 3. But tiny values are common in test and
897 1 : // unlikely in production, so we use 0 here for better test coverage.
898 1 : const tinyValueThreshold = 0
899 1 : // NB: setting WriterOptions.DisableValueBlocks does not disable the
900 1 : // setHasSamePrefix optimization.
901 1 : considerWriteToValueBlock = setHasSamePrefix && valueLen > tinyValueThreshold && w.valueBlockWriter != nil
902 1 : return setHasSamePrefix, considerWriteToValueBlock, isObsolete, nil
903 : }
904 :
905 1 : func (w *Writer) addPoint(key InternalKey, value []byte, forceObsolete bool) error {
906 1 : if w.isStrictObsolete && key.Kind() == InternalKeyKindMerge {
907 1 : return errors.Errorf("MERGE not supported in a strict-obsolete sstable")
908 1 : }
909 1 : var err error
910 1 : var setHasSameKeyPrefix, writeToValueBlock, addPrefixToValueStoredWithKey bool
911 1 : var isObsolete bool
912 1 : maxSharedKeyLen := len(key.UserKey)
913 1 : if w.tableFormat >= TableFormatPebblev3 {
914 1 : // maxSharedKeyLen is limited to the prefix of the preceding key. If the
915 1 : // preceding key was in a different block, then the blockWriter will
916 1 : // ignore this maxSharedKeyLen.
917 1 : maxSharedKeyLen = w.lastPointKeyInfo.prefixLen
918 1 : setHasSameKeyPrefix, writeToValueBlock, isObsolete, err =
919 1 : w.makeAddPointDecisionV3(key, len(value))
920 1 : addPrefixToValueStoredWithKey = key.Kind() == InternalKeyKindSet
921 1 : } else {
922 1 : err = w.makeAddPointDecisionV2(key)
923 1 : }
924 1 : if err != nil {
925 1 : return err
926 1 : }
927 1 : isObsolete = w.tableFormat >= TableFormatPebblev4 && (isObsolete || forceObsolete)
928 1 : w.lastPointKeyInfo.isObsolete = isObsolete
929 1 : var valueStoredWithKey []byte
930 1 : var prefix block.ValuePrefix
931 1 : var valueStoredWithKeyLen int
932 1 : if writeToValueBlock {
933 1 : vh, err := w.valueBlockWriter.addValue(value)
934 1 : if err != nil {
935 0 : return err
936 0 : }
937 1 : n := encodeValueHandle(w.blockBuf.tmp[:], vh)
938 1 : valueStoredWithKey = w.blockBuf.tmp[:n]
939 1 : valueStoredWithKeyLen = len(valueStoredWithKey) + 1
940 1 : var attribute base.ShortAttribute
941 1 : if w.shortAttributeExtractor != nil {
942 1 : // TODO(sumeer): for compactions, it is possible that the input sstable
943 1 : // already has this value in the value section and so we have already
944 1 : // extracted the ShortAttribute. Avoid extracting it again. This will
945 1 : // require changing the Writer.Add interface.
946 1 : if attribute, err = w.shortAttributeExtractor(
947 1 : key.UserKey, w.lastPointKeyInfo.prefixLen, value); err != nil {
948 0 : return err
949 0 : }
950 : }
951 1 : prefix = block.ValueHandlePrefix(setHasSameKeyPrefix, attribute)
952 1 : } else {
953 1 : valueStoredWithKey = value
954 1 : valueStoredWithKeyLen = len(value)
955 1 : if addPrefixToValueStoredWithKey {
956 1 : valueStoredWithKeyLen++
957 1 : }
958 1 : prefix = block.InPlaceValuePrefix(setHasSameKeyPrefix)
959 : }
960 :
961 1 : if err := w.maybeFlush(key, valueStoredWithKeyLen); err != nil {
962 1 : return err
963 1 : }
964 :
965 1 : for i := range w.blockPropCollectors {
966 1 : v := value
967 1 : if addPrefixToValueStoredWithKey {
968 1 : // Values for SET are not required to be in-place, and in the future may
969 1 : // not even be read by the compaction, so pass nil values. Block
970 1 : // property collectors in such Pebble DB's must not look at the value.
971 1 : v = nil
972 1 : }
973 1 : if err := w.blockPropCollectors[i].Add(key, v); err != nil {
974 1 : w.err = err
975 1 : return err
976 1 : }
977 : }
978 1 : if w.tableFormat >= TableFormatPebblev4 {
979 1 : w.obsoleteCollector.AddPoint(isObsolete)
980 1 : }
981 :
982 1 : w.maybeAddToFilter(key.UserKey)
983 1 : w.dataBlockBuf.dataBlock.AddWithOptionalValuePrefix(
984 1 : key, isObsolete, valueStoredWithKey, maxSharedKeyLen, addPrefixToValueStoredWithKey, prefix,
985 1 : setHasSameKeyPrefix)
986 1 :
987 1 : w.meta.updateSeqNum(key.SeqNum())
988 1 :
989 1 : if !w.meta.HasPointKeys {
990 1 : k := w.dataBlockBuf.dataBlock.CurKey()
991 1 : // NB: We need to ensure that SmallestPoint.UserKey is set, so we create
992 1 : // an InternalKey which is semantically identical to the key, but won't
993 1 : // have a nil UserKey. We do this, because key.UserKey could be nil, and
994 1 : // we don't want SmallestPoint.UserKey to be nil.
995 1 : //
996 1 : // todo(bananabrick): Determine if it's okay to have a nil SmallestPoint
997 1 : // .UserKey now that we don't rely on a nil UserKey to determine if the
998 1 : // key has been set or not.
999 1 : w.meta.SetSmallestPointKey(k.Clone())
1000 1 : }
1001 :
1002 1 : w.props.NumEntries++
1003 1 : switch key.Kind() {
1004 1 : case InternalKeyKindDelete, InternalKeyKindSingleDelete:
1005 1 : w.props.NumDeletions++
1006 1 : w.props.RawPointTombstoneKeySize += uint64(len(key.UserKey))
1007 1 : case InternalKeyKindDeleteSized:
1008 1 : var size uint64
1009 1 : if len(value) > 0 {
1010 1 : var n int
1011 1 : size, n = binary.Uvarint(value)
1012 1 : if n <= 0 {
1013 0 : w.err = errors.Newf("%s key's value (%x) does not parse as uvarint",
1014 0 : errors.Safe(key.Kind().String()), value)
1015 0 : return w.err
1016 0 : }
1017 : }
1018 1 : w.props.NumDeletions++
1019 1 : w.props.NumSizedDeletions++
1020 1 : w.props.RawPointTombstoneKeySize += uint64(len(key.UserKey))
1021 1 : w.props.RawPointTombstoneValueSize += size
1022 1 : case InternalKeyKindMerge:
1023 1 : w.props.NumMergeOperands++
1024 : }
1025 1 : w.props.RawKeySize += uint64(key.Size())
1026 1 : w.props.RawValueSize += uint64(len(value))
1027 1 : return nil
1028 : }
1029 :
1030 1 : func (w *Writer) prettyTombstone(k InternalKey, value []byte) fmt.Formatter {
1031 1 : return keyspan.Span{
1032 1 : Start: k.UserKey,
1033 1 : End: value,
1034 1 : Keys: []keyspan.Key{{Trailer: k.Trailer}},
1035 1 : }.Pretty(w.formatKey)
1036 1 : }
1037 :
1038 1 : func (w *Writer) addTombstone(key InternalKey, value []byte) error {
1039 1 : if !w.disableKeyOrderChecks && w.rangeDelBlock.EntryCount() > 0 {
1040 1 : // Check that tombstones are being added in fragmented order. If the two
1041 1 : // tombstones overlap, their start and end keys must be identical.
1042 1 : prevKey := w.rangeDelBlock.CurKey()
1043 1 : switch c := w.compare(prevKey.UserKey, key.UserKey); {
1044 0 : case c > 0:
1045 0 : w.err = errors.Errorf("pebble: keys must be added in order: %s, %s",
1046 0 : prevKey.Pretty(w.formatKey), key.Pretty(w.formatKey))
1047 0 : return w.err
1048 1 : case c == 0:
1049 1 : prevValue := w.rangeDelBlock.CurValue()
1050 1 : if w.compare(prevValue, value) != 0 {
1051 1 : w.err = errors.Errorf("pebble: overlapping tombstones must be fragmented: %s vs %s",
1052 1 : w.prettyTombstone(prevKey, prevValue),
1053 1 : w.prettyTombstone(key, value))
1054 1 : return w.err
1055 1 : }
1056 1 : if prevKey.SeqNum() <= key.SeqNum() {
1057 1 : w.err = errors.Errorf("pebble: keys must be added in strictly increasing order: %s, %s",
1058 1 : prevKey.Pretty(w.formatKey), key.Pretty(w.formatKey))
1059 1 : return w.err
1060 1 : }
1061 1 : default:
1062 1 : prevValue := w.rangeDelBlock.CurValue()
1063 1 : if w.compare(prevValue, key.UserKey) > 0 {
1064 1 : w.err = errors.Errorf("pebble: overlapping tombstones must be fragmented: %s vs %s",
1065 1 : w.prettyTombstone(prevKey, prevValue),
1066 1 : w.prettyTombstone(key, value))
1067 1 : return w.err
1068 1 : }
1069 : }
1070 : }
1071 :
1072 1 : if key.Trailer == base.InternalKeyRangeDeleteSentinel {
1073 0 : w.err = errors.Errorf("pebble: cannot add range delete sentinel: %s", key.Pretty(w.formatKey))
1074 0 : return w.err
1075 0 : }
1076 :
1077 1 : w.meta.updateSeqNum(key.SeqNum())
1078 1 :
1079 1 : // Range tombstones are fragmented in the v2 range deletion block format,
1080 1 : // so the start key of the first range tombstone added will be the smallest
1081 1 : // range tombstone key. The largest range tombstone key will be determined
1082 1 : // in Writer.Close() as the end key of the last range tombstone added.
1083 1 : if w.props.NumRangeDeletions == 0 {
1084 1 : w.meta.SetSmallestRangeDelKey(key.Clone())
1085 1 : }
1086 :
1087 1 : w.props.NumEntries++
1088 1 : w.props.NumDeletions++
1089 1 : w.props.NumRangeDeletions++
1090 1 : w.props.RawKeySize += uint64(key.Size())
1091 1 : w.props.RawValueSize += uint64(len(value))
1092 1 : w.rangeDelBlock.Add(key, value)
1093 1 : return nil
1094 : }
1095 :
1096 : // RangeKeySet sets a range between start (inclusive) and end (exclusive) with
1097 : // the given suffix to the given value. The resulting range key is given the
1098 : // sequence number zero, with the expectation that the resulting sstable will be
1099 : // ingested.
1100 : //
1101 : // Keys must be added to the table in increasing order of start key. Spans are
1102 : // not required to be fragmented. The same suffix may not be set or unset twice
1103 : // over the same keyspan, because it would result in inconsistent state. Both
1104 : // the Set and Unset would share the zero sequence number, and a key cannot be
1105 : // both simultaneously set and unset.
1106 1 : func (w *Writer) RangeKeySet(start, end, suffix, value []byte) error {
1107 1 : return w.addRangeKeySpan(keyspan.Span{
1108 1 : Start: w.tempRangeKeyCopy(start),
1109 1 : End: w.tempRangeKeyCopy(end),
1110 1 : Keys: []keyspan.Key{
1111 1 : {
1112 1 : Trailer: base.MakeTrailer(0, base.InternalKeyKindRangeKeySet),
1113 1 : Suffix: w.tempRangeKeyCopy(suffix),
1114 1 : Value: w.tempRangeKeyCopy(value),
1115 1 : },
1116 1 : },
1117 1 : })
1118 1 : }
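// A minimal sketch of writing range keys for ingestion, per the rules above:
// spans are added in order of start key and need not be fragmented, since the
// fragmenter coalesces and fragments them before they reach the range key
// block. The function name, suffixes, and keys are hypothetical.
func exampleRangeKeys(w *Writer) error {
	if err := w.RangeKeySet([]byte("a"), []byte("m"), []byte("@5"), []byte("v1")); err != nil {
		return err
	}
	// Overlaps [a, m) with a different suffix; the fragmenter splits the
	// overlap into aligned spans when the table is finished.
	return w.RangeKeySet([]byte("g"), []byte("z"), []byte("@7"), []byte("v2"))
}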
1119 :
1120 : // RangeKeyUnset un-sets a range between start (inclusive) and end (exclusive)
1121 : // with the given suffix. The resulting range key is given the
1122 : // sequence number zero, with the expectation that the resulting sstable will be
1123 : // ingested.
1124 : //
1125 : // Keys must be added to the table in increasing order of start key. Spans are
1126 : // not required to be fragmented. The same suffix may not be set or unset twice
1127 : // over the same keyspan, because it would result in inconsistent state. Both
1128 : // the Set and Unset would share the zero sequence number, and a key cannot be
1129 : // both simultaneously set and unset.
1130 1 : func (w *Writer) RangeKeyUnset(start, end, suffix []byte) error {
1131 1 : return w.addRangeKeySpan(keyspan.Span{
1132 1 : Start: w.tempRangeKeyCopy(start),
1133 1 : End: w.tempRangeKeyCopy(end),
1134 1 : Keys: []keyspan.Key{
1135 1 : {
1136 1 : Trailer: base.MakeTrailer(0, base.InternalKeyKindRangeKeyUnset),
1137 1 : Suffix: w.tempRangeKeyCopy(suffix),
1138 1 : },
1139 1 : },
1140 1 : })
1141 1 : }
1142 :
1143 : // RangeKeyDelete deletes a range between start (inclusive) and end (exclusive).
1144 : //
1145 : // Keys must be added to the table in increasing order of start key. Spans are
1146 : // not required to be fragmented.
1147 1 : func (w *Writer) RangeKeyDelete(start, end []byte) error {
1148 1 : return w.addRangeKeySpan(keyspan.Span{
1149 1 : Start: w.tempRangeKeyCopy(start),
1150 1 : End: w.tempRangeKeyCopy(end),
1151 1 : Keys: []keyspan.Key{
1152 1 : {Trailer: base.MakeTrailer(0, base.InternalKeyKindRangeKeyDelete)},
1153 1 : },
1154 1 : })
1155 1 : }
1156 :
1157 : // AddRangeKey adds a range key set, unset, or delete key/value pair to the
1158 : // table being written.
1159 : //
1160 : // Range keys must be supplied in strictly ascending order of start key (i.e.
1161 : // user key ascending, sequence number descending, and key type descending).
1162 : // Ranges added must also be supplied in fragmented span order - i.e. other than
1163 : // spans that are perfectly aligned (same start and end keys), spans may not
1164 : // overlap. Range keys may be added out of order relative to point keys and
1165 : // range deletions.
1166 1 : func (w *Writer) AddRangeKey(key InternalKey, value []byte) error {
1167 1 : if w.err != nil {
1168 0 : return w.err
1169 0 : }
1170 1 : return w.addRangeKey(key, value)
1171 : }
1172 :
1173 1 : func (w *Writer) addRangeKeySpan(span keyspan.Span) error {
1174 1 : if w.compare(span.Start, span.End) >= 0 {
1175 0 : return errors.Errorf(
1176 0 : "pebble: start key must be strictly less than end key",
1177 0 : )
1178 0 : }
1179 1 : if w.fragmenter.Start() != nil && w.compare(w.fragmenter.Start(), span.Start) > 0 {
1180 1 : return errors.Errorf("pebble: spans must be added in order: %s > %s",
1181 1 : w.formatKey(w.fragmenter.Start()), w.formatKey(span.Start))
1182 1 : }
1183 : // Add this span to the fragmenter.
1184 1 : w.fragmenter.Add(span)
1185 1 : return w.err
1186 : }
1187 :
1188 1 : func (w *Writer) encodeRangeKeySpan(span keyspan.Span) {
1189 1 : // This method is the emit function of the Fragmenter.
1190 1 : //
1191 1 : // NB: The span should only contain range keys and be internally consistent
1192 1 : // (eg, no duplicate suffixes, no additional keys after a RANGEKEYDEL).
1193 1 : //
1194 1 : // We use w.rangeKeysBySuffix and w.rangeKeySpan to avoid allocations.
1195 1 :
1196 1 : // Sort the keys by suffix. Iteration doesn't *currently* depend on it, but
1197 1 : // we may want to in the future.
1198 1 : w.rangeKeysBySuffix.Cmp = w.compare
1199 1 : w.rangeKeysBySuffix.Keys = span.Keys
1200 1 : sort.Sort(&w.rangeKeysBySuffix)
1201 1 :
1202 1 : w.rangeKeySpan = span
1203 1 : w.rangeKeySpan.Keys = w.rangeKeysBySuffix.Keys
1204 1 : w.err = firstError(w.err, w.rangeKeyEncoder.Encode(&w.rangeKeySpan))
1205 1 : }
1206 :
1207 1 : func (w *Writer) addRangeKey(key InternalKey, value []byte) error {
1208 1 : if !w.disableKeyOrderChecks && w.rangeKeyBlock.EntryCount() > 0 {
1209 1 : prevStartKey := w.rangeKeyBlock.CurKey()
1210 1 : prevEndKey, _, err := rangekey.DecodeEndKey(prevStartKey.Kind(), w.rangeKeyBlock.CurValue())
1211 1 : if err != nil {
1212 0 : // We panic here as we should have previously decoded and validated this
1213 0 : // key and value when it was first added to the range key block.
1214 0 : panic(err)
1215 : }
1216 :
1217 1 : curStartKey := key
1218 1 : curEndKey, _, err := rangekey.DecodeEndKey(curStartKey.Kind(), value)
1219 1 : if err != nil {
1220 0 : w.err = err
1221 0 : return w.err
1222 0 : }
1223 :
1224 : // Start keys must be strictly increasing.
1225 1 : if base.InternalCompare(w.compare, prevStartKey, curStartKey) >= 0 {
1226 1 : w.err = errors.Errorf(
1227 1 : "pebble: range keys starts must be added in increasing order: %s, %s",
1228 1 : prevStartKey.Pretty(w.formatKey), key.Pretty(w.formatKey))
1229 1 : return w.err
1230 1 : }
1231 :
1232 : // Start keys are increasing. If the start user keys are equal, the
1233 : // end keys must be equal (i.e. aligned spans).
1234 1 : if w.compare(prevStartKey.UserKey, curStartKey.UserKey) == 0 {
1235 1 : if w.compare(prevEndKey, curEndKey) != 0 {
1236 1 : w.err = errors.Errorf("pebble: overlapping range keys must be fragmented: %s, %s",
1237 1 : prevStartKey.Pretty(w.formatKey),
1238 1 : curStartKey.Pretty(w.formatKey))
1239 1 : return w.err
1240 1 : }
1241 1 : } else if w.compare(prevEndKey, curStartKey.UserKey) > 0 {
1242 1 : // If the start user keys are NOT equal, the spans must be disjoint (i.e.
1243 1 : // no overlap).
1244 1 : // NOTE: the inequality excludes zero, as we allow the end key of the
1245 1 : // lower span to be the same as the start key of the upper span, because
1246 1 : // the range end key is considered an exclusive bound.
1247 1 : w.err = errors.Errorf("pebble: overlapping range keys must be fragmented: %s, %s",
1248 1 : prevStartKey.Pretty(w.formatKey),
1249 1 : curStartKey.Pretty(w.formatKey))
1250 1 : return w.err
1251 1 : }
1252 : }
1253 :
1254 : // TODO(travers): Add an invariant-gated check to ensure that suffix-values
1255 : // are sorted within coalesced spans.
1256 :
1257 : // Range-keys and point-keys are intended to live in "parallel" keyspaces.
1258 : // However, we track a single seqnum in the table metadata that spans both of
1259 : // these keyspaces.
1260 : // TODO(travers): Consider tracking range key seqnums separately.
1261 1 : w.meta.updateSeqNum(key.SeqNum())
1262 1 :
1263 1 : // Range keys are fragmented, so the start key of the first range key
1264 1 : // added will be the smallest. The largest range key is determined in
1265 1 : // Writer.Close() as the end key of the last range key added to the block.
1266 1 : if w.props.NumRangeKeys() == 0 {
1267 1 : w.meta.SetSmallestRangeKey(key.Clone())
1268 1 : }
1269 :
1270 : // Update block properties.
1271 1 : w.props.RawRangeKeyKeySize += uint64(key.Size())
1272 1 : w.props.RawRangeKeyValueSize += uint64(len(value))
1273 1 : switch key.Kind() {
1274 1 : case base.InternalKeyKindRangeKeyDelete:
1275 1 : w.props.NumRangeKeyDels++
1276 1 : case base.InternalKeyKindRangeKeySet:
1277 1 : w.props.NumRangeKeySets++
1278 1 : case base.InternalKeyKindRangeKeyUnset:
1279 1 : w.props.NumRangeKeyUnsets++
1280 0 : default:
1281 0 : panic(errors.Errorf("pebble: invalid range key type: %s", key.Kind()))
1282 : }
1283 :
1284 1 : for i := range w.blockPropCollectors {
1285 1 : if err := w.blockPropCollectors[i].Add(key, value); err != nil {
1286 0 : return err
1287 0 : }
1288 : }
1289 :
1290 : // Add the key to the block.
1291 1 : w.rangeKeyBlock.Add(key, value)
1292 1 : return nil
1293 : }
1294 :
1295 : // tempRangeKeyBuf returns a slice of length n from the Writer's rkBuf byte
1296 : // slice. Any byte written to the returned slice is retained for the lifetime of
1297 : // the Writer.
1298 1 : func (w *Writer) tempRangeKeyBuf(n int) []byte {
1299 1 : if cap(w.rkBuf)-len(w.rkBuf) < n {
1300 1 : size := len(w.rkBuf) + 2*n
1301 1 : if size < 2*cap(w.rkBuf) {
1302 1 : size = 2 * cap(w.rkBuf)
1303 1 : }
1304 1 : buf := make([]byte, len(w.rkBuf), size)
1305 1 : copy(buf, w.rkBuf)
1306 1 : w.rkBuf = buf
1307 : }
1308 1 : b := w.rkBuf[len(w.rkBuf) : len(w.rkBuf)+n]
1309 1 : w.rkBuf = w.rkBuf[:len(w.rkBuf)+n]
1310 1 : return b
1311 : }
1312 :
1313 : // tempRangeKeyCopy returns a copy of the provided slice, stored in the Writer's
1314 : // range key buffer.
1315 1 : func (w *Writer) tempRangeKeyCopy(k []byte) []byte {
1316 1 : if len(k) == 0 {
1317 1 : return nil
1318 1 : }
1319 1 : buf := w.tempRangeKeyBuf(len(k))
1320 1 : copy(buf, k)
1321 1 : return buf
1322 : }
1323 :
1324 1 : func (w *Writer) maybeAddToFilter(key []byte) {
1325 1 : if w.filter != nil {
1326 1 : prefix := key[:w.split(key)]
1327 1 : w.filter.addKey(prefix)
1328 1 : }
1329 : }
1330 :
1331 1 : func (w *Writer) flush(key InternalKey) error {
1332 1 : // We're finishing a data block.
1333 1 : err := w.finishDataBlockProps(w.dataBlockBuf)
1334 1 : if err != nil {
1335 1 : return err
1336 1 : }
1337 1 : w.dataBlockBuf.finish()
1338 1 : w.dataBlockBuf.compressAndChecksum(w.compression)
1339 1 : // Since dataBlockEstimates.addInflightDataBlock was never called, the
1340 1 : // inflightSize is set to 0.
1341 1 : w.coordination.sizeEstimate.dataBlockCompressed(len(w.dataBlockBuf.compressed), 0)
1342 1 :
1343 1 : // Determine if the index block should be flushed. Since we're accessing the
1344 1 : // dataBlockBuf.dataBlock.curKey here, we have to make sure that once we start
1345 1 : // to pool the dataBlockBufs, the curKey isn't used by the Writer once the
1346 1 : // dataBlockBuf is added back to a sync.Pool. In this particular case, the
1347 1 : // byte slice which supports "sep" will eventually be copied when "sep" is
1348 1 : // added to the index block.
1349 1 : prevKey := w.dataBlockBuf.dataBlock.CurKey()
1350 1 : sep := w.indexEntrySep(prevKey, key, w.dataBlockBuf)
1351 1 : // We determine that we should flush an index block from the Writer client
1352 1 : // goroutine, but we actually finish the index block from the writeQueue.
1353 1 : // When we determine that an index block should be flushed, we need to call
1354 1 : // BlockPropertyCollector.FinishIndexBlock. But block property collector
1355 1 : // calls must happen sequentially from the Writer client. Therefore, we need
1356 1 : // to determine that we are going to flush the index block from the Writer
1357 1 : // client.
1358 1 : shouldFlushIndexBlock := supportsTwoLevelIndex(w.tableFormat) && w.indexBlock.shouldFlush(
1359 1 : sep, encodedBHPEstimatedSize, w.indexBlockOptions, w.allocatorSizeClasses,
1360 1 : )
1361 1 :
1362 1 : var indexProps []byte
1363 1 : var flushableIndexBlock *indexBlockBuf
1364 1 : if shouldFlushIndexBlock {
1365 1 : flushableIndexBlock = w.indexBlock
1366 1 : w.indexBlock = newIndexBlockBuf(w.coordination.parallelismEnabled)
1367 1 : // Call BlockPropertyCollector.FinishIndexBlock, since we've decided to
1368 1 : // flush the index block.
1369 1 : indexProps, err = w.finishIndexBlockProps()
1370 1 : if err != nil {
1371 1 : return err
1372 1 : }
1373 : }
1374 :
1375 : // We've called BlockPropertyCollector.FinishDataBlock, and, if necessary,
1376 : // BlockPropertyCollector.FinishIndexBlock. Since we've decided to finish
1377 : // the data block, we can call
1378 : // BlockPropertyCollector.AddPrevDataBlockToIndexBlock.
1379 1 : w.addPrevDataBlockToIndexBlockProps()
1380 1 :
1381 1 : // Schedule a write.
1382 1 : writeTask := writeTaskPool.Get().(*writeTask)
1383 1 : // We're setting compressionDone to indicate that compression of this block
1384 1 : // has already been completed.
1385 1 : writeTask.compressionDone <- true
1386 1 : writeTask.buf = w.dataBlockBuf
1387 1 : writeTask.indexEntrySep = sep
1388 1 : writeTask.currIndexBlock = w.indexBlock
1389 1 : writeTask.indexInflightSize = sep.Size() + encodedBHPEstimatedSize
1390 1 : writeTask.finishedIndexProps = indexProps
1391 1 : writeTask.flushableIndexBlock = flushableIndexBlock
1392 1 :
1393 1 : // The writeTask corresponds to an unwritten index entry.
1394 1 : w.indexBlock.addInflight(writeTask.indexInflightSize)
1395 1 :
1396 1 : w.dataBlockBuf = nil
1397 1 : if w.coordination.parallelismEnabled {
1398 1 : w.coordination.writeQueue.add(writeTask)
1399 1 : } else {
1400 1 : err = w.coordination.writeQueue.addSync(writeTask)
1401 1 : }
1402 1 : w.dataBlockBuf = newDataBlockBuf(w.restartInterval, w.checksumType)
1403 1 :
1404 1 : return err
1405 : }
1406 :
1407 1 : func (w *Writer) maybeFlush(key InternalKey, valueLen int) error {
1408 1 : if !w.dataBlockBuf.shouldFlush(key, valueLen, w.dataBlockOptions, w.allocatorSizeClasses) {
1409 1 : return nil
1410 1 : }
1411 :
1412 1 : err := w.flush(key)
1413 1 :
1414 1 : if err != nil {
1415 1 : w.err = err
1416 1 : return err
1417 1 : }
1418 :
1419 1 : return nil
1420 : }
1421 :
1422 : // The dataBlockBuf.dataBlockProps set by this method must be encoded before any future use of the
1423 : // dataBlockBuf.blockPropsEncoder, since the properties slice will get reused by the
1424 : // blockPropsEncoder.
1425 1 : func (w *Writer) finishDataBlockProps(buf *dataBlockBuf) error {
1426 1 : if len(w.blockPropCollectors) == 0 {
1427 1 : return nil
1428 1 : }
1429 1 : var err error
1430 1 : buf.blockPropsEncoder.resetProps()
1431 1 : for i := range w.blockPropCollectors {
1432 1 : scratch := buf.blockPropsEncoder.getScratchForProp()
1433 1 : if scratch, err = w.blockPropCollectors[i].FinishDataBlock(scratch); err != nil {
1434 1 : return err
1435 1 : }
1436 1 : buf.blockPropsEncoder.addProp(shortID(i), scratch)
1437 : }
1438 :
1439 1 : buf.dataBlockProps = buf.blockPropsEncoder.unsafeProps()
1440 1 : return nil
1441 : }
1442 :
1443 : // The BlockHandleWithProperties returned by this method must be encoded before any future use of
1444 : // the Writer.blockPropsEncoder, since the properties slice will get reused by the blockPropsEncoder.
1445 : // maybeAddBlockPropertiesToBlockHandle should only be called if the block is being written synchronously
1446 : // with the Writer client.
1447 : func (w *Writer) maybeAddBlockPropertiesToBlockHandle(
1448 : bh block.Handle,
1449 1 : ) (BlockHandleWithProperties, error) {
1450 1 : err := w.finishDataBlockProps(w.dataBlockBuf)
1451 1 : if err != nil {
1452 0 : return BlockHandleWithProperties{}, err
1453 0 : }
1454 1 : return BlockHandleWithProperties{Handle: bh, Props: w.dataBlockBuf.dataBlockProps}, nil
1455 : }
1456 :
1457 1 : func (w *Writer) indexEntrySep(prevKey, key InternalKey, dataBlockBuf *dataBlockBuf) InternalKey {
1458 1 : // Make a rough guess that we want key-sized scratch to compute the separator.
1459 1 : if cap(dataBlockBuf.sepScratch) < key.Size() {
1460 1 : dataBlockBuf.sepScratch = make([]byte, 0, key.Size()*2)
1461 1 : }
1462 :
1463 1 : var sep InternalKey
1464 1 : if key.UserKey == nil && key.Trailer == 0 {
1465 1 : sep = prevKey.Successor(w.compare, w.successor, dataBlockBuf.sepScratch[:0])
1466 1 : } else {
1467 1 : sep = prevKey.Separator(w.compare, w.separator, dataBlockBuf.sepScratch[:0], key)
1468 1 : }
1469 1 : return sep
1470 : }
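
// The separator produced above only needs to satisfy prevKey <= sep < key in
// user-key order; comparers are free to shorten it to keep index entries
// small. A hedged, self-contained sketch of the classic bytewise shortening
// (illustrative only, not the comparer this package uses):
//
//	func bytewiseSeparator(dst, a, b []byte) []byte {
//	    n := min(len(a), len(b))
//	    i := 0
//	    for i < n && a[i] == b[i] {
//	        i++ // skip the shared prefix
//	    }
//	    if i < n && a[i] < 0xff && a[i]+1 < b[i] {
//	        // a[:i+1] with its last byte incremented satisfies a <= sep < b.
//	        dst = append(dst, a[:i+1]...)
//	        dst[len(dst)-1]++
//	        return dst
//	    }
//	    return append(dst, a...) // fall back to a itself
//	}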
1471 :
1472 : // addIndexEntry adds an index entry for the specified key and block handle.
1473 : // addIndexEntry can be called from both the Writer client goroutine, and the
1474 : // writeQueue goroutine. If flushIndexBuf != nil, then indexProps must also be
1475 : // provided, as they're used when the index block is finished.
1476 : //
1477 : // Invariant:
1478 : // 1. addIndexEntry must not store references to the sep InternalKey, the tmp
1479 : // byte slice, bhp.Props. That is, these must be either deep copied or
1480 : // encoded.
1481 : // 2. addIndexEntry must not hold references to the flushIndexBuf, and the writeTo
1482 : // indexBlockBufs.
1483 : func (w *Writer) addIndexEntry(
1484 : sep InternalKey,
1485 : bhp BlockHandleWithProperties,
1486 : tmp []byte,
1487 : flushIndexBuf *indexBlockBuf,
1488 : writeTo *indexBlockBuf,
1489 : inflightSize int,
1490 : indexProps []byte,
1491 1 : ) error {
1492 1 : if bhp.Length == 0 {
1493 0 : // A valid blockHandle must be non-zero.
1494 0 : // In particular, it must have a non-zero length.
1495 0 : return nil
1496 0 : }
1497 :
1498 1 : encoded := encodeBlockHandleWithProperties(tmp, bhp)
1499 1 :
1500 1 : if flushIndexBuf != nil {
1501 1 : if cap(w.indexPartitions) == 0 {
1502 1 : w.indexPartitions = make([]indexBlockAndBlockProperties, 0, 32)
1503 1 : }
1504 : // Enable two level indexes if there is more than one index block.
1505 1 : w.twoLevelIndex = true
1506 1 : if err := w.finishIndexBlock(flushIndexBuf, indexProps); err != nil {
1507 0 : return err
1508 0 : }
1509 : }
1510 :
1511 1 : writeTo.add(sep, encoded, inflightSize)
1512 1 : return nil
1513 : }
1514 :
1515 1 : func (w *Writer) addPrevDataBlockToIndexBlockProps() {
1516 1 : for i := range w.blockPropCollectors {
1517 1 : w.blockPropCollectors[i].AddPrevDataBlockToIndexBlock()
1518 1 : }
1519 : }
1520 :
1521 : // addIndexEntrySync adds an index entry for the specified key and block handle.
1522 : // Writer.addIndexEntry is only called synchronously once Writer.Close is called.
1523 : // addIndexEntrySync should only be called if we're sure that index entries
1524 : // aren't being written asynchronously.
1525 : //
1526 : // Invariant:
1527 : // 1. addIndexEntrySync must not store references to the prevKey, key InternalKey's,
1528 : // the tmp byte slice. That is, these must be either deep copied or encoded.
1529 : //
1530 : // TODO: Improve coverage of this method. e.g. tests passed without the line
1531 : // `w.twoLevelIndex = true` previously.
1532 : func (w *Writer) addIndexEntrySync(
1533 : prevKey, key InternalKey, bhp BlockHandleWithProperties, tmp []byte,
1534 1 : ) error {
1535 1 : return w.addIndexEntrySep(w.indexEntrySep(prevKey, key, w.dataBlockBuf), bhp, tmp)
1536 1 : }
1537 :
1538 : func (w *Writer) addIndexEntrySep(
1539 : sep InternalKey, bhp BlockHandleWithProperties, tmp []byte,
1540 1 : ) error {
1541 1 : shouldFlush := supportsTwoLevelIndex(
1542 1 : w.tableFormat) && w.indexBlock.shouldFlush(
1543 1 : sep, encodedBHPEstimatedSize, w.indexBlockOptions, w.allocatorSizeClasses,
1544 1 : )
1545 1 : var flushableIndexBlock *indexBlockBuf
1546 1 : var props []byte
1547 1 : var err error
1548 1 : if shouldFlush {
1549 1 : flushableIndexBlock = w.indexBlock
1550 1 : w.indexBlock = newIndexBlockBuf(w.coordination.parallelismEnabled)
1551 1 : w.twoLevelIndex = true
1552 1 : // Call BlockPropertyCollector.FinishIndexBlock, since we've decided to
1553 1 : // flush the index block.
1554 1 : props, err = w.finishIndexBlockProps()
1555 1 : if err != nil {
1556 0 : return err
1557 0 : }
1558 : }
1559 :
1560 1 : err = w.addIndexEntry(sep, bhp, tmp, flushableIndexBlock, w.indexBlock, 0, props)
1561 1 : if flushableIndexBlock != nil {
1562 1 : flushableIndexBlock.clear()
1563 1 : indexBlockBufPool.Put(flushableIndexBlock)
1564 1 : }
1565 1 : w.addPrevDataBlockToIndexBlockProps()
1566 1 : return err
1567 : }
1568 :
1569 : func shouldFlushWithHints(
1570 : keyLen, valueLen int,
1571 : restartInterval, estimatedBlockSize, numEntries int,
1572 : flushOptions flushDecisionOptions,
1573 : sizeClassHints []int,
1574 1 : ) bool {
1575 1 : if numEntries == 0 {
1576 1 : return false
1577 1 : }
1578 :
1579 : // If we are not informed about the memory allocator's size classes we fall
1580 : // back to a simple set of flush heuristics that are unaware of internal
1581 : // fragmentation in block cache allocations.
1582 1 : if len(sizeClassHints) == 0 {
1583 1 : return shouldFlushWithoutHints(
1584 1 : keyLen, valueLen, restartInterval, estimatedBlockSize, numEntries, flushOptions)
1585 1 : }
1586 :
1587 : // For size-class aware flushing we need to account for the metadata that is
1588 : // allocated when this block is loaded into the block cache. For instance, if
1589 : // a block has size 1020B it may fit within a 1024B class. However, when
1590 : // loaded into the block cache we also allocate space for the cache entry
1591 : // metadata. The new allocation of size ~1052B may now only fit within a
1592 : // 2048B class, which increases internal fragmentation.
1593 1 : blockSizeWithMetadata := estimatedBlockSize + cache.ValueMetadataSize
1594 1 :
1595 1 : // For the fast path we can avoid computing the exact varint encoded
1596 1 : // key-value pair size. Instead, we combine the key-value pair size with an
1597 1 : // upper-bound estimate of the associated metadata (4B restart point, 4B
1598 1 : // shared prefix length, 5B varint unshared key size, 5B varint value size).
1599 1 : newEstimatedSize := blockSizeWithMetadata + keyLen + valueLen + 18
1600 1 : // Our new block size estimate disregards key prefix compression. This puts
1601 1 : // us at risk of overestimating the size and flushing small blocks. We
1602 1 : // mitigate this by imposing a minimum size restriction.
1603 1 : if blockSizeWithMetadata <= flushOptions.sizeClassAwareThreshold || newEstimatedSize <= flushOptions.blockSize {
1604 1 : return false
1605 1 : }
1606 :
1607 1 : sizeClass, ok := blockSizeClass(blockSizeWithMetadata, sizeClassHints)
1608 1 : // If the block size could not be mapped to a size class we fall back to
1609 1 : // using a simpler set of flush heuristics.
1610 1 : if !ok {
1611 1 : return shouldFlushWithoutHints(
1612 1 : keyLen, valueLen, restartInterval, estimatedBlockSize, numEntries, flushOptions)
1613 1 : }
1614 :
1615 : // Tighter upper-bound estimate of the metadata stored with the next
1616 : // key-value pair.
1617 1 : newSize := blockSizeWithMetadata + keyLen + valueLen
1618 1 : if numEntries%restartInterval == 0 {
1619 0 : newSize += 4
1620 0 : }
1621 1 : newSize += 4 // varint for shared prefix length
1622 1 : newSize += uvarintLen(uint32(keyLen)) // varint for unshared key bytes
1623 1 : newSize += uvarintLen(uint32(valueLen)) // varint for value size
1624 1 :
1625 1 : if blockSizeWithMetadata < flushOptions.blockSize {
1626 1 : newSizeClass, ok := blockSizeClass(newSize, sizeClassHints)
1627 1 : if ok && newSizeClass-newSize >= sizeClass-blockSizeWithMetadata {
1628 1 : // Although the block hasn't reached the target size, waiting to insert the
1629 1 : // next entry would exceed the target and increase memory fragmentation.
1630 1 : return true
1631 1 : }
1632 1 : return false
1633 : }
1634 :
1635 : // Flush if inserting the next entry bumps the block size to the memory
1636 : // allocator's next size class.
1637 1 : return newSize > sizeClass
1638 : }
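
// Worked example of the fast-path constant above: the per-entry overhead is
// bounded by 4B for a potential restart-point offset, 4B budgeted for the
// shared-prefix-length varint, and 5B each for the unshared-key-length and
// value-length varints (a uvarint-encoded uint32 is at most 5 bytes), i.e.
// 4 + 4 + 5 + 5 = 18, the constant added to newEstimatedSize. The exact
// accounting later in the function uses uvarintLen for the key and value
// sizes instead of the 5-byte bound.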
1639 :
1640 : func shouldFlushWithoutHints(
1641 : keyLen, valueLen int,
1642 : restartInterval, estimatedBlockSize, numEntries int,
1643 : flushOptions flushDecisionOptions,
1644 1 : ) bool {
1645 1 : if estimatedBlockSize >= flushOptions.blockSize {
1646 1 : return true
1647 1 : }
1648 :
1649 : // The block is currently smaller than the target size.
1650 1 : if estimatedBlockSize <= flushOptions.blockSizeThreshold {
1651 1 : // The block is smaller than the threshold size at which we'll consider
1652 1 : // flushing it.
1653 1 : return false
1654 1 : }
1655 :
1656 1 : newSize := estimatedBlockSize + keyLen + valueLen
1657 1 : if numEntries%restartInterval == 0 {
1658 1 : newSize += 4
1659 1 : }
1660 1 : newSize += 4 // varint for shared prefix length
1661 1 : newSize += uvarintLen(uint32(keyLen)) // varint for unshared key bytes
1662 1 : newSize += uvarintLen(uint32(valueLen)) // varint for value size
1663 1 : // Flush if the block plus the new entry is larger than the target size.
1664 1 : return newSize > flushOptions.blockSize
1665 : }
1666 :
1667 : // blockSizeClass returns the smallest memory allocator size class that could
1668 : // hold a block of a given size and returns a boolean indicating whether an
1669 : // appropriate size class was found. It is useful for computing the potential
1670 : // space wasted by an allocation.
1671 1 : func blockSizeClass(blockSize int, sizeClassHints []int) (int, bool) {
1672 1 : sizeClassIdx, _ := slices.BinarySearch(sizeClassHints, blockSize)
1673 1 : if sizeClassIdx == len(sizeClassHints) {
1674 1 : return -1, false
1675 1 : }
1676 1 : return sizeClassHints[sizeClassIdx], true
1677 : }
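
// Example usage of blockSizeClass with hypothetical allocator hints (the
// class sizes below are illustrative, not Pebble defaults):
//
//	hints := []int{1024, 1280, 1536, 2048, 4096}
//	class, ok := blockSizeClass(1100, hints) // class == 1280, ok == true
//	class, ok = blockSizeClass(2048, hints)  // class == 2048, ok == true (exact fit)
//	class, ok = blockSizeClass(8192, hints)  // ok == false: no hinted class can hold it
//
// With the 1100-byte block, 1280-1100 = 180 bytes would be lost to internal
// fragmentation when the block is cached, which is the quantity the
// size-class-aware flush heuristic tries to minimize.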
1678 :
1679 1 : func cloneKeyWithBuf(k InternalKey, a bytealloc.A) (bytealloc.A, InternalKey) {
1680 1 : if len(k.UserKey) == 0 {
1681 0 : return a, k
1682 0 : }
1683 1 : a, keyCopy := a.Copy(k.UserKey)
1684 1 : return a, InternalKey{UserKey: keyCopy, Trailer: k.Trailer}
1685 : }
1686 :
1687 : // Invariants: The byte slice returned by finishIndexBlockProps is
1688 : // heap-allocated and has its own lifetime, independent of the Writer and the
1689 : // blockPropsEncoder, so it is safe to:
1690 : //
1691 : // 1. Reuse w.blockPropsEncoder without first encoding the byte slice
1692 : //    returned.
1693 : // 2. Store the byte slice in the Writer since it is a copy and not backed by
1694 : //    an underlying buffer.
1695 1 : func (w *Writer) finishIndexBlockProps() ([]byte, error) {
1696 1 : w.blockPropsEncoder.resetProps()
1697 1 : for i := range w.blockPropCollectors {
1698 1 : scratch := w.blockPropsEncoder.getScratchForProp()
1699 1 : var err error
1700 1 : if scratch, err = w.blockPropCollectors[i].FinishIndexBlock(scratch); err != nil {
1701 1 : return nil, err
1702 1 : }
1703 1 : w.blockPropsEncoder.addProp(shortID(i), scratch)
1704 : }
1705 1 : return w.blockPropsEncoder.props(), nil
1706 : }
1707 :
1708 : // finishIndexBlock finishes the current index block and adds it to the top
1709 : // level index block. This is only used when two level indexes are enabled.
1710 : //
1711 : // Invariants:
1712 : // 1. The props slice passed into finishIndexBlock must not be owned by any
1713 : //    other struct, since it will be stored in the Writer.indexPartitions
1714 : //    slice.
1715 : // 2. None of the buffers owned by indexBuf will be shallow copied and stored elsewhere.
1716 : // That is, it must be safe to reuse indexBuf after finishIndexBlock has been called.
1717 1 : func (w *Writer) finishIndexBlock(indexBuf *indexBlockBuf, props []byte) error {
1718 1 : part := indexBlockAndBlockProperties{
1719 1 : nEntries: indexBuf.block.EntryCount(), properties: props,
1720 1 : }
1721 1 : w.indexSepAlloc, part.sep = cloneKeyWithBuf(
1722 1 : indexBuf.block.CurKey(), w.indexSepAlloc,
1723 1 : )
1724 1 : bk := indexBuf.finish()
1725 1 : if len(w.indexBlockAlloc) < len(bk) {
1726 1 : // Allocate enough bytes for approximately 16 index blocks.
1727 1 : w.indexBlockAlloc = make([]byte, len(bk)*16)
1728 1 : }
1729 1 : n := copy(w.indexBlockAlloc, bk)
1730 1 : part.block = w.indexBlockAlloc[:n:n]
1731 1 : w.indexBlockAlloc = w.indexBlockAlloc[n:]
1732 1 : w.indexPartitions = append(w.indexPartitions, part)
1733 1 : return nil
1734 : }
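
// The full slice expression w.indexBlockAlloc[:n:n] above pins the finished
// partition to exactly n bytes of the shared backing array, so re-slicing
// w.indexBlockAlloc for the next partition can never alias or grow into it.
// A minimal illustration of the same bump-allocation pattern (the names are
// illustrative only):
//
//	backing := make([]byte, 64)
//	chunk := backing[:16:16] // len=16, cap=16: appending to chunk reallocates
//	backing = backing[16:]   // the remaining arena serves the next chunk
//	_, _ = chunk, backing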
1735 :
1736 1 : func (w *Writer) writeTwoLevelIndex() (block.Handle, error) {
1737 1 : props, err := w.finishIndexBlockProps()
1738 1 : if err != nil {
1739 0 : return block.Handle{}, err
1740 0 : }
1741 : // Add the final unfinished index.
1742 1 : if err = w.finishIndexBlock(w.indexBlock, props); err != nil {
1743 0 : return block.Handle{}, err
1744 0 : }
1745 :
1746 1 : for i := range w.indexPartitions {
1747 1 : b := &w.indexPartitions[i]
1748 1 : w.props.NumDataBlocks += uint64(b.nEntries)
1749 1 :
1750 1 : data := b.block
1751 1 : w.props.IndexSize += uint64(len(data))
1752 1 : bh, err := w.layout.WriteIndexBlock(data)
1753 1 : if err != nil {
1754 0 : return block.Handle{}, err
1755 0 : }
1756 1 : bhp := BlockHandleWithProperties{
1757 1 : Handle: bh,
1758 1 : Props: b.properties,
1759 1 : }
1760 1 : encoded := encodeBlockHandleWithProperties(w.blockBuf.tmp[:], bhp)
1761 1 : w.topLevelIndexBlock.Add(b.sep, encoded)
1762 : }
1763 :
1764 : // NB: RocksDB includes the block trailer length in the index size
1765 : // property, though it doesn't include the trailer in the top level
1766 : // index size property.
1767 1 : w.props.IndexPartitions = uint64(len(w.indexPartitions))
1768 1 : w.props.TopLevelIndexSize = uint64(w.topLevelIndexBlock.EstimatedSize())
1769 1 : w.props.IndexSize += w.props.TopLevelIndexSize + block.TrailerLen
1770 1 : return w.layout.WriteIndexBlock(w.topLevelIndexBlock.Finish())
1771 : }
1772 :
1773 : func compressAndChecksum(
1774 : b []byte, compression Compression, blockBuf *blockBuf,
1775 1 : ) (compressed []byte, trailer block.Trailer) {
1776 1 : // Compress the buffer, discarding the result if the improvement isn't at
1777 1 : // least 12.5%.
1778 1 : blockType, compressed := compressBlock(compression, b, blockBuf.compressedBuf)
1779 1 : if blockType != noCompressionBlockType && cap(compressed) > cap(blockBuf.compressedBuf) {
1780 1 : blockBuf.compressedBuf = compressed[:cap(compressed)]
1781 1 : }
1782 1 : if len(compressed) < len(b)-len(b)/8 {
1783 1 : b = compressed
1784 1 : } else {
1785 1 : blockType = noCompressionBlockType
1786 1 : }
1787 :
1788 : // Calculate the checksum.
1789 1 : trailer[0] = byte(blockType)
1790 1 : checksum := blockBuf.checksummer.Checksum(b, trailer[:1])
1791 1 : return b, block.MakeTrailer(byte(blockType), checksum)
1792 : }
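
// Worked example of the 12.5% rule above: a 4096-byte block is kept in its
// compressed form only if the compressed length is below 4096 - 4096/8 = 3584
// bytes. A 3600-byte result is discarded and the block is written with
// noCompressionBlockType, trading a small amount of disk space for cheaper
// reads (no decompression on the read path).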
1793 :
1794 : // assertFormatCompatibility ensures that the features present on the table are
1795 : // compatible with the table format version.
1796 1 : func (w *Writer) assertFormatCompatibility() error {
1797 1 : // PebbleDBv1: block properties.
1798 1 : if len(w.blockPropCollectors) > 0 && w.tableFormat < TableFormatPebblev1 {
1799 1 : return errors.Newf(
1800 1 : "table format version %s is less than the minimum required version %s for block properties",
1801 1 : w.tableFormat, TableFormatPebblev1,
1802 1 : )
1803 1 : }
1804 :
1805 : // PebbleDBv2: range keys.
1806 1 : if w.props.NumRangeKeys() > 0 && w.tableFormat < TableFormatPebblev2 {
1807 1 : return errors.Newf(
1808 1 : "table format version %s is less than the minimum required version %s for range keys",
1809 1 : w.tableFormat, TableFormatPebblev2,
1810 1 : )
1811 1 : }
1812 :
1813 : // PebbleDBv3: value blocks.
1814 1 : if (w.props.NumValueBlocks > 0 || w.props.NumValuesInValueBlocks > 0 ||
1815 1 : w.props.ValueBlocksSize > 0) && w.tableFormat < TableFormatPebblev3 {
1816 0 : return errors.Newf(
1817 0 : "table format version %s is less than the minimum required version %s for value blocks",
1818 0 : w.tableFormat, TableFormatPebblev3)
1819 0 : }
1820 :
1821 : // PebbleDBv4: DELSIZED tombstones.
1822 1 : if w.props.NumSizedDeletions > 0 && w.tableFormat < TableFormatPebblev4 {
1823 0 : return errors.Newf(
1824 0 : "table format version %s is less than the minimum required version %s for sized deletion tombstones",
1825 0 : w.tableFormat, TableFormatPebblev4)
1826 0 : }
1827 1 : return nil
1828 : }
1829 :
1830 : // UnsafeLastPointUserKey returns the user key of the last point key written
1831 : // to the Writer. The returned key points directly into a buffer belonging to
1832 : // the Writer, so its lifetime ends the next time a point key is added to the
1833 : // Writer.
1834 : //
1835 : // Must not be called after Writer is closed.
1836 1 : func (w *Writer) UnsafeLastPointUserKey() []byte {
1837 1 : if w != nil && w.dataBlockBuf.dataBlock.EntryCount() >= 1 {
1838 1 : // w.dataBlockBuf.dataBlock.curKey is guaranteed to point to the last point key
1839 1 : // which was added to the Writer.
1840 1 : return w.dataBlockBuf.dataBlock.CurUserKey()
1841 1 : }
1842 1 : return nil
1843 : }
1844 :
1845 : // EncodeSpan encodes the keys in the given span. The span can contain either
1846 : // only RANGEDEL keys or only range keys.
1847 1 : func (w *Writer) EncodeSpan(span *keyspan.Span) error {
1848 1 : if span.Empty() {
1849 1 : return nil
1850 1 : }
1851 1 : if span.Keys[0].Kind() == base.InternalKeyKindRangeDelete {
1852 1 : return rangedel.Encode(span, w.Add)
1853 1 : }
1854 1 : return rangekey.Encode(span, w.AddRangeKey)
1855 : }
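
// Example: a fragmented span over [a, c) whose only key is a RANGEDEL at
// seqnum 5 is handed to rangedel.Encode, which emits a single w.Add call with
// the internal key a#5,RANGEDEL and the exclusive end key c as the value;
// spans carrying RANGEKEYSET/RANGEKEYUNSET/RANGEKEYDEL keys are routed
// through rangekey.Encode to w.AddRangeKey instead.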
1856 :
1857 : // Close finishes writing the table and closes the underlying file that the
1858 : // table was written to.
1859 1 : func (w *Writer) Close() (err error) {
1860 1 : defer func() {
1861 1 : if w.valueBlockWriter != nil {
1862 1 : releaseValueBlockWriter(w.valueBlockWriter)
1863 1 : // Defensive code in case Close gets called again. We don't want to put
1864 1 : // the same object into a sync.Pool twice.
1865 1 : w.valueBlockWriter = nil
1866 1 : }
1867 1 : w.layout.Abort()
1868 1 : // Record any error in the writer (so we can exit early if Close is called
1869 1 : // again).
1870 1 : if err != nil {
1871 1 : w.err = err
1872 1 : }
1873 : }()
1874 :
1875 : // finish must be called before we check for an error, because finish will
1876 : // block until every single task added to the writeQueue has been processed,
1877 : // and an error could be encountered while any of those tasks are processed.
1878 1 : if err := w.coordination.writeQueue.finish(); err != nil {
1879 1 : return err
1880 1 : }
1881 :
1882 1 : if w.err != nil {
1883 1 : return w.err
1884 1 : }
1885 :
1886 : // The w.meta.LargestPointKey is only used once the Writer is closed, so it is safe to set it
1887 : // when the Writer is closed.
1888 : //
1889 : // The following invariants ensure that setting the largest key at this point of a Writer close
1890 : // is correct:
1891 : // 1. Keys must only be added to the Writer in an increasing order.
1892 : // 2. The current w.dataBlockBuf is guaranteed to have the latest key added to the Writer. This
1893 : // must be true, because a w.dataBlockBuf is only switched out when a dataBlock is flushed,
1894 : // and whenever a dataBlock is flushed, the next key is added to the new w.dataBlockBuf in the
1895 : // addPoint function after the flush occurs.
1896 1 : if w.dataBlockBuf.dataBlock.EntryCount() >= 1 {
1897 1 : w.meta.SetLargestPointKey(w.dataBlockBuf.dataBlock.CurKey().Clone())
1898 1 : }
1899 :
1900 : // Finish the last data block, or force an empty data block if there
1901 : // aren't any data blocks at all.
1902 1 : if w.dataBlockBuf.dataBlock.EntryCount() > 0 || w.indexBlock.block.EntryCount() == 0 {
1903 1 : bh, err := w.layout.WriteDataBlock(w.dataBlockBuf.dataBlock.Finish(), &w.dataBlockBuf.blockBuf)
1904 1 : if err != nil {
1905 0 : return err
1906 0 : }
1907 1 : bhp, err := w.maybeAddBlockPropertiesToBlockHandle(bh)
1908 1 : if err != nil {
1909 0 : return err
1910 0 : }
1911 1 : prevKey := w.dataBlockBuf.dataBlock.CurKey()
1912 1 : if err := w.addIndexEntrySync(prevKey, InternalKey{}, bhp, w.dataBlockBuf.tmp[:]); err != nil {
1913 0 : return err
1914 0 : }
1915 : }
1916 1 : w.props.DataSize = w.layout.offset
1917 1 :
1918 1 : // Write the filter block.
1919 1 : if w.filter != nil {
1920 1 : bh, err := w.layout.WriteFilterBlock(w.filter)
1921 1 : if err != nil {
1922 0 : return err
1923 0 : }
1924 1 : w.props.FilterPolicyName = w.filter.policyName()
1925 1 : w.props.FilterSize = bh.Length
1926 : }
1927 :
1928 1 : if w.twoLevelIndex {
1929 1 : w.props.IndexType = twoLevelIndex
1930 1 : // Write the two level index block.
1931 1 : if _, err = w.writeTwoLevelIndex(); err != nil {
1932 0 : return err
1933 0 : }
1934 1 : } else {
1935 1 : w.props.IndexType = binarySearchIndex
1936 1 : // NB: RocksDB includes the block trailer length in the index size
1937 1 : // property, though it doesn't include the trailer in the filter size
1938 1 : // property.
1939 1 : w.props.IndexSize = uint64(w.indexBlock.estimatedSize()) + block.TrailerLen
1940 1 : w.props.NumDataBlocks = uint64(w.indexBlock.block.EntryCount())
1941 1 : // Write the single level index block.
1942 1 : if _, err = w.layout.WriteIndexBlock(w.indexBlock.finish()); err != nil {
1943 0 : return err
1944 0 : }
1945 : }
1946 :
1947 : // Write the range-del block.
1948 1 : if w.props.NumRangeDeletions > 0 {
1949 1 : // Because the range tombstones are fragmented, the end key of the last
1950 1 : // added range tombstone will be the largest range tombstone key. Note
1951 1 : // that we need to make this into a range deletion sentinel because
1952 1 : // sstable boundaries are inclusive while the end key of a range
1953 1 : // deletion tombstone is exclusive. A Clone() is necessary as
1954 1 : // rangeDelBlock.curValue is the same slice that will get passed into
1955 1 : // w.writer, and some implementations of vfs.File mutate the slice
1956 1 : // passed into Write(). Also, w.meta will often outlive the blockWriter,
1957 1 : // and so cloning curValue allows the rangeDelBlock's internal buffer to
1958 1 : // get gc'd.
1959 1 : k := base.MakeRangeDeleteSentinelKey(w.rangeDelBlock.CurValue()).Clone()
1960 1 : w.meta.SetLargestRangeDelKey(k)
1961 1 : if _, err := w.layout.WriteRangeDeletionBlock(w.rangeDelBlock.Finish()); err != nil {
1962 0 : return err
1963 0 : }
1964 : }
1965 :
1966 : // Write the range-key block, flushing any remaining spans from the
1967 : // fragmenter first.
1968 1 : w.fragmenter.Finish()
1969 1 :
1970 1 : if w.props.NumRangeKeys() > 0 {
1971 1 : key := w.rangeKeyBlock.CurKey()
1972 1 : kind := key.Kind()
1973 1 : endKey, _, err := rangekey.DecodeEndKey(kind, w.rangeKeyBlock.CurValue())
1974 1 : if err != nil {
1975 0 : return err
1976 0 : }
1977 1 : k := base.MakeExclusiveSentinelKey(kind, endKey).Clone()
1978 1 : w.meta.SetLargestRangeKey(k)
1979 1 : if _, err := w.layout.WriteRangeKeyBlock(w.rangeKeyBlock.Finish()); err != nil {
1980 0 : return err
1981 0 : }
1982 : }
1983 :
1984 1 : if w.valueBlockWriter != nil {
1985 1 : _, vbStats, err := w.valueBlockWriter.finish(&w.layout, w.layout.offset)
1986 1 : if err != nil {
1987 0 : return err
1988 0 : }
1989 1 : w.props.NumValueBlocks = vbStats.numValueBlocks
1990 1 : w.props.NumValuesInValueBlocks = vbStats.numValuesInValueBlocks
1991 1 : w.props.ValueBlocksSize = vbStats.valueBlocksAndIndexSize
1992 : }
1993 :
1994 1 : {
1995 1 : // Finish and record the prop collectors if props are not yet recorded.
1996 1 : // Pre-computed props might have been copied by specialized sst creators
1997 1 : // like suffix replacer.
1998 1 : if len(w.props.UserProperties) == 0 {
1999 1 : userProps := make(map[string]string)
2000 1 : for i := range w.blockPropCollectors {
2001 1 : scratch := w.blockPropsEncoder.getScratchForProp()
2002 1 : // Place the shortID in the first byte.
2003 1 : scratch = append(scratch, byte(i))
2004 1 : buf, err := w.blockPropCollectors[i].FinishTable(scratch)
2005 1 : if err != nil {
2006 1 : return err
2007 1 : }
2008 1 : var prop string
2009 1 : if len(buf) > 0 {
2010 1 : prop = string(buf)
2011 1 : }
2012 : // NB: The property is populated in the map even if it is the
2013 : // empty string, since the presence in the map is what indicates
2014 : // that the block property collector was used when writing.
2015 1 : userProps[w.blockPropCollectors[i].Name()] = prop
2016 : }
2017 1 : if len(userProps) > 0 {
2018 1 : w.props.UserProperties = userProps
2019 1 : }
2020 : }
2021 :
2022 : // Write the properties block.
2023 1 : var raw rowblk.Writer
2024 1 : // The restart interval is set to infinity because the properties block
2025 1 : // is always read sequentially and cached in a heap located object. This
2026 1 : // reduces table size without a significant impact on performance.
2027 1 : raw.RestartInterval = propertiesBlockRestartInterval
2028 1 : w.props.CompressionOptions = rocksDBCompressionOptions
2029 1 : w.props.save(w.tableFormat, &raw)
2030 1 : w.layout.WritePropertiesBlock(raw.Finish())
2031 : }
2032 :
2033 : // Write the table footer.
2034 1 : w.meta.Size, err = w.layout.Finish()
2035 1 : if err != nil {
2036 1 : return err
2037 1 : }
2038 1 : w.meta.Properties = w.props
2039 1 :
2040 1 : // Check that the features present in the table are compatible with the format
2041 1 : // configured for the table.
2042 1 : if err = w.assertFormatCompatibility(); err != nil {
2043 1 : return err
2044 1 : }
2045 :
2046 1 : w.dataBlockBuf.clear()
2047 1 : dataBlockBufPool.Put(w.dataBlockBuf)
2048 1 : w.dataBlockBuf = nil
2049 1 : w.indexBlock.clear()
2050 1 : indexBlockBufPool.Put(w.indexBlock)
2051 1 : w.indexBlock = nil
2052 1 :
2053 1 : // Make any future calls to Set or Close return an error.
2054 1 : w.err = errWriterClosed
2055 1 : return nil
2056 : }
2057 :
2058 : // EstimatedSize returns the estimated size of the sstable being written if a
2059 : // call to Close() were made without adding additional keys.
2060 1 : func (w *Writer) EstimatedSize() uint64 {
2061 1 : if w == nil {
2062 0 : return 0
2063 0 : }
2064 1 : return w.coordination.sizeEstimate.size() +
2065 1 : uint64(w.dataBlockBuf.dataBlock.EstimatedSize()) +
2066 1 : w.indexBlock.estimatedSize()
2067 : }
2068 :
2069 : // Metadata returns the metadata for the finished sstable. Only valid to call
2070 : // after the sstable has been finished.
2071 1 : func (w *Writer) Metadata() (*WriterMetadata, error) {
2072 1 : if !w.layout.IsFinished() {
2073 0 : return nil, errors.New("pebble: writer is not closed")
2074 0 : }
2075 1 : return &w.meta, nil
2076 : }
2077 :
2078 : // WriterOption provide an interface to do work on Writer while it is being
2079 : // opened.
2080 : type WriterOption interface {
2081 : // writerApply is called on the writer during opening in order to set
2082 : // internal parameters.
2083 : writerApply(*Writer)
2084 : }
2085 :
2086 : // NewWriter returns a new table writer for the file. Closing the writer will
2087 : // close the file.
2088 1 : func NewWriter(writable objstorage.Writable, o WriterOptions, extraOpts ...WriterOption) *Writer {
2089 1 : o = o.ensureDefaults()
2090 1 : w := &Writer{
2091 1 : layout: makeLayoutWriter(writable, o),
2092 1 : meta: WriterMetadata{
2093 1 : SmallestSeqNum: math.MaxUint64,
2094 1 : },
2095 1 : dataBlockOptions: flushDecisionOptions{
2096 1 : blockSize: o.BlockSize,
2097 1 : blockSizeThreshold: (o.BlockSize*o.BlockSizeThreshold + 99) / 100,
2098 1 : sizeClassAwareThreshold: (o.BlockSize*o.SizeClassAwareThreshold + 99) / 100,
2099 1 : },
2100 1 : indexBlockOptions: flushDecisionOptions{
2101 1 : blockSize: o.IndexBlockSize,
2102 1 : blockSizeThreshold: (o.IndexBlockSize*o.BlockSizeThreshold + 99) / 100,
2103 1 : sizeClassAwareThreshold: (o.IndexBlockSize*o.SizeClassAwareThreshold + 99) / 100,
2104 1 : },
2105 1 : compare: o.Comparer.Compare,
2106 1 : split: o.Comparer.Split,
2107 1 : formatKey: o.Comparer.FormatKey,
2108 1 : compression: o.Compression,
2109 1 : separator: o.Comparer.Separator,
2110 1 : successor: o.Comparer.Successor,
2111 1 : tableFormat: o.TableFormat,
2112 1 : isStrictObsolete: o.IsStrictObsolete,
2113 1 : writingToLowestLevel: o.WritingToLowestLevel,
2114 1 : restartInterval: o.BlockRestartInterval,
2115 1 : checksumType: o.Checksum,
2116 1 : disableKeyOrderChecks: o.internal.DisableKeyOrderChecks,
2117 1 : indexBlock: newIndexBlockBuf(o.Parallelism),
2118 1 : rangeDelBlock: rowblk.Writer{RestartInterval: 1},
2119 1 : rangeKeyBlock: rowblk.Writer{RestartInterval: 1},
2120 1 : topLevelIndexBlock: rowblk.Writer{RestartInterval: 1},
2121 1 : fragmenter: keyspan.Fragmenter{
2122 1 : Cmp: o.Comparer.Compare,
2123 1 : Format: o.Comparer.FormatKey,
2124 1 : },
2125 1 : allocatorSizeClasses: o.AllocatorSizeClasses,
2126 1 : }
2127 1 : if w.tableFormat >= TableFormatPebblev3 {
2128 1 : w.shortAttributeExtractor = o.ShortAttributeExtractor
2129 1 : w.requiredInPlaceValueBound = o.RequiredInPlaceValueBound
2130 1 : if !o.DisableValueBlocks {
2131 1 : w.valueBlockWriter = newValueBlockWriter(
2132 1 : w.dataBlockOptions.blockSize, w.dataBlockOptions.blockSizeThreshold, w.compression, w.checksumType, func(compressedSize int) {
2133 1 : w.coordination.sizeEstimate.dataBlockCompressed(compressedSize, 0)
2134 1 : })
2135 : }
2136 : }
2137 :
2138 1 : w.dataBlockBuf = newDataBlockBuf(w.restartInterval, w.checksumType)
2139 1 :
2140 1 : w.blockBuf = blockBuf{
2141 1 : checksummer: block.Checksummer{Type: o.Checksum},
2142 1 : }
2143 1 :
2144 1 : w.coordination.init(o.Parallelism, w)
2145 1 :
2146 1 : if writable == nil {
2147 0 : w.err = errors.New("pebble: nil writable")
2148 0 : return w
2149 0 : }
2150 :
2151 : // Note that WriterOptions are applied in two places; the ones with a
2152 : // preApply() method are applied here. The rest are applied down below after
2153 : // default properties are set.
2154 1 : type preApply interface{ preApply() }
2155 1 : for _, opt := range extraOpts {
2156 0 : if _, ok := opt.(preApply); ok {
2157 0 : opt.writerApply(w)
2158 0 : }
2159 : }
2160 :
2161 1 : if o.FilterPolicy != nil {
2162 1 : switch o.FilterType {
2163 1 : case TableFilter:
2164 1 : w.filter = newTableFilterWriter(o.FilterPolicy)
2165 0 : default:
2166 0 : panic(fmt.Sprintf("unknown filter type: %v", o.FilterType))
2167 : }
2168 : }
2169 :
2170 1 : w.props.ComparerName = o.Comparer.Name
2171 1 : w.props.CompressionName = o.Compression.String()
2172 1 : w.props.MergerName = o.MergerName
2173 1 : w.props.PropertyCollectorNames = "[]"
2174 1 :
2175 1 : numBlockPropertyCollectors := len(o.BlockPropertyCollectors)
2176 1 : if w.tableFormat >= TableFormatPebblev4 {
2177 1 : numBlockPropertyCollectors++
2178 1 : }
2179 :
2180 1 : if numBlockPropertyCollectors > 0 {
2181 1 : if numBlockPropertyCollectors > maxPropertyCollectors {
2182 0 : w.err = errors.New("pebble: too many block property collectors")
2183 0 : return w
2184 0 : }
2185 1 : w.blockPropCollectors = make([]BlockPropertyCollector, 0, numBlockPropertyCollectors)
2186 1 : for _, constructFn := range o.BlockPropertyCollectors {
2187 1 : w.blockPropCollectors = append(w.blockPropCollectors, constructFn())
2188 1 : }
2189 1 : if w.tableFormat >= TableFormatPebblev4 {
2190 1 : w.blockPropCollectors = append(w.blockPropCollectors, &w.obsoleteCollector)
2191 1 : }
2192 :
2193 1 : var buf bytes.Buffer
2194 1 : buf.WriteString("[")
2195 1 : for i := range w.blockPropCollectors {
2196 1 : if i > 0 {
2197 1 : buf.WriteString(",")
2198 1 : }
2199 1 : buf.WriteString(w.blockPropCollectors[i].Name())
2200 : }
2201 1 : buf.WriteString("]")
2202 1 : w.props.PropertyCollectorNames = buf.String()
2203 : }
2204 :
2205 : // Apply the remaining WriterOptions that do not have a preApply() method.
2206 1 : for _, opt := range extraOpts {
2207 0 : if _, ok := opt.(preApply); ok {
2208 0 : continue
2209 : }
2210 0 : opt.writerApply(w)
2211 : }
2212 :
2213 : // Initialize the range key fragmenter and encoder.
2214 1 : w.fragmenter.Emit = w.encodeRangeKeySpan
2215 1 : w.rangeKeyEncoder.Emit = w.addRangeKey
2216 1 : return w
2217 : }
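
// A hedged usage sketch from a client's perspective (error handling elided;
// obtaining an objstorage.Writable is assumed and not shown, and the option
// values are illustrative rather than recommended defaults):
//
//	w := sstable.NewWriter(writable, sstable.WriterOptions{
//	    TableFormat: sstable.TableFormatPebblev4,
//	    BlockSize:   32 << 10,
//	})
//	_ = w.Set([]byte("a"), []byte("1")) // point keys must be added in sorted order
//	err := w.Close()                    // flushes remaining blocks and writes the footer
//	_ = err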
2218 :
2219 : // SetSnapshotPinnedProperties sets the properties for pinned keys. Should only
2220 : // be used internally by Pebble.
2221 : func (w *Writer) SetSnapshotPinnedProperties(
2222 : pinnedKeyCount, pinnedKeySize, pinnedValueSize uint64,
2223 1 : ) {
2224 1 : w.props.SnapshotPinnedKeys = pinnedKeyCount
2225 1 : w.props.SnapshotPinnedKeySize = pinnedKeySize
2226 1 : w.props.SnapshotPinnedValueSize = pinnedValueSize
2227 1 : }