Line data Source code
1 : // Copyright 2011 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package sstable
6 :
7 : import (
8 : "bytes"
9 : "context"
10 : "encoding/binary"
11 : "fmt"
12 : "math"
13 : "runtime"
14 : "sync"
15 :
16 : "github.com/cockroachdb/errors"
17 : "github.com/cockroachdb/pebble/internal/base"
18 : "github.com/cockroachdb/pebble/internal/bytealloc"
19 : "github.com/cockroachdb/pebble/internal/invariants"
20 : "github.com/cockroachdb/pebble/internal/keyspan"
21 : "github.com/cockroachdb/pebble/internal/rangedel"
22 : "github.com/cockroachdb/pebble/internal/rangekey"
23 : "github.com/cockroachdb/pebble/objstorage"
24 : "github.com/cockroachdb/pebble/sstable/block"
25 : "github.com/cockroachdb/pebble/sstable/rowblk"
26 : "github.com/cockroachdb/pebble/sstable/valblk"
27 : )
28 :
29 : // encodedBHPEstimatedSize estimates the size of the encoded BlockHandleWithProperties.
30 : // It would also be nice to account for the length of the data block properties here,
31 : // but isn't necessary since this is an estimate.
32 : const encodedBHPEstimatedSize = binary.MaxVarintLen64 * 2
33 :
34 : var errWriterClosed = errors.New("pebble: writer is closed")
35 :
36 : // RawRowWriter is a sstable RawWriter that writes sstables with row-oriented
37 : // blocks. All table formats TableFormatPebblev4 and earlier write row-oriented
38 : // blocks and use RawRowWriter.
39 : type RawRowWriter struct {
40 : layout layoutWriter
41 : meta WriterMetadata
42 : err error
43 : dataFlush block.FlushGovernor
44 : indexFlush block.FlushGovernor
45 : // The following fields are copied from Options.
46 : compare Compare
47 : pointSuffixCmp base.ComparePointSuffixes
48 : split Split
49 : formatKey base.FormatKey
50 : compression block.Compression
51 : separator Separator
52 : successor Successor
53 : tableFormat TableFormat
54 : isStrictObsolete bool
55 : writingToLowestLevel bool
56 : restartInterval int
57 : checksumType block.ChecksumType
58 : // disableKeyOrderChecks disables the checks that keys are added to an
59 : // sstable in order. It is intended for internal use only in the construction
60 : // of invalid sstables for testing. See tool/make_test_sstables.go.
61 : disableKeyOrderChecks bool
62 : // With two level indexes, the index/filter of a SST file is partitioned into
63 : // smaller blocks with an additional top-level index on them. When reading an
64 : // index/filter, only the top-level index is loaded into memory. The two level
65 : // index/filter then uses the top-level index to load on demand into the block
66 : // cache the partitions that are required to perform the index/filter query.
67 : //
68 : // Two level indexes are enabled automatically when there is more than one
69 : // index block.
70 : //
71 : // This is useful when there are very large index blocks, which generally occurs
72 : // with the usage of large keys. With large index blocks, the index blocks fight
73 : // the data blocks for block cache space and the index blocks are likely to be
74 : // re-read many times from the disk. The top level index, which has a much
75 : // smaller memory footprint, can be used to prevent the entire index block from
76 : // being loaded into the block cache.
77 : twoLevelIndex bool
78 : indexBlock *indexBlockBuf
79 : rangeDelBlock rowblk.Writer
80 : rangeKeyBlock rowblk.Writer
81 : topLevelIndexBlock rowblk.Writer
82 : props Properties
83 : blockPropCollectors []BlockPropertyCollector
84 : obsoleteCollector obsoleteKeyBlockPropertyCollector
85 : blockPropsEncoder blockPropertiesEncoder
86 : // filter accumulates the filter block. If populated, the filter ingests
87 : // either the output of w.split (i.e. a prefix extractor) if w.split is not
88 : // nil, or the full keys otherwise.
89 : filter filterWriter
90 : indexPartitions []bufferedIndexBlock
91 :
92 : // indexBlockAlloc is used to bulk-allocate byte slices used to store index
93 : // blocks in indexPartitions. These live until the index finishes.
94 : indexBlockAlloc []byte
95 : // indexSepAlloc is used to bulk-allocate index block separator slices stored
96 : // in indexPartitions. These live until the index finishes.
97 : indexSepAlloc bytealloc.A
98 :
99 : rangeKeyEncoder rangekey.Encoder
100 : // dataBlockBuf consists of the state which is currently owned by and used by
101 : // the Writer client goroutine. This state can be handed off to other goroutines.
102 : dataBlockBuf *dataBlockBuf
103 : // blockBuf consists of the state which is owned by and used by the Writer client
104 : // goroutine.
105 : blockBuf blockBuf
106 :
107 : coordination coordinationState
108 :
109 : // Information (other than the byte slice) about the last point key, to
110 : // avoid extracting it again.
111 : lastPointKeyInfo pointKeyInfo
112 :
113 : // For value blocks.
114 : shortAttributeExtractor base.ShortAttributeExtractor
115 : requiredInPlaceValueBound UserKeyPrefixBound
116 : // When w.tableFormat >= TableFormatPebblev3, valueBlockWriter is nil iff
117 : // WriterOptions.DisableValueBlocks was true.
118 : valueBlockWriter *valblk.Writer
119 :
120 : allocatorSizeClasses []int
121 :
122 : numDeletionsThreshold int
123 : deletionSizeRatioThreshold float32
124 : }
125 :
126 : type pointKeyInfo struct {
127 : trailer base.InternalKeyTrailer
128 : // Only computed when w.valueBlockWriter is not nil.
129 : userKeyLen int
130 : // prefixLen uses w.split, if not nil. Only computed when w.valueBlockWriter
131 : // is not nil.
132 : prefixLen int
133 : // True iff the point was marked obsolete.
134 : isObsolete bool
135 : }
136 :
137 : type coordinationState struct {
138 : parallelismEnabled bool
139 :
140 : // writeQueue is used to write data blocks to disk. The writeQueue is primarily
141 : // used to maintain the order in which data blocks must be written to disk. For
142 : // this reason, every single data block write must be done through the writeQueue.
143 : writeQueue *writeQueue
144 :
145 : sizeEstimate dataBlockEstimates
146 : }
147 :
148 2 : func (c *coordinationState) init(parallelismEnabled bool, writer *RawRowWriter) {
149 2 : c.parallelismEnabled = parallelismEnabled
150 2 : // useMutex is false regardless of parallelismEnabled, because we do not do
151 2 : // parallel compression yet.
152 2 : c.sizeEstimate.useMutex = false
153 2 :
154 2 : // writeQueueSize determines the size of the write queue, or the number
155 2 : // of items which can be added to the queue without blocking. By default, we
156 2 : // use a writeQueue size of 0, since we won't be doing any block writes in
157 2 : // parallel.
158 2 : writeQueueSize := 0
159 2 : if parallelismEnabled {
160 2 : writeQueueSize = runtime.GOMAXPROCS(0)
161 2 : }
162 2 : c.writeQueue = newWriteQueue(writeQueueSize, writer)
163 : }
164 :
165 : // sizeEstimate is a general purpose helper for estimating two kinds of sizes:
166 : // A. The compressed sstable size, which is useful for deciding when to start
167 : //
168 : // a new sstable during flushes or compactions. In practice, we use this in
169 : // estimating the data size (excluding the index).
170 : //
171 : // B. The size of index blocks to decide when to start a new index block.
172 : //
173 : // There are some terminology peculiarities which are due to the origin of
174 : // sizeEstimate for use case A with parallel compression enabled (for which
175 : // the code has not been merged). Specifically this relates to the terms
176 : // "written" and "compressed".
177 : // - The notion of "written" for case A is sufficiently defined by saying that
178 : // the data block is compressed. Waiting for the actual data block write to
179 : // happen can result in unnecessary estimation, when we already know how big
180 : // it will be in compressed form. Additionally, with the forthcoming value
181 : // blocks containing older MVCC values, these compressed block will be held
182 : // in-memory until late in the sstable writing, and we do want to accurately
183 : // account for them without waiting for the actual write.
184 : // For case B, "written" means that the index entry has been fully
185 : // generated, and has been added to the uncompressed block buffer for that
186 : // index block. It does not include actually writing a potentially
187 : // compressed index block.
188 : // - The notion of "compressed" is to differentiate between a "inflight" size
189 : // and the actual size, and is handled via computing a compression ratio
190 : // observed so far (defaults to 1).
191 : // For case A, this is actual data block compression, so the "inflight" size
192 : // is uncompressed blocks (that are no longer being written to) and the
193 : // "compressed" size is after they have been compressed.
194 : // For case B the inflight size is for a key-value pair in the index for
195 : // which the value size (the encoded size of the BlockHandleWithProperties)
196 : // is not accurately known, while the compressed size is the size of that
197 : // entry when it has been added to the (in-progress) index ssblock.
198 : //
199 : // Usage: To update state, one can optionally provide an inflight write value
200 : // using addInflight (used for case B). When something is "written" the state
201 : // can be updated using either writtenWithDelta or writtenWithTotal, which
202 : // provide the actual delta size or the total size (latter must be
203 : // monotonically non-decreasing). If there were no calls to addInflight, there
204 : // isn't any real estimation happening here. So case A does not do any real
205 : // estimation. However, when we introduce parallel compression, there will be
206 : // estimation in that the client goroutine will call addInFlight and the
207 : // compression goroutines will call writtenWithDelta.
208 : type sizeEstimate struct {
209 : // emptySize is the size when there is no inflight data, and numEntries is 0.
210 : // emptySize is constant once set.
211 : emptySize uint64
212 :
213 : // inflightSize is the estimated size of some inflight data which hasn't
214 : // been written yet.
215 : inflightSize uint64
216 :
217 : // totalSize is the total size of the data which has already been written.
218 : totalSize uint64
219 :
220 : // numWrittenEntries is the total number of entries which have already been
221 : // written.
222 : numWrittenEntries uint64
223 : // numInflightEntries is the total number of entries which are inflight, and
224 : // haven't been written.
225 : numInflightEntries uint64
226 :
227 : // maxEstimatedSize stores the maximum result returned from sizeEstimate.size.
228 : // It ensures that values returned from subsequent calls to Writer.EstimatedSize
229 : // never decrease.
230 : maxEstimatedSize uint64
231 :
232 : // We assume that the entries added to the sizeEstimate can be compressed.
233 : // For this reason, we keep track of a compressedSize and an uncompressedSize
234 : // to compute a compression ratio for the inflight entries. If the entries
235 : // aren't being compressed, then compressedSize and uncompressedSize must be
236 : // equal.
237 : compressedSize uint64
238 : uncompressedSize uint64
239 : }
240 :
241 2 : func (s *sizeEstimate) init(emptySize uint64) {
242 2 : s.emptySize = emptySize
243 2 : }
244 :
245 2 : func (s *sizeEstimate) size() uint64 {
246 2 : ratio := float64(1)
247 2 : if s.uncompressedSize > 0 {
248 2 : ratio = float64(s.compressedSize) / float64(s.uncompressedSize)
249 2 : }
250 2 : estimatedInflightSize := uint64(float64(s.inflightSize) * ratio)
251 2 : total := s.totalSize + estimatedInflightSize
252 2 : if total > s.maxEstimatedSize {
253 2 : s.maxEstimatedSize = total
254 2 : } else {
255 2 : total = s.maxEstimatedSize
256 2 : }
257 :
258 2 : if total == 0 {
259 2 : return s.emptySize
260 2 : }
261 :
262 2 : return total
263 : }
264 :
265 2 : func (s *sizeEstimate) numTotalEntries() uint64 {
266 2 : return s.numWrittenEntries + s.numInflightEntries
267 2 : }
268 :
269 2 : func (s *sizeEstimate) addInflight(size int) {
270 2 : s.numInflightEntries++
271 2 : s.inflightSize += uint64(size)
272 2 : }
273 :
274 2 : func (s *sizeEstimate) writtenWithTotal(newTotalSize uint64, inflightSize int) {
275 2 : finalEntrySize := int(newTotalSize - s.totalSize)
276 2 : s.writtenWithDelta(finalEntrySize, inflightSize)
277 2 : }
278 :
279 2 : func (s *sizeEstimate) writtenWithDelta(finalEntrySize int, inflightSize int) {
280 2 : if inflightSize > 0 {
281 2 : // This entry was previously inflight, so we should decrement inflight
282 2 : // entries and update the "compression" stats for future estimation.
283 2 : s.numInflightEntries--
284 2 : s.inflightSize -= uint64(inflightSize)
285 2 : s.uncompressedSize += uint64(inflightSize)
286 2 : s.compressedSize += uint64(finalEntrySize)
287 2 : }
288 2 : s.numWrittenEntries++
289 2 : s.totalSize += uint64(finalEntrySize)
290 : }
291 :
292 2 : func (s *sizeEstimate) clear() {
293 2 : *s = sizeEstimate{emptySize: s.emptySize}
294 2 : }
295 :
296 : type indexBlockBuf struct {
297 : // block will only be accessed from the writeQueue.
298 : block rowblk.Writer
299 :
300 : size struct {
301 : useMutex bool
302 : mu sync.Mutex
303 : estimate sizeEstimate
304 : }
305 :
306 : // restartInterval matches indexBlockBuf.block.restartInterval. We store it twice, because the `block`
307 : // must only be accessed from the writeQueue goroutine.
308 : restartInterval int
309 : }
310 :
311 2 : func (i *indexBlockBuf) clear() {
312 2 : i.block.Reset()
313 2 : if i.size.useMutex {
314 2 : i.size.mu.Lock()
315 2 : defer i.size.mu.Unlock()
316 2 : }
317 2 : i.size.estimate.clear()
318 2 : i.restartInterval = 0
319 : }
320 :
321 : var indexBlockBufPool = sync.Pool{
322 2 : New: func() interface{} {
323 2 : return &indexBlockBuf{}
324 2 : },
325 : }
326 :
327 : const indexBlockRestartInterval = 1
328 :
329 2 : func newIndexBlockBuf(useMutex bool) *indexBlockBuf {
330 2 : i := indexBlockBufPool.Get().(*indexBlockBuf)
331 2 : i.size.useMutex = useMutex
332 2 : i.restartInterval = indexBlockRestartInterval
333 2 : i.block.RestartInterval = indexBlockRestartInterval
334 2 : i.size.estimate.init(rowblk.EmptySize)
335 2 : return i
336 2 : }
337 :
338 : func (i *indexBlockBuf) shouldFlush(
339 : sep InternalKey, valueLen int, flushGovernor *block.FlushGovernor,
340 2 : ) bool {
341 2 : if i.size.useMutex {
342 2 : i.size.mu.Lock()
343 2 : defer i.size.mu.Unlock()
344 2 : }
345 :
346 2 : nEntries := i.size.estimate.numTotalEntries()
347 2 : return shouldFlush(
348 2 : sep.Size(), valueLen, i.restartInterval, int(i.size.estimate.size()),
349 2 : int(nEntries), flushGovernor)
350 : }
351 :
352 2 : func (i *indexBlockBuf) add(key InternalKey, value []byte, inflightSize int) {
353 2 : i.block.Add(key, value)
354 2 : size := i.block.EstimatedSize()
355 2 : if i.size.useMutex {
356 2 : i.size.mu.Lock()
357 2 : defer i.size.mu.Unlock()
358 2 : }
359 2 : i.size.estimate.writtenWithTotal(uint64(size), inflightSize)
360 : }
361 :
362 2 : func (i *indexBlockBuf) finish() []byte {
363 2 : b := i.block.Finish()
364 2 : return b
365 2 : }
366 :
367 2 : func (i *indexBlockBuf) addInflight(inflightSize int) {
368 2 : if i.size.useMutex {
369 2 : i.size.mu.Lock()
370 2 : defer i.size.mu.Unlock()
371 2 : }
372 2 : i.size.estimate.addInflight(inflightSize)
373 : }
374 :
375 2 : func (i *indexBlockBuf) estimatedSize() uint64 {
376 2 : if i.size.useMutex {
377 2 : i.size.mu.Lock()
378 2 : defer i.size.mu.Unlock()
379 2 : }
380 :
381 : // Make sure that the size estimation works as expected when parallelism
382 : // is disabled.
383 2 : if invariants.Enabled && !i.size.useMutex {
384 2 : if i.size.estimate.inflightSize != 0 {
385 0 : panic("unexpected inflight entry in index block size estimation")
386 : }
387 :
388 : // NB: The i.block should only be accessed from the writeQueue goroutine,
389 : // when parallelism is enabled. We break that invariant here, but that's
390 : // okay since parallelism is disabled.
391 2 : if i.size.estimate.size() != uint64(i.block.EstimatedSize()) {
392 0 : panic("index block size estimation sans parallelism is incorrect")
393 : }
394 : }
395 2 : return i.size.estimate.size()
396 : }
397 :
398 : // sizeEstimate is used for sstable size estimation. sizeEstimate can be
399 : // accessed by the Writer client and compressionQueue goroutines. Fields
400 : // should only be read/updated through the functions defined on the
401 : // *sizeEstimate type.
402 : type dataBlockEstimates struct {
403 : // If we don't do block compression in parallel, then we don't need to take
404 : // the performance hit of synchronizing using this mutex.
405 : useMutex bool
406 : mu sync.Mutex
407 :
408 : estimate sizeEstimate
409 : }
410 :
411 : // inflightSize is the uncompressed block size estimate which has been
412 : // previously provided to addInflightDataBlock(). If addInflightDataBlock()
413 : // has not been called, this must be set to 0. compressedSize is the
414 : // compressed size of the block.
415 2 : func (d *dataBlockEstimates) dataBlockCompressed(compressedSize int, inflightSize int) {
416 2 : if d.useMutex {
417 0 : d.mu.Lock()
418 0 : defer d.mu.Unlock()
419 0 : }
420 2 : d.estimate.writtenWithDelta(compressedSize+block.TrailerLen, inflightSize)
421 : }
422 :
423 : // size is an estimated size of datablock data which has been written to disk.
424 2 : func (d *dataBlockEstimates) size() uint64 {
425 2 : if d.useMutex {
426 0 : d.mu.Lock()
427 0 : defer d.mu.Unlock()
428 0 : }
429 : // If there is no parallel compression, there should not be any inflight bytes.
430 2 : if invariants.Enabled && !d.useMutex {
431 2 : if d.estimate.inflightSize != 0 {
432 0 : panic("unexpected inflight entry in data block size estimation")
433 : }
434 : }
435 2 : return d.estimate.size()
436 : }
437 :
438 : // Avoid linter unused error.
439 : var _ = (&dataBlockEstimates{}).addInflightDataBlock
440 :
441 : // NB: unused since no parallel compression.
442 0 : func (d *dataBlockEstimates) addInflightDataBlock(size int) {
443 0 : if d.useMutex {
444 0 : d.mu.Lock()
445 0 : defer d.mu.Unlock()
446 0 : }
447 :
448 0 : d.estimate.addInflight(size)
449 : }
450 :
451 : var writeTaskPool = sync.Pool{
452 2 : New: func() interface{} {
453 2 : t := &writeTask{}
454 2 : t.compressionDone = make(chan bool, 1)
455 2 : return t
456 2 : },
457 : }
458 :
459 : type blockBuf struct {
460 : // tmp is a scratch buffer, large enough to hold either footerLen bytes,
461 : // blockTrailerLen bytes, (5 * binary.MaxVarintLen64) bytes, and most
462 : // likely large enough for a block handle with properties.
463 : tmp [blockHandleLikelyMaxLen]byte
464 : // compressedBuf is the destination buffer for compression. It is re-used over the
465 : // lifetime of the blockBuf, avoiding the allocation of a temporary buffer for each block.
466 : compressedBuf []byte
467 : checksummer block.Checksummer
468 : }
469 :
470 2 : func (b *blockBuf) clear() {
471 2 : // We can't assign b.compressedBuf[:0] to compressedBuf because snappy relies
472 2 : // on the length of the buffer, and not the capacity to determine if it needs
473 2 : // to make an allocation.
474 2 : *b = blockBuf{
475 2 : compressedBuf: b.compressedBuf, checksummer: b.checksummer,
476 2 : }
477 2 : }
478 :
479 : // A dataBlockBuf holds all the state required to compress and write a data block to disk.
480 : // A dataBlockBuf begins its lifecycle owned by the Writer client goroutine. The Writer
481 : // client goroutine adds keys to the sstable, writing directly into a dataBlockBuf's blockWriter
482 : // until the block is full. Once a dataBlockBuf's block is full, the dataBlockBuf may be passed
483 : // to other goroutines for compression and file I/O.
484 : type dataBlockBuf struct {
485 : blockBuf
486 : dataBlock rowblk.Writer
487 :
488 : // uncompressed is a reference to a byte slice which is owned by the dataBlockBuf. It is the
489 : // next byte slice to be compressed. The uncompressed byte slice will be backed by the
490 : // dataBlock.buf.
491 : uncompressed []byte
492 :
493 : // physical holds the (possibly) compressed block and its trailer. The
494 : // underlying block data's byte slice is owned by the dataBlockBuf. It may
495 : // be backed by the dataBlock.buf, or the dataBlockBuf.compressedBuf,
496 : // depending on whether we use the result of the compression.
497 : physical block.PhysicalBlock
498 :
499 : // We're making calls to BlockPropertyCollectors from the Writer client goroutine. We need to
500 : // pass the encoded block properties over to the write queue. To prevent copies, and allocations,
501 : // we give each dataBlockBuf, a blockPropertiesEncoder.
502 : blockPropsEncoder blockPropertiesEncoder
503 : // dataBlockProps is set when Writer.finishDataBlockProps is called. The dataBlockProps slice is
504 : // a shallow copy of the internal buffer of the dataBlockBuf.blockPropsEncoder.
505 : dataBlockProps []byte
506 :
507 : // sepScratch is reusable scratch space for computing separator keys.
508 : sepScratch []byte
509 :
510 : // numDeletions stores the count of point tombstones in this data block.
511 : // It's used to determine if this data block is considered tombstone-dense
512 : // for the purposes of compaction.
513 : numDeletions int
514 : // deletionSize stores the raw size of point tombstones in this data block.
515 : // It's used to determine if this data block is considered tombstone-dense
516 : // for the purposes of compaction.
517 : deletionSize int
518 : }
519 :
520 2 : func (d *dataBlockBuf) clear() {
521 2 : d.blockBuf.clear()
522 2 : d.dataBlock.Reset()
523 2 :
524 2 : d.uncompressed = nil
525 2 : d.physical = block.PhysicalBlock{}
526 2 : d.dataBlockProps = nil
527 2 : d.sepScratch = d.sepScratch[:0]
528 2 : }
529 :
530 : var dataBlockBufPool = sync.Pool{
531 2 : New: func() interface{} {
532 2 : return &dataBlockBuf{}
533 2 : },
534 : }
535 :
536 2 : func newDataBlockBuf(restartInterval int, checksumType block.ChecksumType) *dataBlockBuf {
537 2 : d := dataBlockBufPool.Get().(*dataBlockBuf)
538 2 : d.dataBlock.RestartInterval = restartInterval
539 2 : d.checksummer.Type = checksumType
540 2 : return d
541 2 : }
542 :
543 2 : func (d *dataBlockBuf) finish() {
544 2 : d.uncompressed = d.dataBlock.Finish()
545 2 : }
546 :
547 2 : func (d *dataBlockBuf) compressAndChecksum(c block.Compression) {
548 2 : d.physical = block.CompressAndChecksum(&d.compressedBuf, d.uncompressed, c, &d.checksummer)
549 2 : }
550 :
551 : func (d *dataBlockBuf) shouldFlush(
552 : key InternalKey, valueLen int, flushGovernor *block.FlushGovernor,
553 2 : ) bool {
554 2 : return shouldFlush(
555 2 : key.Size(), valueLen, d.dataBlock.RestartInterval, d.dataBlock.EstimatedSize(),
556 2 : d.dataBlock.EntryCount(), flushGovernor)
557 2 : }
558 :
559 : type bufferedIndexBlock struct {
560 : nEntries int
561 : // sep is the last key added to this block, for computing a separator later.
562 : sep InternalKey
563 : properties []byte
564 : // block is the encoded block produced by blockWriter.finish.
565 : block []byte
566 : }
567 :
568 : // AddWithForceObsolete must be used when writing a strict-obsolete sstable.
569 : //
570 : // forceObsolete indicates whether the caller has determined that this key is
571 : // obsolete even though it may be the latest point key for this userkey. This
572 : // should be set to true for keys obsoleted by RANGEDELs, and is required for
573 : // strict-obsolete sstables.
574 : //
575 : // Note that there are two properties, S1 and S2 (see comment in format.go)
576 : // that strict-obsolete ssts must satisfy. S2, due to RANGEDELs, is solely the
577 : // responsibility of the caller. S1 is solely the responsibility of the
578 : // callee.
579 : func (w *RawRowWriter) AddWithForceObsolete(
580 : key InternalKey, value []byte, forceObsolete bool,
581 2 : ) error {
582 2 : if w.err != nil {
583 0 : return w.err
584 0 : }
585 :
586 2 : switch key.Kind() {
587 1 : case InternalKeyKindRangeDelete:
588 1 : return w.addTombstone(key, value)
589 : case base.InternalKeyKindRangeKeyDelete,
590 : base.InternalKeyKindRangeKeySet,
591 0 : base.InternalKeyKindRangeKeyUnset:
592 0 : w.err = errors.Errorf(
593 0 : "pebble: range keys must be added via one of the RangeKey* functions")
594 0 : return w.err
595 : }
596 2 : return w.addPoint(key, value, forceObsolete)
597 : }
598 :
599 2 : func (w *RawRowWriter) makeAddPointDecisionV2(key InternalKey) error {
600 2 : prevTrailer := w.lastPointKeyInfo.trailer
601 2 : w.lastPointKeyInfo.trailer = key.Trailer
602 2 : if w.dataBlockBuf.dataBlock.EntryCount() == 0 {
603 2 : return nil
604 2 : }
605 2 : if !w.disableKeyOrderChecks {
606 2 : prevPointUserKey := w.dataBlockBuf.dataBlock.CurUserKey()
607 2 : cmpUser := w.compare(prevPointUserKey, key.UserKey)
608 2 : if cmpUser > 0 || (cmpUser == 0 && prevTrailer <= key.Trailer) {
609 1 : return errors.Errorf(
610 1 : "pebble: keys must be added in strictly increasing order: %s, %s",
611 1 : InternalKey{UserKey: prevPointUserKey, Trailer: prevTrailer}.Pretty(w.formatKey),
612 1 : key.Pretty(w.formatKey))
613 1 : }
614 : }
615 2 : return nil
616 : }
617 :
618 : // REQUIRES: at least one point has been written to the Writer.
619 2 : func (w *RawRowWriter) getLastPointUserKey() []byte {
620 2 : if w.dataBlockBuf.dataBlock.EntryCount() == 0 {
621 0 : panic(errors.AssertionFailedf("no point keys added to writer"))
622 : }
623 2 : return w.dataBlockBuf.dataBlock.CurUserKey()
624 : }
625 :
626 : // REQUIRES: w.tableFormat >= TableFormatPebblev3
627 : func (w *RawRowWriter) makeAddPointDecisionV3(
628 : key InternalKey, valueLen int,
629 2 : ) (setHasSamePrefix bool, writeToValueBlock bool, isObsolete bool, err error) {
630 2 : prevPointKeyInfo := w.lastPointKeyInfo
631 2 : w.lastPointKeyInfo.userKeyLen = len(key.UserKey)
632 2 : w.lastPointKeyInfo.prefixLen = w.split(key.UserKey)
633 2 : w.lastPointKeyInfo.trailer = key.Trailer
634 2 : w.lastPointKeyInfo.isObsolete = false
635 2 : if !w.meta.HasPointKeys {
636 2 : return false, false, false, nil
637 2 : }
638 2 : keyKind := key.Trailer.Kind()
639 2 : prevPointUserKey := w.getLastPointUserKey()
640 2 : prevPointKey := InternalKey{UserKey: prevPointUserKey, Trailer: prevPointKeyInfo.trailer}
641 2 : prevKeyKind := prevPointKeyInfo.trailer.Kind()
642 2 : considerWriteToValueBlock := prevKeyKind == InternalKeyKindSet &&
643 2 : keyKind == InternalKeyKindSet
644 2 : if considerWriteToValueBlock && !w.requiredInPlaceValueBound.IsEmpty() {
645 1 : keyPrefix := key.UserKey[:w.lastPointKeyInfo.prefixLen]
646 1 : cmpUpper := w.compare(
647 1 : w.requiredInPlaceValueBound.Upper, keyPrefix)
648 1 : if cmpUpper <= 0 {
649 1 : // Common case for CockroachDB. Make it empty since all future keys in
650 1 : // this sstable will also have cmpUpper <= 0.
651 1 : w.requiredInPlaceValueBound = UserKeyPrefixBound{}
652 1 : } else if w.compare(keyPrefix, w.requiredInPlaceValueBound.Lower) >= 0 {
653 1 : considerWriteToValueBlock = false
654 1 : }
655 : }
656 : // cmpPrefix is initialized iff considerWriteToValueBlock.
657 2 : var cmpPrefix int
658 2 : var cmpUser int
659 2 : if considerWriteToValueBlock {
660 2 : // Compare the prefixes.
661 2 : cmpPrefix = w.compare(prevPointUserKey[:prevPointKeyInfo.prefixLen],
662 2 : key.UserKey[:w.lastPointKeyInfo.prefixLen])
663 2 : cmpUser = cmpPrefix
664 2 : if cmpPrefix == 0 {
665 2 : // Need to compare suffixes to compute cmpUser.
666 2 : cmpUser = w.pointSuffixCmp(prevPointUserKey[prevPointKeyInfo.prefixLen:],
667 2 : key.UserKey[w.lastPointKeyInfo.prefixLen:])
668 2 : }
669 2 : } else {
670 2 : cmpUser = w.compare(prevPointUserKey, key.UserKey)
671 2 : }
672 : // Ensure that no one adds a point key kind without considering the obsolete
673 : // handling for that kind.
674 2 : switch keyKind {
675 : case InternalKeyKindSet, InternalKeyKindSetWithDelete, InternalKeyKindMerge,
676 2 : InternalKeyKindDelete, InternalKeyKindSingleDelete, InternalKeyKindDeleteSized:
677 0 : default:
678 0 : panic(errors.AssertionFailedf("unexpected key kind %s", keyKind.String()))
679 : }
680 : // If same user key, then the current key is obsolete if any of the
681 : // following is true:
682 : // C1 The prev key was obsolete.
683 : // C2 The prev key was not a MERGE. When the previous key is a MERGE we must
684 : // preserve SET* and MERGE since their values will be merged into the
685 : // previous key. We also must preserve DEL* since there may be an older
686 : // SET*/MERGE in a lower level that must not be merged with the MERGE --
687 : // if we omit the DEL* that lower SET*/MERGE will become visible.
688 : //
689 : // Regardless of whether it is the same user key or not
690 : // C3 The current key is some kind of point delete, and we are writing to
691 : // the lowest level, then it is also obsolete. The correctness of this
692 : // relies on the same user key not spanning multiple sstables in a level.
693 : //
694 : // C1 ensures that for a user key there is at most one transition from
695 : // !obsolete to obsolete. Consider a user key k, for which the first n keys
696 : // are not obsolete. We consider the various value of n:
697 : //
698 : // n = 0: This happens due to forceObsolete being set by the caller, or due
699 : // to C3. forceObsolete must only be set due a RANGEDEL, and that RANGEDEL
700 : // must also delete all the lower seqnums for the same user key. C3 triggers
701 : // due to a point delete and that deletes all the lower seqnums for the same
702 : // user key.
703 : //
704 : // n = 1: This is the common case. It happens when the first key is not a
705 : // MERGE, or the current key is some kind of point delete.
706 : //
707 : // n > 1: This is due to a sequence of MERGE keys, potentially followed by a
708 : // single non-MERGE key.
709 2 : isObsoleteC1AndC2 := cmpUser == 0 &&
710 2 : (prevPointKeyInfo.isObsolete || prevKeyKind != InternalKeyKindMerge)
711 2 : isObsoleteC3 := w.writingToLowestLevel &&
712 2 : (keyKind == InternalKeyKindDelete || keyKind == InternalKeyKindSingleDelete ||
713 2 : keyKind == InternalKeyKindDeleteSized)
714 2 : isObsolete = isObsoleteC1AndC2 || isObsoleteC3
715 2 : // TODO(sumeer): storing isObsolete SET and SETWITHDEL in value blocks is
716 2 : // possible, but requires some care in documenting and checking invariants.
717 2 : // There is code that assumes nothing in value blocks because of single MVCC
718 2 : // version (those should be ok). We have to ensure setHasSamePrefix is
719 2 : // correctly initialized here etc.
720 2 :
721 2 : if !w.disableKeyOrderChecks &&
722 2 : (cmpUser > 0 || (cmpUser == 0 && prevPointKeyInfo.trailer <= key.Trailer)) {
723 1 : return false, false, false, errors.Errorf(
724 1 : "pebble: keys must be added in strictly increasing order: %s, %s",
725 1 : prevPointKey.Pretty(w.formatKey), key.Pretty(w.formatKey))
726 1 : }
727 2 : if !considerWriteToValueBlock {
728 2 : return false, false, isObsolete, nil
729 2 : }
730 : // NB: it is possible that cmpUser == 0, i.e., these two SETs have identical
731 : // user keys (because of an open snapshot). This should be the rare case.
732 2 : setHasSamePrefix = cmpPrefix == 0
733 2 : // Use of 0 here is somewhat arbitrary. Given the minimum 3 byte encoding of
734 2 : // valueHandle, this should be > 3. But tiny values are common in test and
735 2 : // unlikely in production, so we use 0 here for better test coverage.
736 2 : const tinyValueThreshold = 0
737 2 : // NB: setting WriterOptions.DisableValueBlocks does not disable the
738 2 : // setHasSamePrefix optimization.
739 2 : considerWriteToValueBlock = setHasSamePrefix && valueLen > tinyValueThreshold && w.valueBlockWriter != nil
740 2 : return setHasSamePrefix, considerWriteToValueBlock, isObsolete, nil
741 : }
742 :
743 2 : func (w *RawRowWriter) addPoint(key InternalKey, value []byte, forceObsolete bool) error {
744 2 : if w.isStrictObsolete && key.Kind() == InternalKeyKindMerge {
745 1 : return errors.Errorf("MERGE not supported in a strict-obsolete sstable")
746 1 : }
747 2 : var err error
748 2 : var setHasSameKeyPrefix, writeToValueBlock, addPrefixToValueStoredWithKey bool
749 2 : var isObsolete bool
750 2 : maxSharedKeyLen := len(key.UserKey)
751 2 : if w.tableFormat >= TableFormatPebblev3 {
752 2 : // maxSharedKeyLen is limited to the prefix of the preceding key. If the
753 2 : // preceding key was in a different block, then the blockWriter will
754 2 : // ignore this maxSharedKeyLen.
755 2 : maxSharedKeyLen = w.lastPointKeyInfo.prefixLen
756 2 : setHasSameKeyPrefix, writeToValueBlock, isObsolete, err =
757 2 : w.makeAddPointDecisionV3(key, len(value))
758 2 : addPrefixToValueStoredWithKey = key.Kind() == InternalKeyKindSet
759 2 : } else {
760 2 : err = w.makeAddPointDecisionV2(key)
761 2 : }
762 2 : if err != nil {
763 1 : return err
764 1 : }
765 2 : isObsolete = w.tableFormat >= TableFormatPebblev4 && (isObsolete || forceObsolete)
766 2 : w.lastPointKeyInfo.isObsolete = isObsolete
767 2 : var valueStoredWithKey []byte
768 2 : var prefix block.ValuePrefix
769 2 : var valueStoredWithKeyLen int
770 2 : if writeToValueBlock {
771 2 : vh, err := w.valueBlockWriter.AddValue(value)
772 2 : if err != nil {
773 0 : return err
774 0 : }
775 2 : n := valblk.EncodeHandle(w.blockBuf.tmp[:], vh)
776 2 : valueStoredWithKey = w.blockBuf.tmp[:n]
777 2 : valueStoredWithKeyLen = len(valueStoredWithKey) + 1
778 2 : var attribute base.ShortAttribute
779 2 : if w.shortAttributeExtractor != nil {
780 1 : // TODO(sumeer): for compactions, it is possible that the input sstable
781 1 : // already has this value in the value section and so we have already
782 1 : // extracted the ShortAttribute. Avoid extracting it again. This will
783 1 : // require changing the Writer.Add interface.
784 1 : if attribute, err = w.shortAttributeExtractor(
785 1 : key.UserKey, w.lastPointKeyInfo.prefixLen, value); err != nil {
786 0 : return err
787 0 : }
788 : }
789 2 : prefix = block.ValueHandlePrefix(setHasSameKeyPrefix, attribute)
790 2 : } else {
791 2 : valueStoredWithKey = value
792 2 : valueStoredWithKeyLen = len(value)
793 2 : if addPrefixToValueStoredWithKey {
794 2 : valueStoredWithKeyLen++
795 2 : }
796 2 : prefix = block.InPlaceValuePrefix(setHasSameKeyPrefix)
797 : }
798 :
799 2 : if err := w.maybeFlush(key, valueStoredWithKeyLen); err != nil {
800 1 : return err
801 1 : }
802 :
803 2 : for i := range w.blockPropCollectors {
804 2 : v := value
805 2 : if addPrefixToValueStoredWithKey {
806 2 : // Values for SET are not required to be in-place, and in the future may
807 2 : // not even be read by the compaction, so pass nil values. Block
808 2 : // property collectors in such Pebble DB's must not look at the value.
809 2 : v = nil
810 2 : }
811 2 : if err := w.blockPropCollectors[i].AddPointKey(key, v); err != nil {
812 1 : w.err = err
813 1 : return err
814 1 : }
815 : }
816 2 : if w.tableFormat >= TableFormatPebblev4 {
817 2 : w.obsoleteCollector.AddPoint(isObsolete)
818 2 : }
819 :
820 2 : w.maybeAddToFilter(key.UserKey)
821 2 : w.dataBlockBuf.dataBlock.AddWithOptionalValuePrefix(
822 2 : key, isObsolete, valueStoredWithKey, maxSharedKeyLen, addPrefixToValueStoredWithKey, prefix,
823 2 : setHasSameKeyPrefix)
824 2 :
825 2 : w.meta.updateSeqNum(key.SeqNum())
826 2 :
827 2 : if !w.meta.HasPointKeys {
828 2 : k := w.dataBlockBuf.dataBlock.CurKey()
829 2 : // NB: We need to ensure that SmallestPoint.UserKey is set, so we create
830 2 : // an InternalKey which is semantically identical to the key, but won't
831 2 : // have a nil UserKey. We do this, because key.UserKey could be nil, and
832 2 : // we don't want SmallestPoint.UserKey to be nil.
833 2 : //
834 2 : // todo(bananabrick): Determine if it's okay to have a nil SmallestPoint
835 2 : // .UserKey now that we don't rely on a nil UserKey to determine if the
836 2 : // key has been set or not.
837 2 : w.meta.SetSmallestPointKey(k.Clone())
838 2 : }
839 :
840 2 : w.props.NumEntries++
841 2 : switch key.Kind() {
842 2 : case InternalKeyKindDelete, InternalKeyKindSingleDelete:
843 2 : w.props.NumDeletions++
844 2 : w.dataBlockBuf.numDeletions++
845 2 : w.props.RawPointTombstoneKeySize += uint64(len(key.UserKey))
846 2 : w.dataBlockBuf.deletionSize += len(key.UserKey)
847 2 : case InternalKeyKindDeleteSized:
848 2 : var size uint64
849 2 : if len(value) > 0 {
850 2 : var n int
851 2 : size, n = binary.Uvarint(value)
852 2 : if n <= 0 {
853 0 : w.err = errors.Newf("%s key's value (%x) does not parse as uvarint",
854 0 : errors.Safe(key.Kind().String()), value)
855 0 : return w.err
856 0 : }
857 : }
858 2 : w.props.NumDeletions++
859 2 : w.props.NumSizedDeletions++
860 2 : w.dataBlockBuf.numDeletions++
861 2 : w.props.RawPointTombstoneKeySize += uint64(len(key.UserKey))
862 2 : w.dataBlockBuf.deletionSize += len(key.UserKey)
863 2 : w.props.RawPointTombstoneValueSize += size
864 2 : case InternalKeyKindMerge:
865 2 : w.props.NumMergeOperands++
866 : }
867 2 : w.props.RawKeySize += uint64(key.Size())
868 2 : w.props.RawValueSize += uint64(len(value))
869 2 : return nil
870 : }
871 :
872 1 : func (w *RawRowWriter) prettyTombstone(k InternalKey, value []byte) fmt.Formatter {
873 1 : return keyspan.Span{
874 1 : Start: k.UserKey,
875 1 : End: value,
876 1 : Keys: []keyspan.Key{{Trailer: k.Trailer}},
877 1 : }.Pretty(w.formatKey)
878 1 : }
879 :
880 2 : func (w *RawRowWriter) addTombstone(key InternalKey, value []byte) error {
881 2 : if !w.disableKeyOrderChecks && w.rangeDelBlock.EntryCount() > 0 {
882 2 : // Check that tombstones are being added in fragmented order. If the two
883 2 : // tombstones overlap, their start and end keys must be identical.
884 2 : prevKey := w.rangeDelBlock.CurKey()
885 2 : switch c := w.compare(prevKey.UserKey, key.UserKey); {
886 1 : case c > 0:
887 1 : w.err = errors.Errorf("pebble: keys must be added in order: %s, %s",
888 1 : prevKey.Pretty(w.formatKey), key.Pretty(w.formatKey))
889 1 : return w.err
890 2 : case c == 0:
891 2 : prevValue := w.rangeDelBlock.CurValue()
892 2 : if w.compare(prevValue, value) != 0 {
893 1 : w.err = errors.Errorf("pebble: overlapping tombstones must be fragmented: %s vs %s",
894 1 : w.prettyTombstone(prevKey, prevValue),
895 1 : w.prettyTombstone(key, value))
896 1 : return w.err
897 1 : }
898 2 : if prevKey.SeqNum() <= key.SeqNum() {
899 1 : w.err = errors.Errorf("pebble: keys must be added in strictly increasing order: %s, %s",
900 1 : prevKey.Pretty(w.formatKey), key.Pretty(w.formatKey))
901 1 : return w.err
902 1 : }
903 2 : default:
904 2 : prevValue := w.rangeDelBlock.CurValue()
905 2 : if w.compare(prevValue, key.UserKey) > 0 {
906 1 : w.err = errors.Errorf("pebble: overlapping tombstones must be fragmented: %s vs %s",
907 1 : w.prettyTombstone(prevKey, prevValue),
908 1 : w.prettyTombstone(key, value))
909 1 : return w.err
910 1 : }
911 : }
912 : }
913 :
914 2 : if key.Trailer == base.InternalKeyRangeDeleteSentinel {
915 0 : w.err = errors.Errorf("pebble: cannot add range delete sentinel: %s", key.Pretty(w.formatKey))
916 0 : return w.err
917 0 : }
918 :
919 2 : w.meta.updateSeqNum(key.SeqNum())
920 2 :
921 2 : // Range tombstones are fragmented in the v2 range deletion block format,
922 2 : // so the start key of the first range tombstone added will be the smallest
923 2 : // range tombstone key. The largest range tombstone key will be determined
924 2 : // in Writer.Close() as the end key of the last range tombstone added.
925 2 : if w.props.NumRangeDeletions == 0 {
926 2 : w.meta.SetSmallestRangeDelKey(key.Clone())
927 2 : }
928 :
929 2 : w.props.NumEntries++
930 2 : w.props.NumDeletions++
931 2 : w.props.NumRangeDeletions++
932 2 : w.props.RawKeySize += uint64(key.Size())
933 2 : w.props.RawValueSize += uint64(len(value))
934 2 : w.rangeDelBlock.Add(key, value)
935 2 : return nil
936 : }
937 :
938 : // addRangeKey adds a range key set, unset, or delete key/value pair to the
939 : // table being written.
940 : //
941 : // Range keys must be supplied in strictly ascending order of start key (i.e.
942 : // user key ascending, sequence number descending, and key type descending).
943 : // Ranges added must also be supplied in fragmented span order - i.e. other than
944 : // spans that are perfectly aligned (same start and end keys), spans may not
945 : // overlap. Range keys may be added out of order relative to point keys and
946 : // range deletions.
947 2 : func (w *RawRowWriter) addRangeKey(key InternalKey, value []byte) error {
948 2 : if !w.disableKeyOrderChecks && w.rangeKeyBlock.EntryCount() > 0 {
949 2 : prevStartKey := w.rangeKeyBlock.CurKey()
950 2 : prevEndKey, _, err := rangekey.DecodeEndKey(prevStartKey.Kind(), w.rangeKeyBlock.CurValue())
951 2 : if err != nil {
952 0 : // We panic here as we should have previously decoded and validated this
953 0 : // key and value when it was first added to the range key block.
954 0 : panic(err)
955 : }
956 :
957 2 : curStartKey := key
958 2 : curEndKey, _, err := rangekey.DecodeEndKey(curStartKey.Kind(), value)
959 2 : if err != nil {
960 0 : w.err = err
961 0 : return w.err
962 0 : }
963 :
964 : // Start keys must be strictly increasing.
965 2 : if base.InternalCompare(w.compare, prevStartKey, curStartKey) >= 0 {
966 1 : w.err = errors.Errorf(
967 1 : "pebble: range keys starts must be added in increasing order: %s, %s",
968 1 : prevStartKey.Pretty(w.formatKey), key.Pretty(w.formatKey))
969 1 : return w.err
970 1 : }
971 :
972 : // Start keys are increasing. If the start user keys are equal, the
973 : // end keys must be equal (i.e. aligned spans).
974 2 : if w.compare(prevStartKey.UserKey, curStartKey.UserKey) == 0 {
975 2 : if w.compare(prevEndKey, curEndKey) != 0 {
976 0 : w.err = errors.Errorf("pebble: overlapping range keys must be fragmented: %s, %s",
977 0 : prevStartKey.Pretty(w.formatKey),
978 0 : curStartKey.Pretty(w.formatKey))
979 0 : return w.err
980 0 : }
981 2 : } else if w.compare(prevEndKey, curStartKey.UserKey) > 0 {
982 0 : // If the start user keys are NOT equal, the spans must be disjoint (i.e.
983 0 : // no overlap).
984 0 : // NOTE: the inequality excludes zero, as we allow the end key of the
985 0 : // lower span be the same as the start key of the upper span, because
986 0 : // the range end key is considered an exclusive bound.
987 0 : w.err = errors.Errorf("pebble: overlapping range keys must be fragmented: %s, %s",
988 0 : prevStartKey.Pretty(w.formatKey),
989 0 : curStartKey.Pretty(w.formatKey))
990 0 : return w.err
991 0 : }
992 : }
993 :
994 : // TODO(travers): Add an invariant-gated check to ensure that suffix-values
995 : // are sorted within coalesced spans.
996 :
997 : // Range-keys and point-keys are intended to live in "parallel" keyspaces.
998 : // However, we track a single seqnum in the table metadata that spans both of
999 : // these keyspaces.
1000 : // TODO(travers): Consider tracking range key seqnums separately.
1001 2 : w.meta.updateSeqNum(key.SeqNum())
1002 2 :
1003 2 : // Range tombstones are fragmented, so the start key of the first range key
1004 2 : // added will be the smallest. The largest range key is determined in
1005 2 : // Writer.Close() as the end key of the last range key added to the block.
1006 2 : if w.props.NumRangeKeys() == 0 {
1007 2 : w.meta.SetSmallestRangeKey(key.Clone())
1008 2 : }
1009 :
1010 : // Update table properties.
1011 2 : w.props.RawRangeKeyKeySize += uint64(key.Size())
1012 2 : w.props.RawRangeKeyValueSize += uint64(len(value))
1013 2 : switch key.Kind() {
1014 2 : case base.InternalKeyKindRangeKeyDelete:
1015 2 : w.props.NumRangeKeyDels++
1016 2 : case base.InternalKeyKindRangeKeySet:
1017 2 : w.props.NumRangeKeySets++
1018 2 : case base.InternalKeyKindRangeKeyUnset:
1019 2 : w.props.NumRangeKeyUnsets++
1020 0 : default:
1021 0 : panic(errors.Errorf("pebble: invalid range key type: %s", key.Kind()))
1022 : }
1023 :
1024 : // Add the key to the block.
1025 2 : w.rangeKeyBlock.Add(key, value)
1026 2 : return nil
1027 : }
1028 :
1029 2 : func (w *RawRowWriter) maybeAddToFilter(key []byte) {
1030 2 : if w.filter != nil {
1031 2 : prefix := key[:w.split(key)]
1032 2 : w.filter.addKey(prefix)
1033 2 : }
1034 : }
1035 :
1036 : // maybeIncrementTombstoneDenseBlocks increments the number of tombstone dense
1037 : // blocks if the number of deletions in the data block exceeds a threshold or
1038 : // the deletion size exceeds a threshold. It should be called after the
1039 : // data block has been finished.
1040 : // Invariant: w.dataBlockBuf.uncompressed must already be populated.
1041 2 : func (w *RawRowWriter) maybeIncrementTombstoneDenseBlocks() {
1042 2 : minSize := w.deletionSizeRatioThreshold * float32(len(w.dataBlockBuf.uncompressed))
1043 2 : if w.dataBlockBuf.numDeletions > w.numDeletionsThreshold || float32(w.dataBlockBuf.deletionSize) > minSize {
1044 2 : w.props.NumTombstoneDenseBlocks++
1045 2 : }
1046 2 : w.dataBlockBuf.numDeletions = 0
1047 2 : w.dataBlockBuf.deletionSize = 0
1048 : }
1049 :
1050 2 : func (w *RawRowWriter) flush(key InternalKey) error {
1051 2 : // We're finishing a data block.
1052 2 : err := w.finishDataBlockProps(w.dataBlockBuf)
1053 2 : if err != nil {
1054 1 : return err
1055 1 : }
1056 2 : w.dataBlockBuf.finish()
1057 2 : w.maybeIncrementTombstoneDenseBlocks()
1058 2 : w.dataBlockBuf.compressAndChecksum(w.compression)
1059 2 : // Since dataBlockEstimates.addInflightDataBlock was never called, the
1060 2 : // inflightSize is set to 0.
1061 2 : w.coordination.sizeEstimate.dataBlockCompressed(w.dataBlockBuf.physical.LengthWithoutTrailer(), 0)
1062 2 :
1063 2 : // Determine if the index block should be flushed. Since we're accessing the
1064 2 : // dataBlockBuf.dataBlock.curKey here, we have to make sure that once we start
1065 2 : // to pool the dataBlockBufs, the curKey isn't used by the Writer once the
1066 2 : // dataBlockBuf is added back to a sync.Pool. In this particular case, the
1067 2 : // byte slice which supports "sep" will eventually be copied when "sep" is
1068 2 : // added to the index block.
1069 2 : prevKey := w.dataBlockBuf.dataBlock.CurKey()
1070 2 : sep := w.indexEntrySep(prevKey, key, w.dataBlockBuf)
1071 2 : // We determine that we should flush an index block from the Writer client
1072 2 : // goroutine, but we actually finish the index block from the writeQueue.
1073 2 : // When we determine that an index block should be flushed, we need to call
1074 2 : // BlockPropertyCollector.FinishIndexBlock. But block property collector
1075 2 : // calls must happen sequentially from the Writer client. Therefore, we need
1076 2 : // to determine that we are going to flush the index block from the Writer
1077 2 : // client.
1078 2 : shouldFlushIndexBlock := supportsTwoLevelIndex(w.tableFormat) &&
1079 2 : w.indexBlock.shouldFlush(sep, encodedBHPEstimatedSize, &w.indexFlush)
1080 2 :
1081 2 : var indexProps []byte
1082 2 : var flushableIndexBlock *indexBlockBuf
1083 2 : if shouldFlushIndexBlock {
1084 2 : flushableIndexBlock = w.indexBlock
1085 2 : w.indexBlock = newIndexBlockBuf(w.coordination.parallelismEnabled)
1086 2 : // Call BlockPropertyCollector.FinishIndexBlock, since we've decided to
1087 2 : // flush the index block.
1088 2 : indexProps, err = w.finishIndexBlockProps()
1089 2 : if err != nil {
1090 1 : return err
1091 1 : }
1092 : }
1093 :
1094 : // We've called BlockPropertyCollector.FinishDataBlock, and, if necessary,
1095 : // BlockPropertyCollector.FinishIndexBlock. Since we've decided to finish
1096 : // the data block, we can call
1097 : // BlockPropertyCollector.AddPrevDataBlockToIndexBlock.
1098 2 : w.addPrevDataBlockToIndexBlockProps()
1099 2 :
1100 2 : // Schedule a write.
1101 2 : writeTask := writeTaskPool.Get().(*writeTask)
1102 2 : // We're setting compressionDone to indicate that compression of this block
1103 2 : // has already been completed.
1104 2 : writeTask.compressionDone <- true
1105 2 : writeTask.buf = w.dataBlockBuf
1106 2 : writeTask.indexEntrySep = sep
1107 2 : writeTask.currIndexBlock = w.indexBlock
1108 2 : writeTask.indexInflightSize = sep.Size() + encodedBHPEstimatedSize
1109 2 : writeTask.finishedIndexProps = indexProps
1110 2 : writeTask.flushableIndexBlock = flushableIndexBlock
1111 2 :
1112 2 : // The writeTask corresponds to an unwritten index entry.
1113 2 : w.indexBlock.addInflight(writeTask.indexInflightSize)
1114 2 :
1115 2 : w.dataBlockBuf = nil
1116 2 : if w.coordination.parallelismEnabled {
1117 2 : w.coordination.writeQueue.add(writeTask)
1118 2 : } else {
1119 2 : err = w.coordination.writeQueue.addSync(writeTask)
1120 2 : }
1121 2 : w.dataBlockBuf = newDataBlockBuf(w.restartInterval, w.checksumType)
1122 2 :
1123 2 : return err
1124 : }
1125 :
1126 2 : func (w *RawRowWriter) maybeFlush(key InternalKey, valueLen int) error {
1127 2 : if !w.dataBlockBuf.shouldFlush(key, valueLen, &w.dataFlush) {
1128 2 : return nil
1129 2 : }
1130 :
1131 2 : err := w.flush(key)
1132 2 :
1133 2 : if err != nil {
1134 1 : w.err = err
1135 1 : return err
1136 1 : }
1137 :
1138 2 : return nil
1139 : }
1140 :
1141 : // dataBlockBuf.dataBlockProps set by this method must be encoded before any future use of the
1142 : // dataBlockBuf.blockPropsEncoder, since the properties slice will get reused by the
1143 : // blockPropsEncoder.
1144 2 : func (w *RawRowWriter) finishDataBlockProps(buf *dataBlockBuf) error {
1145 2 : if len(w.blockPropCollectors) == 0 {
1146 2 : return nil
1147 2 : }
1148 2 : var err error
1149 2 : buf.blockPropsEncoder.resetProps()
1150 2 : for i := range w.blockPropCollectors {
1151 2 : scratch := buf.blockPropsEncoder.getScratchForProp()
1152 2 : if scratch, err = w.blockPropCollectors[i].FinishDataBlock(scratch); err != nil {
1153 1 : return err
1154 1 : }
1155 2 : buf.blockPropsEncoder.addProp(shortID(i), scratch)
1156 : }
1157 :
1158 2 : buf.dataBlockProps = buf.blockPropsEncoder.unsafeProps()
1159 2 : return nil
1160 : }
1161 :
1162 : // The BlockHandleWithProperties returned by this method must be encoded before any future use of
1163 : // the Writer.blockPropsEncoder, since the properties slice will get reused by the blockPropsEncoder.
1164 : // maybeAddBlockPropertiesToBlockHandle should only be called if block is being written synchronously
1165 : // with the Writer client.
1166 : func (w *RawRowWriter) maybeAddBlockPropertiesToBlockHandle(
1167 : bh block.Handle,
1168 2 : ) (block.HandleWithProperties, error) {
1169 2 : err := w.finishDataBlockProps(w.dataBlockBuf)
1170 2 : if err != nil {
1171 0 : return block.HandleWithProperties{}, err
1172 0 : }
1173 2 : return block.HandleWithProperties{Handle: bh, Props: w.dataBlockBuf.dataBlockProps}, nil
1174 : }
1175 :
1176 : func (w *RawRowWriter) indexEntrySep(
1177 : prevKey, key InternalKey, dataBlockBuf *dataBlockBuf,
1178 2 : ) InternalKey {
1179 2 : // Make a rough guess that we want key-sized scratch to compute the separator.
1180 2 : if cap(dataBlockBuf.sepScratch) < key.Size() {
1181 2 : dataBlockBuf.sepScratch = make([]byte, 0, key.Size()*2)
1182 2 : }
1183 :
1184 2 : var sep InternalKey
1185 2 : if key.UserKey == nil && key.Trailer == 0 {
1186 2 : sep = prevKey.Successor(w.compare, w.successor, dataBlockBuf.sepScratch[:0])
1187 2 : } else {
1188 2 : sep = prevKey.Separator(w.compare, w.separator, dataBlockBuf.sepScratch[:0], key)
1189 2 : }
1190 2 : return sep
1191 : }
1192 :
1193 : // addIndexEntry adds an index entry for the specified key and block handle.
1194 : // addIndexEntry can be called from both the Writer client goroutine, and the
1195 : // writeQueue goroutine. If the flushIndexBuf != nil, then the indexProps, as
1196 : // they're used when the index block is finished.
1197 : //
1198 : // Invariant:
1199 : // 1. addIndexEntry must not store references to the sep InternalKey, the tmp
1200 : // byte slice, bhp.Props. That is, these must be either deep copied or
1201 : // encoded.
1202 : // 2. addIndexEntry must not hold references to the flushIndexBuf, and the writeTo
1203 : // indexBlockBufs.
1204 : func (w *RawRowWriter) addIndexEntry(
1205 : sep InternalKey,
1206 : bhp block.HandleWithProperties,
1207 : tmp []byte,
1208 : flushIndexBuf *indexBlockBuf,
1209 : writeTo *indexBlockBuf,
1210 : inflightSize int,
1211 : indexProps []byte,
1212 2 : ) error {
1213 2 : if bhp.Length == 0 {
1214 0 : // A valid blockHandle must be non-zero.
1215 0 : // In particular, it must have a non-zero length.
1216 0 : return nil
1217 0 : }
1218 :
1219 2 : encoded := bhp.EncodeVarints(tmp)
1220 2 : if flushIndexBuf != nil {
1221 2 : if cap(w.indexPartitions) == 0 {
1222 2 : w.indexPartitions = make([]bufferedIndexBlock, 0, 32)
1223 2 : }
1224 : // Enable two level indexes if there is more than one index block.
1225 2 : w.twoLevelIndex = true
1226 2 : if err := w.finishIndexBlock(flushIndexBuf, indexProps); err != nil {
1227 0 : return err
1228 0 : }
1229 : }
1230 :
1231 2 : writeTo.add(sep, encoded, inflightSize)
1232 2 : return nil
1233 : }
1234 :
1235 2 : func (w *RawRowWriter) addPrevDataBlockToIndexBlockProps() {
1236 2 : for i := range w.blockPropCollectors {
1237 2 : w.blockPropCollectors[i].AddPrevDataBlockToIndexBlock()
1238 2 : }
1239 : }
1240 :
1241 : // addIndexEntrySync adds an index entry for the specified key and block handle.
1242 : // Writer.addIndexEntry is only called synchronously once Writer.Close is called.
1243 : // addIndexEntrySync should only be called if we're sure that index entries
1244 : // aren't being written asynchronously.
1245 : //
1246 : // Invariant:
1247 : // 1. addIndexEntrySync must not store references to the prevKey, key InternalKey's,
1248 : // the tmp byte slice. That is, these must be either deep copied or encoded.
1249 : //
1250 : // TODO: Improve coverage of this method. e.g. tests passed without the line
1251 : // `w.twoLevelIndex = true` previously.
1252 : func (w *RawRowWriter) addIndexEntrySync(
1253 : prevKey, key InternalKey, bhp block.HandleWithProperties, tmp []byte,
1254 2 : ) error {
1255 2 : return w.addIndexEntrySep(w.indexEntrySep(prevKey, key, w.dataBlockBuf), bhp, tmp)
1256 2 : }
1257 :
1258 : func (w *RawRowWriter) addIndexEntrySep(
1259 : sep InternalKey, bhp block.HandleWithProperties, tmp []byte,
1260 2 : ) error {
1261 2 : shouldFlush := supportsTwoLevelIndex(w.tableFormat) &&
1262 2 : w.indexBlock.shouldFlush(sep, encodedBHPEstimatedSize, &w.indexFlush)
1263 2 : var flushableIndexBlock *indexBlockBuf
1264 2 : var props []byte
1265 2 : var err error
1266 2 : if shouldFlush {
1267 2 : flushableIndexBlock = w.indexBlock
1268 2 : w.indexBlock = newIndexBlockBuf(w.coordination.parallelismEnabled)
1269 2 : w.twoLevelIndex = true
1270 2 : // Call BlockPropertyCollector.FinishIndexBlock, since we've decided to
1271 2 : // flush the index block.
1272 2 : props, err = w.finishIndexBlockProps()
1273 2 : if err != nil {
1274 0 : return err
1275 0 : }
1276 : }
1277 :
1278 2 : err = w.addIndexEntry(sep, bhp, tmp, flushableIndexBlock, w.indexBlock, 0, props)
1279 2 : if flushableIndexBlock != nil {
1280 2 : flushableIndexBlock.clear()
1281 2 : indexBlockBufPool.Put(flushableIndexBlock)
1282 2 : }
1283 2 : w.addPrevDataBlockToIndexBlockProps()
1284 2 : return err
1285 : }
1286 :
1287 : func shouldFlush(
1288 : keyLen, valueLen int,
1289 : restartInterval, estimatedBlockSize, numEntries int,
1290 : flushGovernor *block.FlushGovernor,
1291 2 : ) bool {
1292 2 : if numEntries == 0 {
1293 2 : return false
1294 2 : }
1295 2 : if estimatedBlockSize < flushGovernor.LowWatermark() {
1296 2 : // Fast path when the block is too small to flush.
1297 2 : return false
1298 2 : }
1299 :
1300 : // Estimate the new size. This could be an overestimation because we don't
1301 : // know how much of the key will be shared.
1302 2 : newSize := estimatedBlockSize + keyLen + valueLen
1303 2 : if numEntries%restartInterval == 0 {
1304 2 : newSize += 4
1305 2 : }
1306 2 : newSize += 4 // varint for shared prefix length
1307 2 : newSize += uvarintLen(uint32(keyLen)) // varint for unshared key bytes
1308 2 : newSize += uvarintLen(uint32(valueLen)) // varint for value size
1309 2 :
1310 2 : return flushGovernor.ShouldFlush(estimatedBlockSize, newSize)
1311 : }
1312 :
1313 2 : func cloneKeyWithBuf(k InternalKey, a bytealloc.A) (bytealloc.A, InternalKey) {
1314 2 : if len(k.UserKey) == 0 {
1315 0 : return a, k
1316 0 : }
1317 2 : a, keyCopy := a.Copy(k.UserKey)
1318 2 : return a, InternalKey{UserKey: keyCopy, Trailer: k.Trailer}
1319 : }
1320 :
1321 : // Invariants: The byte slice returned by finishIndexBlockProps is heap-allocated
1322 : //
1323 : // and has its own lifetime, independent of the Writer and the blockPropsEncoder,
1324 : //
1325 : // and it is safe to:
1326 : // 1. Reuse w.blockPropsEncoder without first encoding the byte slice returned.
1327 : // 2. Store the byte slice in the Writer since it is a copy and not supported by
1328 : // an underlying buffer.
1329 2 : func (w *RawRowWriter) finishIndexBlockProps() ([]byte, error) {
1330 2 : w.blockPropsEncoder.resetProps()
1331 2 : for i := range w.blockPropCollectors {
1332 2 : scratch := w.blockPropsEncoder.getScratchForProp()
1333 2 : var err error
1334 2 : if scratch, err = w.blockPropCollectors[i].FinishIndexBlock(scratch); err != nil {
1335 1 : return nil, err
1336 1 : }
1337 2 : w.blockPropsEncoder.addProp(shortID(i), scratch)
1338 : }
1339 2 : return w.blockPropsEncoder.props(), nil
1340 : }
1341 :
1342 : // finishIndexBlock finishes the current index block and adds it to the top
1343 : // level index block. This is only used when two level indexes are enabled.
1344 : //
1345 : // Invariants:
1346 : // 1. The props slice passed into finishedIndexBlock must not be a
1347 : // owned by any other struct, since it will be stored in the Writer.indexPartitions
1348 : // slice.
1349 : // 2. None of the buffers owned by indexBuf will be shallow copied and stored elsewhere.
1350 : // That is, it must be safe to reuse indexBuf after finishIndexBlock has been called.
1351 2 : func (w *RawRowWriter) finishIndexBlock(indexBuf *indexBlockBuf, props []byte) error {
1352 2 : part := bufferedIndexBlock{
1353 2 : nEntries: indexBuf.block.EntryCount(), properties: props,
1354 2 : }
1355 2 : w.indexSepAlloc, part.sep = cloneKeyWithBuf(
1356 2 : indexBuf.block.CurKey(), w.indexSepAlloc,
1357 2 : )
1358 2 : bk := indexBuf.finish()
1359 2 : if len(w.indexBlockAlloc) < len(bk) {
1360 2 : // Allocate enough bytes for approximately 16 index blocks.
1361 2 : w.indexBlockAlloc = make([]byte, len(bk)*16)
1362 2 : }
1363 2 : n := copy(w.indexBlockAlloc, bk)
1364 2 : part.block = w.indexBlockAlloc[:n:n]
1365 2 : w.indexBlockAlloc = w.indexBlockAlloc[n:]
1366 2 : w.indexPartitions = append(w.indexPartitions, part)
1367 2 : return nil
1368 : }
1369 :
1370 2 : func (w *RawRowWriter) writeTwoLevelIndex() (block.Handle, error) {
1371 2 : props, err := w.finishIndexBlockProps()
1372 2 : if err != nil {
1373 0 : return block.Handle{}, err
1374 0 : }
1375 : // Add the final unfinished index.
1376 2 : if err = w.finishIndexBlock(w.indexBlock, props); err != nil {
1377 0 : return block.Handle{}, err
1378 0 : }
1379 :
1380 2 : for i := range w.indexPartitions {
1381 2 : b := &w.indexPartitions[i]
1382 2 : w.props.NumDataBlocks += uint64(b.nEntries)
1383 2 :
1384 2 : data := b.block
1385 2 : w.props.IndexSize += uint64(len(data))
1386 2 : bh, err := w.layout.WriteIndexBlock(data)
1387 2 : if err != nil {
1388 0 : return block.Handle{}, err
1389 0 : }
1390 2 : w.topLevelIndexBlock.Add(b.sep, block.HandleWithProperties{
1391 2 : Handle: bh,
1392 2 : Props: b.properties,
1393 2 : }.EncodeVarints(w.blockBuf.tmp[:]))
1394 : }
1395 :
1396 : // NB: RocksDB includes the block trailer length in the index size
1397 : // property, though it doesn't include the trailer in the top level
1398 : // index size property.
1399 2 : w.props.IndexPartitions = uint64(len(w.indexPartitions))
1400 2 : w.props.TopLevelIndexSize = uint64(w.topLevelIndexBlock.EstimatedSize())
1401 2 : w.props.IndexSize += w.props.TopLevelIndexSize + block.TrailerLen
1402 2 : return w.layout.WriteIndexBlock(w.topLevelIndexBlock.Finish())
1403 : }
1404 :
1405 : // assertFormatCompatibility ensures that the features present on the table are
1406 : // compatible with the table format version.
1407 2 : func (w *RawRowWriter) assertFormatCompatibility() error {
1408 2 : // PebbleDBv1: block properties.
1409 2 : if len(w.blockPropCollectors) > 0 && w.tableFormat < TableFormatPebblev1 {
1410 1 : return errors.Newf(
1411 1 : "table format version %s is less than the minimum required version %s for block properties",
1412 1 : w.tableFormat, TableFormatPebblev1,
1413 1 : )
1414 1 : }
1415 :
1416 : // PebbleDBv2: range keys.
1417 2 : if w.props.NumRangeKeys() > 0 && w.tableFormat < TableFormatPebblev2 {
1418 1 : return errors.Newf(
1419 1 : "table format version %s is less than the minimum required version %s for range keys",
1420 1 : w.tableFormat, TableFormatPebblev2,
1421 1 : )
1422 1 : }
1423 :
1424 : // PebbleDBv3: value blocks.
1425 2 : if (w.props.NumValueBlocks > 0 || w.props.NumValuesInValueBlocks > 0 ||
1426 2 : w.props.ValueBlocksSize > 0) && w.tableFormat < TableFormatPebblev3 {
1427 0 : return errors.Newf(
1428 0 : "table format version %s is less than the minimum required version %s for value blocks",
1429 0 : w.tableFormat, TableFormatPebblev3)
1430 0 : }
1431 :
1432 : // PebbleDBv4: DELSIZED tombstones.
1433 2 : if w.props.NumSizedDeletions > 0 && w.tableFormat < TableFormatPebblev4 {
1434 0 : return errors.Newf(
1435 0 : "table format version %s is less than the minimum required version %s for sized deletion tombstones",
1436 0 : w.tableFormat, TableFormatPebblev4)
1437 0 : }
1438 2 : return nil
1439 : }
1440 :
1441 : // ComparePrev compares the provided user to the last point key written to the
1442 : // writer. The returned value is equivalent to Compare(key, prevKey) where
1443 : // prevKey is the last point key written to the writer.
1444 : //
1445 : // If no key has been written yet, ComparePrev returns +1.
1446 : //
1447 : // Must not be called after Writer is closed.
1448 2 : func (w *RawRowWriter) ComparePrev(k []byte) int {
1449 2 : if w == nil || w.dataBlockBuf.dataBlock.EntryCount() == 0 {
1450 1 : return +1
1451 1 : }
1452 2 : return w.compare(k, w.dataBlockBuf.dataBlock.CurUserKey())
1453 : }
1454 :
1455 : // EncodeSpan encodes the keys in the given span. The span can contain either
1456 : // only RANGEDEL keys or only range keys.
1457 : //
1458 : // This is a low-level API that bypasses the fragmenter. The spans passed to
1459 : // this function must be fragmented and ordered.
1460 2 : func (w *RawRowWriter) EncodeSpan(span keyspan.Span) error {
1461 2 : if span.Empty() {
1462 2 : return nil
1463 2 : }
1464 2 : if span.Keys[0].Kind() == base.InternalKeyKindRangeDelete {
1465 2 : return rangedel.Encode(span, w.addTombstone)
1466 2 : }
1467 2 : for i := range w.blockPropCollectors {
1468 2 : if err := w.blockPropCollectors[i].AddRangeKeys(span); err != nil {
1469 0 : return err
1470 0 : }
1471 : }
1472 2 : return w.rangeKeyEncoder.Encode(span)
1473 : }
1474 :
1475 : // Error returns the current accumulated error, if any.
1476 2 : func (w *RawRowWriter) Error() error {
1477 2 : return w.err
1478 2 : }
1479 :
1480 : // Close finishes writing the table and closes the underlying file that the
1481 : // table was written to.
1482 2 : func (w *RawRowWriter) Close() (err error) {
1483 2 : defer func() {
1484 2 : if w.valueBlockWriter != nil {
1485 2 : w.valueBlockWriter.Release()
1486 2 : // Defensive code in case Close gets called again. We don't want to put
1487 2 : // the same object to a sync.Pool.
1488 2 : w.valueBlockWriter = nil
1489 2 : }
1490 2 : w.layout.Abort()
1491 2 : // Record any error in the writer (so we can exit early if Close is called
1492 2 : // again).
1493 2 : if err != nil {
1494 1 : w.err = err
1495 1 : }
1496 : }()
1497 :
1498 : // finish must be called before we check for an error, because finish will
1499 : // block until every single task added to the writeQueue has been processed,
1500 : // and an error could be encountered while any of those tasks are processed.
1501 2 : if err := w.coordination.writeQueue.finish(); err != nil {
1502 1 : return err
1503 1 : }
1504 2 : if w.err != nil {
1505 1 : return w.err
1506 1 : }
1507 :
1508 : // The w.meta.LargestPointKey is only used once the Writer is closed, so it is safe to set it
1509 : // when the Writer is closed.
1510 : //
1511 : // The following invariants ensure that setting the largest key at this point of a Writer close
1512 : // is correct:
1513 : // 1. Keys must only be added to the Writer in an increasing order.
1514 : // 2. The current w.dataBlockBuf is guaranteed to have the latest key added to the Writer. This
1515 : // must be true, because a w.dataBlockBuf is only switched out when a dataBlock is flushed,
1516 : // however, if a dataBlock is flushed, then we add a key to the new w.dataBlockBuf in the
1517 : // addPoint function after the flush occurs.
1518 2 : if w.dataBlockBuf.dataBlock.EntryCount() >= 1 {
1519 2 : w.meta.SetLargestPointKey(w.dataBlockBuf.dataBlock.CurKey().Clone())
1520 2 : }
1521 :
1522 : // Finish the last data block, or force an empty data block if there
1523 : // aren't any data blocks at all.
1524 2 : if w.dataBlockBuf.dataBlock.EntryCount() > 0 || w.indexBlock.block.EntryCount() == 0 {
1525 2 : w.dataBlockBuf.finish()
1526 2 : w.maybeIncrementTombstoneDenseBlocks()
1527 2 : bh, err := w.layout.WriteDataBlock(w.dataBlockBuf.uncompressed, &w.dataBlockBuf.blockBuf)
1528 2 : if err != nil {
1529 0 : return err
1530 0 : }
1531 2 : bhp, err := w.maybeAddBlockPropertiesToBlockHandle(bh)
1532 2 : if err != nil {
1533 0 : return err
1534 0 : }
1535 2 : prevKey := w.dataBlockBuf.dataBlock.CurKey()
1536 2 : if err := w.addIndexEntrySync(prevKey, InternalKey{}, bhp, w.dataBlockBuf.tmp[:]); err != nil {
1537 0 : return err
1538 0 : }
1539 : }
1540 2 : w.props.DataSize = w.layout.offset
1541 2 :
1542 2 : // Write the filter block.
1543 2 : if w.filter != nil {
1544 2 : bh, err := w.layout.WriteFilterBlock(w.filter)
1545 2 : if err != nil {
1546 0 : return err
1547 0 : }
1548 2 : w.props.FilterPolicyName = w.filter.policyName()
1549 2 : w.props.FilterSize = bh.Length
1550 : }
1551 :
1552 2 : if w.twoLevelIndex {
1553 2 : w.props.IndexType = twoLevelIndex
1554 2 : // Write the two level index block.
1555 2 : if _, err = w.writeTwoLevelIndex(); err != nil {
1556 0 : return err
1557 0 : }
1558 2 : } else {
1559 2 : w.props.IndexType = binarySearchIndex
1560 2 : // NB: RocksDB includes the block trailer length in the index size
1561 2 : // property, though it doesn't include the trailer in the filter size
1562 2 : // property.
1563 2 : w.props.IndexSize = uint64(w.indexBlock.estimatedSize()) + block.TrailerLen
1564 2 : w.props.NumDataBlocks = uint64(w.indexBlock.block.EntryCount())
1565 2 : // Write the single level index block.
1566 2 : if _, err = w.layout.WriteIndexBlock(w.indexBlock.finish()); err != nil {
1567 0 : return err
1568 0 : }
1569 : }
1570 :
1571 : // Write the range-del block.
1572 2 : if w.props.NumRangeDeletions > 0 {
1573 2 : // Because the range tombstones are fragmented, the end key of the last
1574 2 : // added range tombstone will be the largest range tombstone key. Note
1575 2 : // that we need to make this into a range deletion sentinel because
1576 2 : // sstable boundaries are inclusive while the end key of a range
1577 2 : // deletion tombstone is exclusive. A Clone() is necessary as
1578 2 : // rangeDelBlock.curValue is the same slice that will get passed into
1579 2 : // w.writer, and some implementations of vfs.File mutate the slice
1580 2 : // passed into Write(). Also, w.meta will often outlive the blockWriter,
1581 2 : // and so cloning curValue allows the rangeDelBlock's internal buffer to
1582 2 : // get gc'd.
1583 2 : k := base.MakeRangeDeleteSentinelKey(w.rangeDelBlock.CurValue()).Clone()
1584 2 : w.meta.SetLargestRangeDelKey(k)
1585 2 : if _, err := w.layout.WriteRangeDeletionBlock(w.rangeDelBlock.Finish()); err != nil {
1586 0 : return err
1587 0 : }
1588 : }
1589 :
1590 2 : if w.props.NumRangeKeys() > 0 {
1591 2 : key := w.rangeKeyBlock.CurKey()
1592 2 : kind := key.Kind()
1593 2 : endKey, _, err := rangekey.DecodeEndKey(kind, w.rangeKeyBlock.CurValue())
1594 2 : if err != nil {
1595 0 : return err
1596 0 : }
1597 2 : k := base.MakeExclusiveSentinelKey(kind, endKey).Clone()
1598 2 : w.meta.SetLargestRangeKey(k)
1599 2 : if _, err := w.layout.WriteRangeKeyBlock(w.rangeKeyBlock.Finish()); err != nil {
1600 0 : return err
1601 0 : }
1602 : }
1603 :
1604 2 : if w.valueBlockWriter != nil {
1605 2 : _, vbStats, err := w.valueBlockWriter.Finish(&w.layout, w.layout.offset)
1606 2 : if err != nil {
1607 0 : return err
1608 0 : }
1609 2 : w.props.NumValueBlocks = vbStats.NumValueBlocks
1610 2 : w.props.NumValuesInValueBlocks = vbStats.NumValuesInValueBlocks
1611 2 : w.props.ValueBlocksSize = vbStats.ValueBlocksAndIndexSize
1612 : }
1613 :
1614 2 : {
1615 2 : // Finish and record the prop collectors if props are not yet recorded.
1616 2 : // Pre-computed props might have been copied by specialized sst creators
1617 2 : // like suffix replacer.
1618 2 : if len(w.props.UserProperties) == 0 {
1619 2 : userProps := make(map[string]string)
1620 2 : for i := range w.blockPropCollectors {
1621 2 : scratch := w.blockPropsEncoder.getScratchForProp()
1622 2 : // Place the shortID in the first byte.
1623 2 : scratch = append(scratch, byte(i))
1624 2 : buf, err := w.blockPropCollectors[i].FinishTable(scratch)
1625 2 : if err != nil {
1626 1 : return err
1627 1 : }
1628 2 : var prop string
1629 2 : if len(buf) > 0 {
1630 2 : prop = string(buf)
1631 2 : }
1632 : // NB: The property is populated in the map even if it is the
1633 : // empty string, since the presence in the map is what indicates
1634 : // that the block property collector was used when writing.
1635 2 : userProps[w.blockPropCollectors[i].Name()] = prop
1636 : }
1637 2 : if len(userProps) > 0 {
1638 2 : w.props.UserProperties = userProps
1639 2 : }
1640 : }
1641 :
1642 : // Write the properties block.
1643 2 : var raw rowblk.Writer
1644 2 : // The restart interval is set to infinity because the properties block
1645 2 : // is always read sequentially and cached in a heap located object. This
1646 2 : // reduces table size without a significant impact on performance.
1647 2 : raw.RestartInterval = propertiesBlockRestartInterval
1648 2 : w.props.CompressionOptions = rocksDBCompressionOptions
1649 2 : w.props.save(w.tableFormat, &raw)
1650 2 : if _, err := w.layout.WritePropertiesBlock(raw.Finish()); err != nil {
1651 0 : return err
1652 0 : }
1653 : }
1654 :
1655 : // Write the table footer.
1656 2 : w.meta.Size, err = w.layout.Finish()
1657 2 : if err != nil {
1658 1 : return err
1659 1 : }
1660 2 : w.meta.Properties = w.props
1661 2 :
1662 2 : // Check that the features present in the table are compatible with the format
1663 2 : // configured for the table.
1664 2 : if err = w.assertFormatCompatibility(); err != nil {
1665 1 : return err
1666 1 : }
1667 :
1668 2 : w.dataBlockBuf.clear()
1669 2 : dataBlockBufPool.Put(w.dataBlockBuf)
1670 2 : w.dataBlockBuf = nil
1671 2 : w.indexBlock.clear()
1672 2 : indexBlockBufPool.Put(w.indexBlock)
1673 2 : w.indexBlock = nil
1674 2 :
1675 2 : // Make any future calls to Set or Close return an error.
1676 2 : w.err = errWriterClosed
1677 2 : return nil
1678 : }
1679 :
1680 : // EstimatedSize returns the estimated size of the sstable being written if a
1681 : // call to Finish() was made without adding additional keys.
1682 2 : func (w *RawRowWriter) EstimatedSize() uint64 {
1683 2 : if w == nil {
1684 0 : return 0
1685 0 : }
1686 2 : return w.coordination.sizeEstimate.size() +
1687 2 : uint64(w.dataBlockBuf.dataBlock.EstimatedSize()) +
1688 2 : w.indexBlock.estimatedSize()
1689 : }
1690 :
1691 : // Metadata returns the metadata for the finished sstable. Only valid to call
1692 : // after the sstable has been finished.
1693 2 : func (w *RawRowWriter) Metadata() (*WriterMetadata, error) {
1694 2 : if !w.layout.IsFinished() {
1695 0 : return nil, errors.New("pebble: writer is not closed")
1696 0 : }
1697 2 : return &w.meta, nil
1698 : }
1699 :
1700 2 : func newRowWriter(writable objstorage.Writable, o WriterOptions) *RawRowWriter {
1701 2 : if o.TableFormat.BlockColumnar() {
1702 0 : panic(errors.AssertionFailedf("newRowWriter cannot create sstables with %s format", o.TableFormat))
1703 : }
1704 2 : o = o.ensureDefaults()
1705 2 : w := &RawRowWriter{
1706 2 : layout: makeLayoutWriter(writable, o),
1707 2 : meta: WriterMetadata{
1708 2 : SmallestSeqNum: math.MaxUint64,
1709 2 : },
1710 2 : compare: o.Comparer.Compare,
1711 2 : pointSuffixCmp: o.Comparer.ComparePointSuffixes,
1712 2 : split: o.Comparer.Split,
1713 2 : formatKey: o.Comparer.FormatKey,
1714 2 : compression: o.Compression,
1715 2 : separator: o.Comparer.Separator,
1716 2 : successor: o.Comparer.Successor,
1717 2 : tableFormat: o.TableFormat,
1718 2 : isStrictObsolete: o.IsStrictObsolete,
1719 2 : writingToLowestLevel: o.WritingToLowestLevel,
1720 2 : restartInterval: o.BlockRestartInterval,
1721 2 : checksumType: o.Checksum,
1722 2 : disableKeyOrderChecks: o.internal.DisableKeyOrderChecks,
1723 2 : indexBlock: newIndexBlockBuf(o.Parallelism),
1724 2 : rangeDelBlock: rowblk.Writer{RestartInterval: 1},
1725 2 : rangeKeyBlock: rowblk.Writer{RestartInterval: 1},
1726 2 : topLevelIndexBlock: rowblk.Writer{RestartInterval: 1},
1727 2 : allocatorSizeClasses: o.AllocatorSizeClasses,
1728 2 : numDeletionsThreshold: o.NumDeletionsThreshold,
1729 2 : deletionSizeRatioThreshold: o.DeletionSizeRatioThreshold,
1730 2 : }
1731 2 : w.dataFlush = block.MakeFlushGovernor(o.BlockSize, o.BlockSizeThreshold, o.SizeClassAwareThreshold, o.AllocatorSizeClasses)
1732 2 : w.indexFlush = block.MakeFlushGovernor(o.IndexBlockSize, o.BlockSizeThreshold, o.SizeClassAwareThreshold, o.AllocatorSizeClasses)
1733 2 : if w.tableFormat >= TableFormatPebblev3 {
1734 2 : w.shortAttributeExtractor = o.ShortAttributeExtractor
1735 2 : w.requiredInPlaceValueBound = o.RequiredInPlaceValueBound
1736 2 : if !o.DisableValueBlocks {
1737 2 : w.valueBlockWriter = valblk.NewWriter(
1738 2 : block.MakeFlushGovernor(o.BlockSize, o.BlockSizeThreshold, o.SizeClassAwareThreshold, o.AllocatorSizeClasses),
1739 2 : w.compression, w.checksumType, func(compressedSize int) {
1740 2 : w.coordination.sizeEstimate.dataBlockCompressed(compressedSize, 0)
1741 2 : },
1742 : )
1743 : }
1744 : }
1745 :
1746 2 : w.dataBlockBuf = newDataBlockBuf(w.restartInterval, w.checksumType)
1747 2 :
1748 2 : w.blockBuf = blockBuf{
1749 2 : checksummer: block.Checksummer{Type: o.Checksum},
1750 2 : }
1751 2 :
1752 2 : w.coordination.init(o.Parallelism, w)
1753 2 : defer func() {
1754 2 : if r := recover(); r != nil {
1755 1 : // Don't leak a goroutine if we hit a panic.
1756 1 : _ = w.coordination.writeQueue.finish()
1757 1 : panic(r)
1758 : }
1759 : }()
1760 :
1761 2 : if writable == nil {
1762 0 : w.err = errors.New("pebble: nil writable")
1763 0 : return w
1764 0 : }
1765 :
1766 2 : if o.FilterPolicy != nil {
1767 2 : switch o.FilterType {
1768 2 : case TableFilter:
1769 2 : w.filter = newTableFilterWriter(o.FilterPolicy)
1770 0 : default:
1771 0 : panic(fmt.Sprintf("unknown filter type: %v", o.FilterType))
1772 : }
1773 : }
1774 :
1775 2 : w.props.ComparerName = o.Comparer.Name
1776 2 : w.props.CompressionName = o.Compression.String()
1777 2 : w.props.MergerName = o.MergerName
1778 2 : w.props.PropertyCollectorNames = "[]"
1779 2 :
1780 2 : numBlockPropertyCollectors := len(o.BlockPropertyCollectors)
1781 2 : shouldAddObsoleteCollector := w.tableFormat >= TableFormatPebblev4 && !o.disableObsoleteCollector
1782 2 : if shouldAddObsoleteCollector {
1783 2 : numBlockPropertyCollectors++
1784 2 : }
1785 :
1786 2 : if numBlockPropertyCollectors > 0 {
1787 2 : if numBlockPropertyCollectors > maxPropertyCollectors {
1788 0 : w.err = errors.New("pebble: too many block property collectors")
1789 0 : return w
1790 0 : }
1791 2 : w.blockPropCollectors = make([]BlockPropertyCollector, 0, numBlockPropertyCollectors)
1792 2 : for _, constructFn := range o.BlockPropertyCollectors {
1793 2 : w.blockPropCollectors = append(w.blockPropCollectors, constructFn())
1794 2 : }
1795 2 : if shouldAddObsoleteCollector {
1796 2 : w.blockPropCollectors = append(w.blockPropCollectors, &w.obsoleteCollector)
1797 2 : }
1798 :
1799 2 : var buf bytes.Buffer
1800 2 : buf.WriteString("[")
1801 2 : for i := range w.blockPropCollectors {
1802 2 : if i > 0 {
1803 2 : buf.WriteString(",")
1804 2 : }
1805 2 : buf.WriteString(w.blockPropCollectors[i].Name())
1806 : }
1807 2 : buf.WriteString("]")
1808 2 : w.props.PropertyCollectorNames = buf.String()
1809 : }
1810 :
1811 : // Initialize the range key fragmenter and encoder.
1812 2 : w.rangeKeyEncoder.Emit = w.addRangeKey
1813 2 : return w
1814 : }
1815 :
1816 : // rewriteSuffixes implements RawWriter.
1817 : func (w *RawRowWriter) rewriteSuffixes(
1818 : r *Reader, wo WriterOptions, from, to []byte, concurrency int,
1819 1 : ) error {
1820 1 : for _, c := range w.blockPropCollectors {
1821 1 : if !c.SupportsSuffixReplacement() {
1822 0 : return errors.Errorf("block property collector %s does not support suffix replacement", c.Name())
1823 0 : }
1824 : }
1825 1 : l, err := r.Layout()
1826 1 : if err != nil {
1827 0 : return errors.Wrap(err, "reading layout")
1828 0 : }
1829 :
1830 : // Copy data blocks in parallel, rewriting suffixes as we go.
1831 1 : blocks, err := rewriteDataBlocksInParallel(r, wo, l.Data, from, to, concurrency, func() blockRewriter {
1832 1 : return rowblk.NewRewriter(r.Comparer, wo.BlockRestartInterval)
1833 1 : })
1834 1 : if err != nil {
1835 1 : return errors.Wrap(err, "rewriting data blocks")
1836 1 : }
1837 :
1838 : // oldShortIDs maps the shortID for the block property collector in the old
1839 : // blocks to the shortID in the new blocks. Initialized once for the sstable.
1840 1 : oldShortIDs, n, err := getShortIDs(r, w.blockPropCollectors)
1841 1 : if err != nil {
1842 0 : return errors.Wrap(err, "getting short IDs")
1843 0 : }
1844 1 : oldProps := make([][]byte, len(w.blockPropCollectors))
1845 1 :
1846 1 : for i := range blocks {
1847 1 : // Write the rewritten block to the file.
1848 1 : bh, err := w.layout.WritePrecompressedDataBlock(blocks[i].physical)
1849 1 : if err != nil {
1850 0 : return err
1851 0 : }
1852 :
1853 : // Load any previous values for our prop collectors into oldProps.
1854 1 : for i := range oldProps {
1855 1 : oldProps[i] = nil
1856 1 : }
1857 1 : decoder := makeBlockPropertiesDecoder(n, l.Data[i].Props)
1858 1 : for !decoder.Done() {
1859 1 : id, val, err := decoder.Next()
1860 1 : if err != nil {
1861 0 : return err
1862 0 : }
1863 1 : if oldShortIDs[id].IsValid() {
1864 1 : oldProps[oldShortIDs[id]] = val
1865 1 : }
1866 : }
1867 1 : for i, p := range w.blockPropCollectors {
1868 1 : if err := p.AddCollectedWithSuffixReplacement(oldProps[i], from, to); err != nil {
1869 0 : return err
1870 0 : }
1871 : }
1872 :
1873 1 : bhp, err := w.maybeAddBlockPropertiesToBlockHandle(bh)
1874 1 : if err != nil {
1875 0 : return err
1876 0 : }
1877 1 : var nextKey InternalKey
1878 1 : if i+1 < len(blocks) {
1879 1 : nextKey = blocks[i+1].start
1880 1 : }
1881 1 : if err = w.addIndexEntrySync(blocks[i].end, nextKey, bhp, w.dataBlockBuf.tmp[:]); err != nil {
1882 0 : return err
1883 0 : }
1884 : }
1885 1 : if len(blocks) > 0 {
1886 1 : w.meta.Size = w.layout.offset
1887 1 : w.meta.updateSeqNum(blocks[0].start.SeqNum())
1888 1 : w.props.NumEntries = r.Properties.NumEntries
1889 1 : w.props.RawKeySize = r.Properties.RawKeySize
1890 1 : w.props.RawValueSize = r.Properties.RawValueSize
1891 1 : w.meta.SetSmallestPointKey(blocks[0].start)
1892 1 : w.meta.SetLargestPointKey(blocks[len(blocks)-1].end)
1893 1 : }
1894 :
1895 : // Copy range key block, replacing suffixes if it exists.
1896 1 : if err := rewriteRangeKeyBlockToWriter(r, w, from, to); err != nil {
1897 0 : return errors.Wrap(err, "rewriting range key blocks")
1898 0 : }
1899 : // Copy over the filter block if it exists (rewriteDataBlocksToWriter will
1900 : // already have ensured this is valid if it exists).
1901 1 : if w.filter != nil {
1902 1 : if filterBlockBH, ok := l.FilterByName(w.filter.metaName()); ok {
1903 1 : filterBlock, _, err := readBlockBuf(r, filterBlockBH, nil)
1904 1 : if err != nil {
1905 0 : return errors.Wrap(err, "reading filter")
1906 0 : }
1907 1 : w.filter = copyFilterWriter{
1908 1 : origPolicyName: w.filter.policyName(), origMetaName: w.filter.metaName(), data: filterBlock,
1909 1 : }
1910 : }
1911 : }
1912 1 : return nil
1913 : }
1914 :
1915 : // copyDataBlocks implements RawWriter.
1916 : func (w *RawRowWriter) copyDataBlocks(
1917 : ctx context.Context, blocks []indexEntry, rh objstorage.ReadHandle,
1918 2 : ) error {
1919 2 : blockOffset := blocks[0].bh.Offset
1920 2 : // The block lengths don't include their trailers, which just sit after the
1921 2 : // block length, before the next offset; We get the ones between the blocks
1922 2 : // we copy implicitly but need to explicitly add the last trailer to length.
1923 2 : length := blocks[len(blocks)-1].bh.Offset + blocks[len(blocks)-1].bh.Length + block.TrailerLen - blockOffset
1924 2 : if spanEnd := length + blockOffset; spanEnd < blockOffset {
1925 0 : return base.AssertionFailedf("invalid intersecting span for CopySpan [%d, %d)", blockOffset, spanEnd)
1926 0 : }
1927 2 : if err := objstorage.Copy(ctx, rh, w.layout.writable, blockOffset, length); err != nil {
1928 0 : return err
1929 0 : }
1930 : // Update w.meta.Size so subsequently flushed metadata has correct offsets.
1931 2 : w.meta.Size += length
1932 2 : for i := range blocks {
1933 2 : blocks[i].bh.Offset = w.layout.offset
1934 2 : // blocks[i].bh.Length remains unmodified.
1935 2 : sepKey := base.MakeInternalKey(blocks[i].sep, base.SeqNumMax, base.InternalKeyKindSeparator)
1936 2 : if err := w.addIndexEntrySep(sepKey, blocks[i].bh, w.dataBlockBuf.tmp[:]); err != nil {
1937 0 : return err
1938 0 : }
1939 2 : w.layout.offset += uint64(blocks[i].bh.Length) + block.TrailerLen
1940 : }
1941 2 : return nil
1942 : }
1943 :
1944 : // addDataBlock implements RawWriter.
1945 1 : func (w *RawRowWriter) addDataBlock(b, sep []byte, bhp block.HandleWithProperties) error {
1946 1 : // layout.WriteDataBlock keeps layout.offset up-to-date for us.
1947 1 : bh, err := w.layout.WriteDataBlock(b, &w.dataBlockBuf.blockBuf)
1948 1 : if err != nil {
1949 0 : return err
1950 0 : }
1951 1 : bhp.Handle = bh
1952 1 :
1953 1 : sepKey := base.MakeInternalKey(sep, base.SeqNumMax, base.InternalKeyKindSeparator)
1954 1 : if err := w.addIndexEntrySep(sepKey, bhp, w.dataBlockBuf.tmp[:]); err != nil {
1955 0 : return err
1956 0 : }
1957 1 : w.meta.Size += uint64(bh.Length) + block.TrailerLen
1958 1 : return nil
1959 : }
1960 :
1961 : // copyProperties implements RawWriter.
1962 2 : func (w *RawRowWriter) copyProperties(props Properties) {
1963 2 : w.props = props
1964 2 : // Remove all user properties to disable block properties, which we do not
1965 2 : // calculate.
1966 2 : w.props.UserProperties = nil
1967 2 : // Reset props that we'll re-derive as we build our own index.
1968 2 : w.props.IndexPartitions = 0
1969 2 : w.props.TopLevelIndexSize = 0
1970 2 : w.props.IndexSize = 0
1971 2 : w.props.IndexType = 0
1972 2 : }
1973 :
1974 : // copyFilter implements RawWriter.
1975 1 : func (w *RawRowWriter) copyFilter(filter []byte, filterName string) error {
1976 1 : if w.filter != nil && filterName != w.filter.policyName() {
1977 0 : return errors.New("mismatched filters")
1978 0 : }
1979 1 : w.filter = copyFilterWriter{
1980 1 : origPolicyName: w.filter.policyName(), origMetaName: w.filter.metaName(), data: filter,
1981 1 : }
1982 1 : return nil
1983 : }
1984 :
1985 : // SetSnapshotPinnedProperties sets the properties for pinned keys. Should only
1986 : // be used internally by Pebble.
1987 : func (w *RawRowWriter) SetSnapshotPinnedProperties(
1988 : pinnedKeyCount, pinnedKeySize, pinnedValueSize uint64,
1989 2 : ) {
1990 2 : w.props.SnapshotPinnedKeys = pinnedKeyCount
1991 2 : w.props.SnapshotPinnedKeySize = pinnedKeySize
1992 2 : w.props.SnapshotPinnedValueSize = pinnedValueSize
1993 2 : }
|