Line data Source code
1 : // Copyright 2011 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package sstable
6 :
7 : import (
8 : "bytes"
9 : "cmp"
10 : "context"
11 : "encoding/binary"
12 : "fmt"
13 : "io"
14 : "os"
15 : "slices"
16 : "time"
17 :
18 : "github.com/cespare/xxhash/v2"
19 : "github.com/cockroachdb/errors"
20 : "github.com/cockroachdb/pebble/internal/base"
21 : "github.com/cockroachdb/pebble/internal/bytealloc"
22 : "github.com/cockroachdb/pebble/internal/cache"
23 : "github.com/cockroachdb/pebble/internal/crc"
24 : "github.com/cockroachdb/pebble/internal/invariants"
25 : "github.com/cockroachdb/pebble/internal/keyspan"
26 : "github.com/cockroachdb/pebble/internal/private"
27 : "github.com/cockroachdb/pebble/objstorage"
28 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
29 : )
30 :
31 : var errReaderClosed = errors.New("pebble/table: reader is closed")
32 :
33 : // decodeBlockHandle returns the block handle encoded at the start of src, as
34 : // well as the number of bytes it occupies. It returns zero if given invalid
35 : // input. A block handle for a data block or a first/lower level index block
36 : // should not be decoded using decodeBlockHandle, since the caller may validate
37 : // that the number of bytes decoded equals the length of src, which will be
38 : // false when trailing properties are present. In those cases the caller
39 : // should use decodeBlockHandleWithProperties.
40 1 : func decodeBlockHandle(src []byte) (BlockHandle, int) {
41 1 : offset, n := binary.Uvarint(src)
42 1 : length, m := binary.Uvarint(src[n:])
43 1 : if n == 0 || m == 0 {
44 0 : return BlockHandle{}, 0
45 0 : }
46 1 : return BlockHandle{offset, length}, n + m
47 : }
48 :
49 : // decodeBlockHandleWithProperties returns the block handle and properties
50 : // encoded in src. src needs to be exactly the length that was encoded. This
51 : // method must be used for data blocks and first/lower-level index blocks. The
52 : // properties in the block handle point to the bytes in src.
53 1 : func decodeBlockHandleWithProperties(src []byte) (BlockHandleWithProperties, error) {
54 1 : bh, n := decodeBlockHandle(src)
55 1 : if n == 0 {
56 0 : return BlockHandleWithProperties{}, errors.Errorf("invalid BlockHandle")
57 0 : }
58 1 : return BlockHandleWithProperties{
59 1 : BlockHandle: bh,
60 1 : Props: src[n:],
61 1 : }, nil
62 : }
63 :
64 1 : func encodeBlockHandle(dst []byte, b BlockHandle) int {
65 1 : n := binary.PutUvarint(dst, b.Offset)
66 1 : m := binary.PutUvarint(dst[n:], b.Length)
67 1 : return n + m
68 1 : }
69 :
70 1 : func encodeBlockHandleWithProperties(dst []byte, b BlockHandleWithProperties) []byte {
71 1 : n := encodeBlockHandle(dst, b.BlockHandle)
72 1 : dst = append(dst[:n], b.Props...)
73 1 : return dst
74 1 : }
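// exampleBlockHandleRoundTrip is an illustrative sketch (not part of the
// package API): a BlockHandle round-trips through its two-varint encoding,
// so a buffer of 2*binary.MaxVarintLen64 bytes always suffices.
func exampleBlockHandleRoundTrip() {
	var buf [2 * binary.MaxVarintLen64]byte
	n := encodeBlockHandle(buf[:], BlockHandle{Offset: 42, Length: 4096})
	bh, m := decodeBlockHandle(buf[:n])
	// bh == BlockHandle{Offset: 42, Length: 4096} and m == n.
	_, _ = bh, m
}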
75 :
76 : // block is a []byte that holds a sequence of key/value pairs plus an index
77 : // over those pairs.
78 : type block []byte
79 :
80 : type loadBlockResult int8
81 :
82 : const (
83 : loadBlockOK loadBlockResult = iota
84 : // Could be due to an error, or because there is no block left to load.
85 : loadBlockFailed
86 : loadBlockIrrelevant
87 : )
88 :
89 : type blockTransform func([]byte) ([]byte, error)
90 :
91 : // ReaderOption provides an interface for doing work on a Reader while it is
92 : // being opened.
93 : type ReaderOption interface {
94 : // readerApply is called on the reader during opening in order to set internal
95 : // parameters.
96 : readerApply(*Reader)
97 : }
98 :
99 : // Comparers is a map from comparer name to comparer. It is used for debugging
100 : // tools which may be used on multiple databases configured with different
101 : // comparers. Comparers implements the ReaderOption interface and can be passed
102 : // as a parameter to NewReader.
103 : type Comparers map[string]*Comparer
104 :
105 1 : func (c Comparers) readerApply(r *Reader) {
106 1 : if r.Compare != nil || r.Properties.ComparerName == "" {
107 1 : return
108 1 : }
109 1 : if comparer, ok := c[r.Properties.ComparerName]; ok {
110 1 : r.Compare = comparer.Compare
111 1 : r.Equal = comparer.Equal
112 1 : r.FormatKey = comparer.FormatKey
113 1 : r.Split = comparer.Split
114 1 : }
115 : }
116 :
117 : // Mergers is a map from merger name to merger. It is used for debugging tools
118 : // which may be used on multiple databases configured with different
119 : // mergers. Mergers implements the ReaderOption interface and can be passed as
120 : // a parameter to NewReader.
121 : type Mergers map[string]*Merger
122 :
123 1 : func (m Mergers) readerApply(r *Reader) {
124 1 : if r.mergerOK || r.Properties.MergerName == "" {
125 1 : return
126 1 : }
127 1 : _, r.mergerOK = m[r.Properties.MergerName]
128 : }
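
// openWithDebugMaps is an illustrative sketch (all identifiers hypothetical):
// a debug tool can supply Comparers and Mergers maps so that NewReader can
// resolve whatever comparer/merger names the table's properties record.
func openWithDebugMaps(
	readable objstorage.Readable, opts ReaderOptions, c *Comparer, m *Merger,
) (*Reader, error) {
	return NewReader(readable, opts,
		Comparers{c.Name: c},
		Mergers{m.Name: m})
}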
129 :
130 : // cacheOpts is a Reader open option for specifying the cache ID and sstable file
131 : // number. If not specified, a unique cache ID will be used.
132 : type cacheOpts struct {
133 : cacheID uint64
134 : fileNum base.DiskFileNum
135 : }
136 :
137 : // Marker function to indicate the option should be applied before reading the
138 : // sstable properties and, in the write path, before writing the default
139 : // sstable properties.
140 0 : func (c *cacheOpts) preApply() {}
141 :
142 1 : func (c *cacheOpts) readerApply(r *Reader) {
143 1 : if r.cacheID == 0 {
144 1 : r.cacheID = c.cacheID
145 1 : }
146 1 : if r.fileNum == 0 {
147 1 : r.fileNum = c.fileNum
148 1 : }
149 : }
150 :
151 1 : func (c *cacheOpts) writerApply(w *Writer) {
152 1 : if w.cacheID == 0 {
153 1 : w.cacheID = c.cacheID
154 1 : }
155 1 : if w.fileNum == 0 {
156 1 : w.fileNum = c.fileNum
157 1 : }
158 : }
159 :
160 : // SyntheticSuffix will replace every suffix of every key surfaced during block
161 : // iteration. A synthetic suffix can be used if:
162 : // 1. no two keys in the sst share the same prefix; and
163 : // 2. pebble.Compare(prefix + replacementSuffix, prefix + originalSuffix) < 0,
164 : // for all keys in the backing sst which have a suffix (i.e. originalSuffix
165 : // is not empty).
166 : type SyntheticSuffix []byte
167 :
168 : // IsSet returns true if the synthetic suffix is not empty.
169 1 : func (ss SyntheticSuffix) IsSet() bool {
170 1 : return len(ss) > 0
171 1 : }
172 :
173 : // SyntheticPrefix represents a byte slice that is implicitly prepended to every
174 : // key in a file being read or accessed by a reader. Note that the table is
175 : // assumed to contain "prefix-less" keys that become full keys when prepended
176 : // with the synthetic prefix. The table's bloom filters are constructed only on
177 : // the "prefix-less" keys in the table, but interactions with the file including
178 : // seeks and reads, will all behave as if the file had been constructed from
179 : // keys that did include the prefix. Note that all Compare operations may act on
180 : // a prefix-less key as the synthetic prefix will never modify key metadata
181 : // stored in the key suffix.
182 : //
183 : // NB: Since this transformation currently only applies to point keys, a block
184 : // with range keys cannot be iterated over with a synthetic prefix.
185 : type SyntheticPrefix []byte
186 :
187 : // IsSet returns true if the synthetic prefix is not empty.
188 1 : func (sp SyntheticPrefix) IsSet() bool {
189 1 : return len(sp) > 0
190 1 : }
191 :
192 : // Apply prepends the synthetic prefix to a key.
193 1 : func (sp SyntheticPrefix) Apply(key []byte) []byte {
194 1 : res := make([]byte, 0, len(sp)+len(key))
195 1 : res = append(res, sp...)
196 1 : res = append(res, key...)
197 1 : return res
198 1 : }
199 :
200 : // Invert removes the synthetic prefix from a key.
201 1 : func (sp SyntheticPrefix) Invert(key []byte) []byte {
202 1 : res, ok := bytes.CutPrefix(key, sp)
203 1 : if !ok {
204 0 : panic(fmt.Sprintf("unexpected prefix: %s", key))
205 : }
206 1 : return res
207 : }
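
// exampleSyntheticPrefix is an illustrative sketch: Apply and Invert are
// inverses for any key that actually carries the prefix. Invert panics on a
// key that does not start with the prefix.
func exampleSyntheticPrefix() {
	sp := SyntheticPrefix([]byte("prefix/"))
	full := sp.Apply([]byte("key")) // "prefix/key"
	bare := sp.Invert(full)         // "key"
	_ = bare
}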
208 :
209 : // rawTombstonesOpt is a Reader open option for specifying that range
210 : // tombstones returned by Reader.NewRangeDelIter() should not be
211 : // fragmented. Used by debug tools to get a raw view of the tombstones
212 : // contained in an sstable.
213 : type rawTombstonesOpt struct{}
214 :
215 0 : func (rawTombstonesOpt) preApply() {}
216 :
217 1 : func (rawTombstonesOpt) readerApply(r *Reader) {
218 1 : r.rawTombstones = true
219 1 : }
220 :
221 1 : func init() {
222 1 : private.SSTableCacheOpts = func(cacheID uint64, fileNum base.DiskFileNum) interface{} {
223 1 : return &cacheOpts{cacheID, fileNum}
224 1 : }
225 1 : private.SSTableRawTombstonesOpt = rawTombstonesOpt{}
226 : }
227 :
228 : // Reader is a table reader.
229 : type Reader struct {
230 : readable objstorage.Readable
231 : cacheID uint64
232 : fileNum base.DiskFileNum
233 : err error
234 : indexBH BlockHandle
235 : filterBH BlockHandle
236 : rangeDelBH BlockHandle
237 : rangeKeyBH BlockHandle
238 : rangeDelTransform blockTransform
239 : valueBIH valueBlocksIndexHandle
240 : propertiesBH BlockHandle
241 : metaIndexBH BlockHandle
242 : footerBH BlockHandle
243 : opts ReaderOptions
244 : Compare Compare
245 : Equal Equal
246 : FormatKey base.FormatKey
247 : Split Split
248 : tableFilter *tableFilterReader
249 : // Keep types that are not multiples of 8 bytes at the end and with
250 : // decreasing size.
251 : Properties Properties
252 : tableFormat TableFormat
253 : rawTombstones bool
254 : mergerOK bool
255 : checksumType ChecksumType
256 : // metaBufferPool is a buffer pool used exclusively when opening a table and
257 : // loading its meta blocks. metaBufferPoolAlloc is used to batch-allocate
258 : // the BufferPool.pool slice as a part of the Reader allocation. It's
259 : // capacity 3 to accommodate the meta block (1), and both the compressed
260 : // properties block (1) and decompressed properties block (1)
261 : // simultaneously.
262 : metaBufferPool BufferPool
263 : metaBufferPoolAlloc [3]allocedBuffer
264 : }
265 :
266 : var _ CommonReader = (*Reader)(nil)
267 :
268 : // Close implements DB.Close, as documented in the pebble package.
269 1 : func (r *Reader) Close() error {
270 1 : r.opts.Cache.Unref()
271 1 :
272 1 : if r.readable != nil {
273 1 : r.err = firstError(r.err, r.readable.Close())
274 1 : r.readable = nil
275 1 : }
276 :
277 1 : if r.err != nil {
278 1 : return r.err
279 1 : }
280 : // Make any future calls to Get, NewIter or Close return an error.
281 1 : r.err = errReaderClosed
282 1 : return nil
283 : }
284 :
285 : // NewIterWithBlockPropertyFilters returns an iterator for the contents of the
286 : // table. If an error occurs, NewIterWithBlockPropertyFilters cleans up after
287 : // itself and returns a nil iterator.
288 : func (r *Reader) NewIterWithBlockPropertyFilters(
289 : transforms IterTransforms,
290 : lower, upper []byte,
291 : filterer *BlockPropertiesFilterer,
292 : useFilterBlock bool,
293 : stats *base.InternalIteratorStats,
294 : categoryAndQoS CategoryAndQoS,
295 : statsCollector *CategoryStatsCollector,
296 : rp ReaderProvider,
297 1 : ) (Iterator, error) {
298 1 : return r.newIterWithBlockPropertyFiltersAndContext(
299 1 : context.Background(), transforms, lower, upper, filterer, useFilterBlock,
300 1 : stats, categoryAndQoS, statsCollector, rp, nil)
301 1 : }
302 :
303 : // NewIterWithBlockPropertyFiltersAndContextEtc is similar to
304 : // NewIterWithBlockPropertyFilters and additionally accepts a context for
305 : // tracing.
306 : //
307 : // If transform.HideObsoletePoints is set, the callee assumes that filterer
308 : // already includes obsoleteKeyBlockPropertyFilter. The caller can satisfy this
309 : // contract by first calling TryAddBlockPropertyFilterForHideObsoletePoints.
310 : func (r *Reader) NewIterWithBlockPropertyFiltersAndContextEtc(
311 : ctx context.Context,
312 : transforms IterTransforms,
313 : lower, upper []byte,
314 : filterer *BlockPropertiesFilterer,
315 : useFilterBlock bool,
316 : stats *base.InternalIteratorStats,
317 : categoryAndQoS CategoryAndQoS,
318 : statsCollector *CategoryStatsCollector,
319 : rp ReaderProvider,
320 1 : ) (Iterator, error) {
321 1 : return r.newIterWithBlockPropertyFiltersAndContext(
322 1 : ctx, transforms, lower, upper, filterer, useFilterBlock,
323 1 : stats, categoryAndQoS, statsCollector, rp, nil)
324 1 : }
325 :
326 : // TryAddBlockPropertyFilterForHideObsoletePoints is expected to be called
327 : // before the call to NewIterWithBlockPropertyFiltersAndContextEtc, to get the
328 : // value of hideObsoletePoints and potentially add a block property filter.
329 : func (r *Reader) TryAddBlockPropertyFilterForHideObsoletePoints(
330 : snapshotForHideObsoletePoints uint64,
331 : fileLargestSeqNum uint64,
332 : pointKeyFilters []BlockPropertyFilter,
333 1 : ) (hideObsoletePoints bool, filters []BlockPropertyFilter) {
334 1 : hideObsoletePoints = r.tableFormat >= TableFormatPebblev4 &&
335 1 : snapshotForHideObsoletePoints > fileLargestSeqNum
336 1 : if hideObsoletePoints {
337 1 : pointKeyFilters = append(pointKeyFilters, obsoleteKeyBlockPropertyFilter{})
338 1 : }
339 1 : return hideObsoletePoints, pointKeyFilters
340 : }
341 :
342 : func (r *Reader) newIterWithBlockPropertyFiltersAndContext(
343 : ctx context.Context,
344 : transforms IterTransforms,
345 : lower, upper []byte,
346 : filterer *BlockPropertiesFilterer,
347 : useFilterBlock bool,
348 : stats *base.InternalIteratorStats,
349 : categoryAndQoS CategoryAndQoS,
350 : statsCollector *CategoryStatsCollector,
351 : rp ReaderProvider,
352 : vState *virtualState,
353 1 : ) (Iterator, error) {
354 1 : // NB: pebble.tableCache wraps the returned iterator with one which performs
355 1 : // reference counting on the Reader, preventing the Reader from being closed
356 1 : // until the final iterator closes.
357 1 : if r.Properties.IndexType == twoLevelIndex {
358 1 : i := twoLevelIterPool.Get().(*twoLevelIterator)
359 1 : err := i.init(ctx, r, vState, transforms, lower, upper, filterer, useFilterBlock,
360 1 : stats, categoryAndQoS, statsCollector, rp, nil /* bufferPool */)
361 1 : if err != nil {
362 1 : return nil, err
363 1 : }
364 1 : return i, nil
365 : }
366 :
367 1 : i := singleLevelIterPool.Get().(*singleLevelIterator)
368 1 : err := i.init(ctx, r, vState, transforms, lower, upper, filterer, useFilterBlock,
369 1 : stats, categoryAndQoS, statsCollector, rp, nil /* bufferPool */)
370 1 : if err != nil {
371 1 : return nil, err
372 1 : }
373 1 : return i, nil
374 : }
375 :
376 : // NewIter returns an iterator for the contents of the table. If an error
377 : // occurs, NewIter cleans up after itself and returns a nil iterator. NewIter
378 : // must only be used when the Reader is guaranteed to outlive any LazyValues
379 : // returned from the iter.
380 1 : func (r *Reader) NewIter(transforms IterTransforms, lower, upper []byte) (Iterator, error) {
381 1 : return r.NewIterWithBlockPropertyFilters(
382 1 : transforms, lower, upper, nil, true, /* useFilterBlock */
383 1 : nil /* stats */, CategoryAndQoS{}, nil /* statsCollector */, TrivialReaderProvider{Reader: r})
384 1 : }
385 :
386 : // NewCompactionIter returns an iterator similar to NewIter but it also increments
387 : // the number of bytes iterated. If an error occurs, NewCompactionIter cleans up
388 : // after itself and returns a nil iterator.
389 : func (r *Reader) NewCompactionIter(
390 : transforms IterTransforms,
391 : bytesIterated *uint64,
392 : categoryAndQoS CategoryAndQoS,
393 : statsCollector *CategoryStatsCollector,
394 : rp ReaderProvider,
395 : bufferPool *BufferPool,
396 1 : ) (Iterator, error) {
397 1 : return r.newCompactionIter(transforms, bytesIterated, categoryAndQoS, statsCollector, rp, nil, bufferPool)
398 1 : }
399 :
400 : func (r *Reader) newCompactionIter(
401 : transforms IterTransforms,
402 : bytesIterated *uint64,
403 : categoryAndQoS CategoryAndQoS,
404 : statsCollector *CategoryStatsCollector,
405 : rp ReaderProvider,
406 : vState *virtualState,
407 : bufferPool *BufferPool,
408 1 : ) (Iterator, error) {
409 1 : if vState != nil && vState.isSharedIngested {
410 1 : transforms.HideObsoletePoints = true
411 1 : }
412 1 : if r.Properties.IndexType == twoLevelIndex {
413 1 : i := twoLevelIterPool.Get().(*twoLevelIterator)
414 1 : err := i.init(
415 1 : context.Background(),
416 1 : r, vState, transforms, nil /* lower */, nil /* upper */, nil,
417 1 : false /* useFilter */, nil /* stats */, categoryAndQoS, statsCollector, rp, bufferPool,
418 1 : )
419 1 : if err != nil {
420 0 : return nil, err
421 0 : }
422 1 : i.setupForCompaction()
423 1 : return &twoLevelCompactionIterator{
424 1 : twoLevelIterator: i,
425 1 : bytesIterated: bytesIterated,
426 1 : }, nil
427 : }
428 1 : i := singleLevelIterPool.Get().(*singleLevelIterator)
429 1 : err := i.init(
430 1 : context.Background(), r, vState, transforms, nil /* lower */, nil, /* upper */
431 1 : nil, false /* useFilter */, nil /* stats */, categoryAndQoS, statsCollector, rp, bufferPool,
432 1 : )
433 1 : if err != nil {
434 0 : return nil, err
435 0 : }
436 1 : i.setupForCompaction()
437 1 : return &compactionIterator{
438 1 : singleLevelIterator: i,
439 1 : bytesIterated: bytesIterated,
440 1 : }, nil
441 : }
442 :
443 : // NewRawRangeDelIter returns an internal iterator for the contents of the
444 : // range-del block for the table. Returns nil if the table does not contain
445 : // any range deletions.
446 : //
447 : // TODO(sumeer): plumb context.Context since this path is relevant in the user-facing
448 : // iterator. Add WithContext methods since the existing ones are public.
449 1 : func (r *Reader) NewRawRangeDelIter(transforms IterTransforms) (keyspan.FragmentIterator, error) {
450 1 : if r.rangeDelBH.Length == 0 {
451 1 : return nil, nil
452 1 : }
453 1 : if transforms.SyntheticSuffix.IsSet() {
454 0 : return nil, errors.AssertionFailedf("synthetic suffix not supported with range del iterator")
455 0 : }
456 1 : if transforms.SyntheticPrefix.IsSet() {
457 0 : return nil, errors.AssertionFailedf("synthetic prefix not supported with range del iterator")
458 0 : }
459 1 : h, err := r.readRangeDel(nil /* stats */, nil /* iterStats */)
460 1 : if err != nil {
461 1 : return nil, err
462 1 : }
463 1 : i := &fragmentBlockIter{elideSameSeqnum: true}
464 1 : // It's okay for hideObsoletePoints to be false here, even for shared ingested
465 1 : // sstables. This is because rangedels do not apply to points in the same
466 1 : // sstable at the same sequence number anyway, so exposing obsolete rangedels
467 1 : // is harmless.
468 1 : if err := i.blockIter.initHandle(r.Compare, r.Split, h, transforms); err != nil {
469 0 : return nil, err
470 0 : }
471 1 : return i, nil
472 : }
473 :
474 : // NewRawRangeKeyIter returns an internal iterator for the contents of the
475 : // range-key block for the table. Returns nil if the table does not contain any
476 : // range keys.
477 : //
478 : // TODO(sumeer): plumb context.Context since this path is relevant in the user-facing
479 : // iterator. Add WithContext methods since the existing ones are public.
480 1 : func (r *Reader) NewRawRangeKeyIter(transforms IterTransforms) (keyspan.FragmentIterator, error) {
481 1 : if r.rangeKeyBH.Length == 0 {
482 1 : return nil, nil
483 1 : }
484 1 : if transforms.SyntheticSuffix.IsSet() {
485 0 : return nil, errors.AssertionFailedf("synthetic suffix not supported with range key iterator")
486 0 : }
487 1 : if transforms.SyntheticPrefix.IsSet() {
488 0 : return nil, errors.AssertionFailedf("synthetic prefix not supported with range key iterator")
489 0 : }
490 1 : h, err := r.readRangeKey(nil /* stats */, nil /* iterStats */)
491 1 : if err != nil {
492 1 : return nil, err
493 1 : }
494 1 : i := rangeKeyFragmentBlockIterPool.Get().(*rangeKeyFragmentBlockIter)
495 1 :
496 1 : if err := i.blockIter.initHandle(r.Compare, r.Split, h, transforms); err != nil {
497 0 : return nil, err
498 0 : }
499 1 : return i, nil
500 : }
501 :
502 : type rangeKeyFragmentBlockIter struct {
503 : fragmentBlockIter
504 : }
505 :
506 1 : func (i *rangeKeyFragmentBlockIter) Close() error {
507 1 : err := i.fragmentBlockIter.Close()
508 1 : i.fragmentBlockIter = i.fragmentBlockIter.resetForReuse()
509 1 : rangeKeyFragmentBlockIterPool.Put(i)
510 1 : return err
511 1 : }
512 :
513 : func (r *Reader) readIndex(
514 : ctx context.Context, stats *base.InternalIteratorStats, iterStats *iterStatsAccumulator,
515 1 : ) (bufferHandle, error) {
516 1 : ctx = objiotracing.WithBlockType(ctx, objiotracing.MetadataBlock)
517 1 : return r.readBlock(ctx, r.indexBH, nil, nil, stats, iterStats, nil /* buffer pool */)
518 1 : }
519 :
520 : func (r *Reader) readFilter(
521 : ctx context.Context, stats *base.InternalIteratorStats, iterStats *iterStatsAccumulator,
522 1 : ) (bufferHandle, error) {
523 1 : ctx = objiotracing.WithBlockType(ctx, objiotracing.FilterBlock)
524 1 : return r.readBlock(ctx, r.filterBH, nil /* transform */, nil /* readHandle */, stats, iterStats, nil /* buffer pool */)
525 1 : }
526 :
527 : func (r *Reader) readRangeDel(
528 : stats *base.InternalIteratorStats, iterStats *iterStatsAccumulator,
529 1 : ) (bufferHandle, error) {
530 1 : ctx := objiotracing.WithBlockType(context.Background(), objiotracing.MetadataBlock)
531 1 : return r.readBlock(ctx, r.rangeDelBH, r.rangeDelTransform, nil /* readHandle */, stats, iterStats, nil /* buffer pool */)
532 1 : }
533 :
534 : func (r *Reader) readRangeKey(
535 : stats *base.InternalIteratorStats, iterStats *iterStatsAccumulator,
536 1 : ) (bufferHandle, error) {
537 1 : ctx := objiotracing.WithBlockType(context.Background(), objiotracing.MetadataBlock)
538 1 : return r.readBlock(ctx, r.rangeKeyBH, nil /* transform */, nil /* readHandle */, stats, iterStats, nil /* buffer pool */)
539 1 : }
540 :
541 : func checkChecksum(
542 : checksumType ChecksumType, b []byte, bh BlockHandle, fileNum base.DiskFileNum,
543 1 : ) error {
544 1 : expectedChecksum := binary.LittleEndian.Uint32(b[bh.Length+1:])
545 1 : var computedChecksum uint32
546 1 : switch checksumType {
547 1 : case ChecksumTypeCRC32c:
548 1 : computedChecksum = crc.New(b[:bh.Length+1]).Value()
549 1 : case ChecksumTypeXXHash64:
550 1 : computedChecksum = uint32(xxhash.Sum64(b[:bh.Length+1]))
551 0 : default:
552 0 : return errors.Errorf("unsupported checksum type: %d", checksumType)
553 : }
554 :
555 1 : if expectedChecksum != computedChecksum {
556 1 : return base.CorruptionErrorf(
557 1 : "pebble/table: invalid table %s (checksum mismatch at %d/%d)",
558 1 : fileNum, errors.Safe(bh.Offset), errors.Safe(bh.Length))
559 1 : }
560 1 : return nil
561 : }
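
// appendChecksumSketch illustrates the write-side counterpart of checkChecksum
// above (a CRC32c-only sketch; the real encoding lives in the writer): the
// checksum covers the block contents plus the one-byte block type, and is
// stored little-endian immediately after it, forming the 5-byte block trailer.
func appendChecksumSketch(block []byte, typ blockType) []byte {
	block = append(block, byte(typ))
	sum := crc.New(block).Value()
	var trailer [4]byte
	binary.LittleEndian.PutUint32(trailer[:], sum)
	return append(block, trailer[:]...)
}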
562 :
563 : type cacheValueOrBuf struct {
564 : // buf.Valid() returns true if backed by a BufferPool.
565 : buf Buf
566 : // v is non-nil if backed by the block cache.
567 : v *cache.Value
568 : }
569 :
570 1 : func (b cacheValueOrBuf) get() []byte {
571 1 : if b.buf.Valid() {
572 1 : return b.buf.p.pool[b.buf.i].b
573 1 : }
574 1 : return b.v.Buf()
575 : }
576 :
577 1 : func (b cacheValueOrBuf) release() {
578 1 : if b.buf.Valid() {
579 1 : b.buf.Release()
580 1 : } else {
581 1 : cache.Free(b.v)
582 1 : }
583 : }
584 :
585 1 : func (b cacheValueOrBuf) truncate(n int) {
586 1 : if b.buf.Valid() {
587 1 : b.buf.p.pool[b.buf.i].b = b.buf.p.pool[b.buf.i].b[:n]
588 1 : } else {
589 1 : b.v.Truncate(n)
590 1 : }
591 : }
592 :
593 : // DeterministicReadBlockDurationForTesting is for tests that want a
594 : // deterministic value of the time to read a block (that is not in the cache).
595 : // The return value is a function that must be called before the test exits.
596 1 : func DeterministicReadBlockDurationForTesting() func() {
597 1 : drbdForTesting := deterministicReadBlockDurationForTesting
598 1 : deterministicReadBlockDurationForTesting = true
599 1 : return func() {
600 1 : deterministicReadBlockDurationForTesting = drbdForTesting
601 1 : }
602 : }
603 :
604 : var deterministicReadBlockDurationForTesting = false
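
// A typical test usage sketch (hypothetical test; the testing import is
// elided):
//
//	func TestSlowReadTracing(t *testing.T) {
//	    defer DeterministicReadBlockDurationForTesting()()
//	    // Block reads below report exactly the slow-read tracing threshold,
//	    // so the Eventf trace in readBlock fires deterministically.
//	}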
605 :
606 : func (r *Reader) readBlock(
607 : ctx context.Context,
608 : bh BlockHandle,
609 : transform blockTransform,
610 : readHandle objstorage.ReadHandle,
611 : stats *base.InternalIteratorStats,
612 : iterStats *iterStatsAccumulator,
613 : bufferPool *BufferPool,
614 1 : ) (handle bufferHandle, _ error) {
615 1 : if h := r.opts.Cache.Get(r.cacheID, r.fileNum, bh.Offset); h.Get() != nil {
616 1 : // Cache hit.
617 1 : if readHandle != nil {
618 1 : readHandle.RecordCacheHit(ctx, int64(bh.Offset), int64(bh.Length+blockTrailerLen))
619 1 : }
620 1 : if stats != nil {
621 1 : stats.BlockBytes += bh.Length
622 1 : stats.BlockBytesInCache += bh.Length
623 1 : }
624 1 : if iterStats != nil {
625 1 : iterStats.reportStats(bh.Length, bh.Length, 0)
626 1 : }
627 : // This block is already in the cache; return a handle to the existing
628 : // value in the cache.
629 1 : return bufferHandle{h: h}, nil
630 : }
631 :
632 : // Cache miss.
633 1 : var compressed cacheValueOrBuf
634 1 : if bufferPool != nil {
635 1 : compressed = cacheValueOrBuf{
636 1 : buf: bufferPool.Alloc(int(bh.Length + blockTrailerLen)),
637 1 : }
638 1 : } else {
639 1 : compressed = cacheValueOrBuf{
640 1 : v: cache.Alloc(int(bh.Length + blockTrailerLen)),
641 1 : }
642 1 : }
643 :
644 1 : readStartTime := time.Now()
645 1 : var err error
646 1 : if readHandle != nil {
647 1 : err = readHandle.ReadAt(ctx, compressed.get(), int64(bh.Offset))
648 1 : } else {
649 1 : err = r.readable.ReadAt(ctx, compressed.get(), int64(bh.Offset))
650 1 : }
651 1 : readDuration := time.Since(readStartTime)
652 1 : // TODO(sumeer): should the threshold be configurable?
653 1 : const slowReadTracingThreshold = 5 * time.Millisecond
654 1 : // For deterministic testing.
655 1 : if deterministicReadBlockDurationForTesting {
656 1 : readDuration = slowReadTracingThreshold
657 1 : }
658 : // Call IsTracingEnabled to avoid the allocations of boxing integers into an
659 : // interface{}, unless necessary.
660 1 : if readDuration >= slowReadTracingThreshold && r.opts.LoggerAndTracer.IsTracingEnabled(ctx) {
661 1 : r.opts.LoggerAndTracer.Eventf(ctx, "reading %d bytes took %s",
662 1 : int(bh.Length+blockTrailerLen), readDuration.String())
663 1 : }
664 1 : if stats != nil {
665 1 : stats.BlockReadDuration += readDuration
666 1 : }
667 1 : if err != nil {
668 1 : compressed.release()
669 1 : return bufferHandle{}, err
670 1 : }
671 1 : if err := checkChecksum(r.checksumType, compressed.get(), bh, r.fileNum); err != nil {
672 1 : compressed.release()
673 1 : return bufferHandle{}, err
674 1 : }
675 :
676 1 : typ := blockType(compressed.get()[bh.Length])
677 1 : compressed.truncate(int(bh.Length))
678 1 :
679 1 : var decompressed cacheValueOrBuf
680 1 : if typ == noCompressionBlockType {
681 1 : decompressed = compressed
682 1 : } else {
683 1 : // Decode the length of the decompressed value.
684 1 : decodedLen, prefixLen, err := decompressedLen(typ, compressed.get())
685 1 : if err != nil {
686 0 : compressed.release()
687 0 : return bufferHandle{}, err
688 0 : }
689 :
690 1 : if bufferPool != nil {
691 1 : decompressed = cacheValueOrBuf{buf: bufferPool.Alloc(decodedLen)}
692 1 : } else {
693 1 : decompressed = cacheValueOrBuf{v: cache.Alloc(decodedLen)}
694 1 : }
695 1 : if err := decompressInto(typ, compressed.get()[prefixLen:], decompressed.get()); err != nil {
696 0 : compressed.release()
697 0 : return bufferHandle{}, err
698 0 : }
699 1 : compressed.release()
700 : }
701 :
702 1 : if transform != nil {
703 1 : // Transforming blocks is very rare, so the extra copy of the
704 1 : // transformed data is not problematic.
705 1 : tmpTransformed, err := transform(decompressed.get())
706 1 : if err != nil {
707 0 : decompressed.release()
708 0 : return bufferHandle{}, err
709 0 : }
710 :
711 1 : var transformed cacheValueOrBuf
712 1 : if bufferPool != nil {
713 0 : transformed = cacheValueOrBuf{buf: bufferPool.Alloc(len(tmpTransformed))}
714 1 : } else {
715 1 : transformed = cacheValueOrBuf{v: cache.Alloc(len(tmpTransformed))}
716 1 : }
717 1 : copy(transformed.get(), tmpTransformed)
718 1 : decompressed.release()
719 1 : decompressed = transformed
720 : }
721 :
722 1 : if stats != nil {
723 1 : stats.BlockBytes += bh.Length
724 1 : }
725 1 : if iterStats != nil {
726 1 : iterStats.reportStats(bh.Length, 0, readDuration)
727 1 : }
728 1 : if decompressed.buf.Valid() {
729 1 : return bufferHandle{b: decompressed.buf}, nil
730 1 : }
731 1 : h := r.opts.Cache.Set(r.cacheID, r.fileNum, bh.Offset, decompressed.v)
732 1 : return bufferHandle{h: h}, nil
733 : }
734 :
735 1 : func (r *Reader) transformRangeDelV1(b []byte) ([]byte, error) {
736 1 : // Convert v1 (RocksDB format) range-del blocks to v2 blocks on the fly. The
737 1 : // v1 format range-del blocks have unfragmented and unsorted range
738 1 : // tombstones. We need properly fragmented and sorted range tombstones in
739 1 : // order to serve from them directly.
740 1 : iter := &blockIter{}
741 1 : if err := iter.init(r.Compare, r.Split, b, NoTransforms); err != nil {
742 0 : return nil, err
743 0 : }
744 1 : var tombstones []keyspan.Span
745 1 : for key, value := iter.First(); key != nil; key, value = iter.Next() {
746 1 : t := keyspan.Span{
747 1 : Start: key.UserKey,
748 1 : End: value.InPlaceValue(),
749 1 : Keys: []keyspan.Key{{Trailer: key.Trailer}},
750 1 : }
751 1 : tombstones = append(tombstones, t)
752 1 : }
753 1 : keyspan.Sort(r.Compare, tombstones)
754 1 :
755 1 : // Fragment the tombstones, outputting them directly to a block writer.
756 1 : rangeDelBlock := blockWriter{
757 1 : restartInterval: 1,
758 1 : }
759 1 : frag := keyspan.Fragmenter{
760 1 : Cmp: r.Compare,
761 1 : Format: r.FormatKey,
762 1 : Emit: func(s keyspan.Span) {
763 1 : for _, k := range s.Keys {
764 1 : startIK := InternalKey{UserKey: s.Start, Trailer: k.Trailer}
765 1 : rangeDelBlock.add(startIK, s.End)
766 1 : }
767 : },
768 : }
769 1 : for i := range tombstones {
770 1 : frag.Add(tombstones[i])
771 1 : }
772 1 : frag.Finish()
773 1 :
774 1 : // Return the contents of the constructed v2 format range-del block.
775 1 : return rangeDelBlock.finish(), nil
776 : }
777 :
778 1 : func (r *Reader) readMetaindex(metaindexBH BlockHandle) error {
779 1 : // We use a BufferPool when reading metaindex blocks in order to avoid
780 1 : // populating the block cache with these blocks. In heavy-write workloads,
781 1 : // especially with high compaction concurrency, new tables may be created
782 1 : // frequently. Populating the block cache with these metaindex blocks adds
783 1 : // additional contention on the block cache mutexes (see #1997).
784 1 : // Additionally, these blocks are exceedingly unlikely to be read again
785 1 : // while they're still in the block cache except in misconfigurations with
786 1 : // excessive sstables counts or a table cache that's far too small.
787 1 : r.metaBufferPool.initPreallocated(r.metaBufferPoolAlloc[:0])
788 1 : // When we're finished, release the buffers we've allocated back to memory
789 1 : // allocator. We don't expect to use metaBufferPool again.
790 1 : defer r.metaBufferPool.Release()
791 1 :
792 1 : b, err := r.readBlock(
793 1 : context.Background(), metaindexBH, nil /* transform */, nil /* readHandle */, nil, /* stats */
794 1 : nil /* iterStats */, &r.metaBufferPool)
795 1 : if err != nil {
796 1 : return err
797 1 : }
798 1 : data := b.Get()
799 1 : defer b.Release()
800 1 :
801 1 : if uint64(len(data)) != metaindexBH.Length {
802 0 : return base.CorruptionErrorf("pebble/table: unexpected metaindex block size: %d vs %d",
803 0 : errors.Safe(len(data)), errors.Safe(metaindexBH.Length))
804 0 : }
805 :
806 1 : i, err := newRawBlockIter(bytes.Compare, data)
807 1 : if err != nil {
808 0 : return err
809 0 : }
810 :
811 1 : meta := map[string]BlockHandle{}
812 1 : for valid := i.First(); valid; valid = i.Next() {
813 1 : value := i.Value()
814 1 : if bytes.Equal(i.Key().UserKey, []byte(metaValueIndexName)) {
815 1 : vbih, n, err := decodeValueBlocksIndexHandle(i.Value())
816 1 : if err != nil {
817 0 : return err
818 0 : }
819 1 : if n == 0 || n != len(value) {
820 0 : return base.CorruptionErrorf("pebble/table: invalid table (bad value blocks index handle)")
821 0 : }
822 1 : r.valueBIH = vbih
823 1 : } else {
824 1 : bh, n := decodeBlockHandle(value)
825 1 : if n == 0 || n != len(value) {
826 0 : return base.CorruptionErrorf("pebble/table: invalid table (bad block handle)")
827 0 : }
828 1 : meta[string(i.Key().UserKey)] = bh
829 : }
830 : }
831 1 : if err := i.Close(); err != nil {
832 0 : return err
833 0 : }
834 :
835 1 : if bh, ok := meta[metaPropertiesName]; ok {
836 1 : b, err = r.readBlock(
837 1 : context.Background(), bh, nil /* transform */, nil /* readHandle */, nil, /* stats */
838 1 : nil /* iterStats */, nil /* buffer pool */)
839 1 : if err != nil {
840 1 : return err
841 1 : }
842 1 : r.propertiesBH = bh
843 1 : err := r.Properties.load(b.Get(), bh.Offset, r.opts.DeniedUserProperties)
844 1 : b.Release()
845 1 : if err != nil {
846 0 : return err
847 0 : }
848 : }
849 :
850 1 : if bh, ok := meta[metaRangeDelV2Name]; ok {
851 1 : r.rangeDelBH = bh
852 1 : } else if bh, ok := meta[metaRangeDelName]; ok {
853 1 : r.rangeDelBH = bh
854 1 : if !r.rawTombstones {
855 1 : r.rangeDelTransform = r.transformRangeDelV1
856 1 : }
857 : }
858 :
859 1 : if bh, ok := meta[metaRangeKeyName]; ok {
860 1 : r.rangeKeyBH = bh
861 1 : }
862 :
863 1 : for name, fp := range r.opts.Filters {
864 1 : types := []struct {
865 1 : ftype FilterType
866 1 : prefix string
867 1 : }{
868 1 : {TableFilter, "fullfilter."},
869 1 : }
870 1 : var done bool
871 1 : for _, t := range types {
872 1 : if bh, ok := meta[t.prefix+name]; ok {
873 1 : r.filterBH = bh
874 1 :
875 1 : switch t.ftype {
876 1 : case TableFilter:
877 1 : r.tableFilter = newTableFilterReader(fp)
878 0 : default:
879 0 : return base.CorruptionErrorf("unknown filter type: %v", errors.Safe(t.ftype))
880 : }
881 :
882 1 : done = true
883 1 : break
884 : }
885 : }
886 1 : if done {
887 1 : break
888 : }
889 : }
890 1 : return nil
891 : }
892 :
893 : // Layout returns the layout (block organization) for an sstable.
894 1 : func (r *Reader) Layout() (*Layout, error) {
895 1 : if r.err != nil {
896 0 : return nil, r.err
897 0 : }
898 :
899 1 : l := &Layout{
900 1 : Data: make([]BlockHandleWithProperties, 0, r.Properties.NumDataBlocks),
901 1 : Filter: r.filterBH,
902 1 : RangeDel: r.rangeDelBH,
903 1 : RangeKey: r.rangeKeyBH,
904 1 : ValueIndex: r.valueBIH.h,
905 1 : Properties: r.propertiesBH,
906 1 : MetaIndex: r.metaIndexBH,
907 1 : Footer: r.footerBH,
908 1 : Format: r.tableFormat,
909 1 : }
910 1 :
911 1 : indexH, err := r.readIndex(context.Background(), nil, nil)
912 1 : if err != nil {
913 1 : return nil, err
914 1 : }
915 1 : defer indexH.Release()
916 1 :
917 1 : var alloc bytealloc.A
918 1 :
919 1 : if r.Properties.IndexPartitions == 0 {
920 1 : l.Index = append(l.Index, r.indexBH)
921 1 : iter, _ := newBlockIter(r.Compare, r.Split, indexH.Get(), NoTransforms)
922 1 : for key, value := iter.First(); key != nil; key, value = iter.Next() {
923 1 : dataBH, err := decodeBlockHandleWithProperties(value.InPlaceValue())
924 1 : if err != nil {
925 0 : return nil, errCorruptIndexEntry(err)
926 0 : }
927 1 : if len(dataBH.Props) > 0 {
928 1 : alloc, dataBH.Props = alloc.Copy(dataBH.Props)
929 1 : }
930 1 : l.Data = append(l.Data, dataBH)
931 : }
932 1 : } else {
933 1 : l.TopIndex = r.indexBH
934 1 : topIter, _ := newBlockIter(r.Compare, r.Split, indexH.Get(), NoTransforms)
935 1 : iter := &blockIter{}
936 1 : for key, value := topIter.First(); key != nil; key, value = topIter.Next() {
937 1 : indexBH, err := decodeBlockHandleWithProperties(value.InPlaceValue())
938 1 : if err != nil {
939 0 : return nil, errCorruptIndexEntry(err)
940 0 : }
941 1 : l.Index = append(l.Index, indexBH.BlockHandle)
942 1 :
943 1 : subIndex, err := r.readBlock(context.Background(), indexBH.BlockHandle,
944 1 : nil /* transform */, nil /* readHandle */, nil /* stats */, nil /* iterStats */, nil /* buffer pool */)
945 1 : if err != nil {
946 1 : return nil, err
947 1 : }
948 : // TODO(msbutler): figure out how to pass virtualState to layout call.
949 1 : if err := iter.init(r.Compare, r.Split, subIndex.Get(), NoTransforms); err != nil {
950 0 : return nil, err
951 0 : }
952 1 : for key, value := iter.First(); key != nil; key, value = iter.Next() {
953 1 : dataBH, err := decodeBlockHandleWithProperties(value.InPlaceValue())
954 1 : if len(dataBH.Props) > 0 {
955 1 : alloc, dataBH.Props = alloc.Copy(dataBH.Props)
956 1 : }
957 1 : if err != nil {
958 0 : return nil, errCorruptIndexEntry(err)
959 0 : }
960 1 : l.Data = append(l.Data, dataBH)
961 : }
962 1 : subIndex.Release()
963 1 : *iter = iter.resetForReuse()
964 : }
965 : }
966 1 : if r.valueBIH.h.Length != 0 {
967 1 : vbiH, err := r.readBlock(context.Background(), r.valueBIH.h, nil, nil, nil, nil, nil /* buffer pool */)
968 1 : if err != nil {
969 0 : return nil, err
970 0 : }
971 1 : defer vbiH.Release()
972 1 : vbiBlock := vbiH.Get()
973 1 : indexEntryLen := int(r.valueBIH.blockNumByteLength + r.valueBIH.blockOffsetByteLength +
974 1 : r.valueBIH.blockLengthByteLength)
975 1 : i := 0
976 1 : for len(vbiBlock) != 0 {
977 1 : if len(vbiBlock) < indexEntryLen {
978 0 : return nil, errors.Errorf(
979 0 : "remaining value index block %d does not contain a full entry of length %d",
980 0 : len(vbiBlock), indexEntryLen)
981 0 : }
982 1 : n := int(r.valueBIH.blockNumByteLength)
983 1 : bn := int(littleEndianGet(vbiBlock, n))
984 1 : if bn != i {
985 0 : return nil, errors.Errorf("unexpected block num %d, expected %d",
986 0 : bn, i)
987 0 : }
988 1 : i++
989 1 : vbiBlock = vbiBlock[n:]
990 1 : n = int(r.valueBIH.blockOffsetByteLength)
991 1 : blockOffset := littleEndianGet(vbiBlock, n)
992 1 : vbiBlock = vbiBlock[n:]
993 1 : n = int(r.valueBIH.blockLengthByteLength)
994 1 : blockLen := littleEndianGet(vbiBlock, n)
995 1 : vbiBlock = vbiBlock[n:]
996 1 : l.ValueBlock = append(l.ValueBlock, BlockHandle{Offset: blockOffset, Length: blockLen})
997 : }
998 : }
999 :
1000 1 : return l, nil
1001 : }
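
// exampleDataBlockFootprint is an illustrative sketch using Layout: summing
// each data block's length plus its 5-byte trailer approximates the physical
// space the data blocks occupy in the file.
func exampleDataBlockFootprint(r *Reader) (uint64, error) {
	l, err := r.Layout()
	if err != nil {
		return 0, err
	}
	var n uint64
	for _, bh := range l.Data {
		n += bh.Length + blockTrailerLen
	}
	return n, nil
}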
1002 :
1003 : // ValidateBlockChecksums validates the checksums for each block in the SSTable.
1004 1 : func (r *Reader) ValidateBlockChecksums() error {
1005 1 : // Pre-compute the BlockHandles for the underlying file.
1006 1 : l, err := r.Layout()
1007 1 : if err != nil {
1008 1 : return err
1009 1 : }
1010 :
1011 : // Construct the set of blocks to check. Note that the footer is not checked
1012 : // as it is not a block with a checksum.
1013 1 : blocks := make([]BlockHandle, len(l.Data))
1014 1 : for i := range l.Data {
1015 1 : blocks[i] = l.Data[i].BlockHandle
1016 1 : }
1017 1 : blocks = append(blocks, l.Index...)
1018 1 : blocks = append(blocks, l.TopIndex, l.Filter, l.RangeDel, l.RangeKey, l.Properties, l.MetaIndex)
1019 1 :
1020 1 : // Sorting by offset ensures we are performing a sequential scan of the
1021 1 : // file.
1022 1 : slices.SortFunc(blocks, func(a, b BlockHandle) int {
1023 1 : return cmp.Compare(a.Offset, b.Offset)
1024 1 : })
1025 :
1026 : // Check all blocks sequentially. Make use of read-ahead, given we are
1027 : // scanning the entire file from start to end.
1028 1 : rh := r.readable.NewReadHandle(context.TODO())
1029 1 : defer rh.Close()
1030 1 :
1031 1 : for _, bh := range blocks {
1032 1 : // Certain blocks may not be present, in which case we skip them.
1033 1 : if bh.Length == 0 {
1034 1 : continue
1035 : }
1036 :
1037 : // Read the block, which validates the checksum.
1038 1 : h, err := r.readBlock(context.Background(), bh, nil, rh, nil, nil /* iterStats */, nil /* buffer pool */)
1039 1 : if err != nil {
1040 1 : return err
1041 1 : }
1042 1 : h.Release()
1043 : }
1044 :
1045 1 : return nil
1046 : }
1047 :
1048 : // CommonProperties implements the CommonReader interface.
1049 1 : func (r *Reader) CommonProperties() *CommonProperties {
1050 1 : return &r.Properties.CommonProperties
1051 1 : }
1052 :
1053 : // EstimateDiskUsage returns the total size of data blocks overlapping the range
1054 : // `[start, end]`. Even if a data block partially overlaps, or we cannot
1055 : // determine overlap due to abbreviated index keys, the full data block size is
1056 : // included in the estimation.
1057 : //
1058 : // This function does not account for any metablock space usage. Assumes there
1059 : // is at least partial overlap, i.e., `[start, end]` falls neither completely
1060 : // before nor completely after the file's range.
1061 : //
1062 : // Only blocks containing point keys are considered. Range deletion and range
1063 : // key blocks are not considered.
1064 : //
1065 : // TODO(ajkr): account for metablock space usage. Perhaps look at the fraction of
1066 : // data blocks overlapped and add that same fraction of the metadata blocks to the
1067 : // estimate.
1068 1 : func (r *Reader) EstimateDiskUsage(start, end []byte) (uint64, error) {
1069 1 : if r.err != nil {
1070 0 : return 0, r.err
1071 0 : }
1072 :
1073 1 : indexH, err := r.readIndex(context.Background(), nil, nil)
1074 1 : if err != nil {
1075 1 : return 0, err
1076 1 : }
1077 1 : defer indexH.Release()
1078 1 :
1079 1 : // Iterators over the bottom-level index blocks containing start and end.
1080 1 : // These may be different in case of partitioned index but will both point
1081 1 : // to the same blockIter over the single index in the unpartitioned case.
1082 1 : var startIdxIter, endIdxIter *blockIter
1083 1 : if r.Properties.IndexPartitions == 0 {
1084 1 : iter, err := newBlockIter(r.Compare, r.Split, indexH.Get(), NoTransforms)
1085 1 : if err != nil {
1086 0 : return 0, err
1087 0 : }
1088 1 : startIdxIter = iter
1089 1 : endIdxIter = iter
1090 1 : } else {
1091 1 : topIter, err := newBlockIter(r.Compare, r.Split, indexH.Get(), NoTransforms)
1092 1 : if err != nil {
1093 0 : return 0, err
1094 0 : }
1095 :
1096 1 : key, val := topIter.SeekGE(start, base.SeekGEFlagsNone)
1097 1 : if key == nil {
1098 0 : // The range falls completely after this file, or an error occurred.
1099 0 : return 0, topIter.Error()
1100 0 : }
1101 1 : startIdxBH, err := decodeBlockHandleWithProperties(val.InPlaceValue())
1102 1 : if err != nil {
1103 0 : return 0, errCorruptIndexEntry(err)
1104 0 : }
1105 1 : startIdxBlock, err := r.readBlock(context.Background(), startIdxBH.BlockHandle,
1106 1 : nil /* transform */, nil /* readHandle */, nil /* stats */, nil /* iterStats */, nil /* buffer pool */)
1107 1 : if err != nil {
1108 1 : return 0, err
1109 1 : }
1110 1 : defer startIdxBlock.Release()
1111 1 : startIdxIter, err = newBlockIter(r.Compare, r.Split, startIdxBlock.Get(), NoTransforms)
1112 1 : if err != nil {
1113 0 : return 0, err
1114 0 : }
1115 :
1116 1 : key, val = topIter.SeekGE(end, base.SeekGEFlagsNone)
1117 1 : if key == nil {
1118 0 : if err := topIter.Error(); err != nil {
1119 0 : return 0, err
1120 0 : }
1121 1 : } else {
1122 1 : endIdxBH, err := decodeBlockHandleWithProperties(val.InPlaceValue())
1123 1 : if err != nil {
1124 0 : return 0, errCorruptIndexEntry(err)
1125 0 : }
1126 1 : endIdxBlock, err := r.readBlock(context.Background(),
1127 1 : endIdxBH.BlockHandle, nil /* transform */, nil /* readHandle */, nil /* stats */, nil /* iterStats */, nil /* buffer pool */)
1128 1 : if err != nil {
1129 1 : return 0, err
1130 1 : }
1131 1 : defer endIdxBlock.Release()
1132 1 : endIdxIter, err = newBlockIter(r.Compare, r.Split, endIdxBlock.Get(), NoTransforms)
1133 1 : if err != nil {
1134 0 : return 0, err
1135 0 : }
1136 : }
1137 : }
1138 : // startIdxIter should not be nil at this point, while endIdxIter may be nil if
1139 : // the range spans past the end of the file.
1140 :
1141 1 : key, val := startIdxIter.SeekGE(start, base.SeekGEFlagsNone)
1142 1 : if key == nil {
1143 1 : // The range falls completely after this file, or an error occurred.
1144 1 : return 0, startIdxIter.Error()
1145 1 : }
1146 1 : startBH, err := decodeBlockHandleWithProperties(val.InPlaceValue())
1147 1 : if err != nil {
1148 0 : return 0, errCorruptIndexEntry(err)
1149 0 : }
1150 :
1151 1 : includeInterpolatedValueBlocksSize := func(dataBlockSize uint64) uint64 {
1152 1 : // INVARIANT: r.Properties.DataSize > 0 since startIdxIter is not nil.
1153 1 : // Linearly interpolate what is stored in value blocks.
1154 1 : //
1155 1 : // TODO(sumeer): if we need more accuracy, without loading any data blocks
1156 1 : // (which contain the value handles, and which may also be insufficient if
1157 1 : // the values are in separate files), we will need to accumulate the
1158 1 : // logical size of the key-value pairs and store the cumulative value for
1159 1 : // each data block in the index block entry. This increases the size of
1160 1 : // the BlockHandle, so wait until this becomes necessary.
1161 1 : return dataBlockSize +
1162 1 : uint64((float64(dataBlockSize)/float64(r.Properties.DataSize))*
1163 1 : float64(r.Properties.ValueBlocksSize))
1164 1 : }
1165 1 : if endIdxIter == nil {
1166 0 : // The range spans beyond this file. Include data blocks through the last.
1167 0 : return includeInterpolatedValueBlocksSize(r.Properties.DataSize - startBH.Offset), nil
1168 0 : }
1169 1 : key, val = endIdxIter.SeekGE(end, base.SeekGEFlagsNone)
1170 1 : if key == nil {
1171 1 : if err := endIdxIter.Error(); err != nil {
1172 0 : return 0, err
1173 0 : }
1174 : // The range spans beyond this file. Include data blocks through the last.
1175 1 : return includeInterpolatedValueBlocksSize(r.Properties.DataSize - startBH.Offset), nil
1176 : }
1177 1 : endBH, err := decodeBlockHandleWithProperties(val.InPlaceValue())
1178 1 : if err != nil {
1179 0 : return 0, errCorruptIndexEntry(err)
1180 0 : }
1181 1 : return includeInterpolatedValueBlocksSize(
1182 1 : endBH.Offset + endBH.Length + blockTrailerLen - startBH.Offset), nil
1183 : }
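
// A worked example of the interpolation above (illustrative numbers): with
// Properties.DataSize = 100 MiB, Properties.ValueBlocksSize = 20 MiB, and an
// overlapping data-block span of 10 MiB, EstimateDiskUsage returns
// 10 MiB + (10/100)*20 MiB = 12 MiB.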
1184 :
1185 : // TableFormat returns the format version for the table.
1186 1 : func (r *Reader) TableFormat() (TableFormat, error) {
1187 1 : if r.err != nil {
1188 0 : return TableFormatUnspecified, r.err
1189 0 : }
1190 1 : return r.tableFormat, nil
1191 : }
1192 :
1193 : // NewReader returns a new table reader for the file. Closing the reader will
1194 : // close the file.
1195 1 : func NewReader(f objstorage.Readable, o ReaderOptions, extraOpts ...ReaderOption) (*Reader, error) {
1196 1 : o = o.ensureDefaults()
1197 1 : r := &Reader{
1198 1 : readable: f,
1199 1 : opts: o,
1200 1 : }
1201 1 : if r.opts.Cache == nil {
1202 1 : r.opts.Cache = cache.New(0)
1203 1 : } else {
1204 1 : r.opts.Cache.Ref()
1205 1 : }
1206 :
1207 1 : if f == nil {
1208 1 : r.err = errors.New("pebble/table: nil file")
1209 1 : return nil, r.Close()
1210 1 : }
1211 :
1212 : // Note that the extra options are applied twice. First here for pre-apply
1213 : // options, and then below for post-apply options. Pre and post refer to
1214 : // before and after reading the metaindex and properties.
1215 1 : type preApply interface{ preApply() }
1216 1 : for _, opt := range extraOpts {
1217 1 : if _, ok := opt.(preApply); ok {
1218 1 : opt.readerApply(r)
1219 1 : }
1220 : }
1221 1 : if r.cacheID == 0 {
1222 1 : r.cacheID = r.opts.Cache.NewID()
1223 1 : }
1224 :
1225 1 : footer, err := readFooter(f)
1226 1 : if err != nil {
1227 1 : r.err = err
1228 1 : return nil, r.Close()
1229 1 : }
1230 1 : r.checksumType = footer.checksum
1231 1 : r.tableFormat = footer.format
1232 1 : // Read the metaindex.
1233 1 : if err := r.readMetaindex(footer.metaindexBH); err != nil {
1234 1 : r.err = err
1235 1 : return nil, r.Close()
1236 1 : }
1237 1 : r.indexBH = footer.indexBH
1238 1 : r.metaIndexBH = footer.metaindexBH
1239 1 : r.footerBH = footer.footerBH
1240 1 :
1241 1 : if r.Properties.ComparerName == "" || o.Comparer.Name == r.Properties.ComparerName {
1242 1 : r.Compare = o.Comparer.Compare
1243 1 : r.Equal = o.Comparer.Equal
1244 1 : r.FormatKey = o.Comparer.FormatKey
1245 1 : r.Split = o.Comparer.Split
1246 1 : }
1247 :
1248 1 : if o.MergerName == r.Properties.MergerName {
1249 1 : r.mergerOK = true
1250 1 : }
1251 :
1252 : // Apply the extra options again now that the comparer and merger names are
1253 : // known.
1254 1 : for _, opt := range extraOpts {
1255 1 : if _, ok := opt.(preApply); !ok {
1256 1 : opt.readerApply(r)
1257 1 : }
1258 : }
1259 :
1260 1 : if r.Compare == nil {
1261 1 : r.err = errors.Errorf("pebble/table: %d: unknown comparer %s",
1262 1 : errors.Safe(r.fileNum), errors.Safe(r.Properties.ComparerName))
1263 1 : }
1264 1 : if !r.mergerOK {
1265 1 : if name := r.Properties.MergerName; name != "" && name != "nullptr" {
1266 1 : r.err = errors.Errorf("pebble/table: %d: unknown merger %s",
1267 1 : errors.Safe(r.fileNum), errors.Safe(r.Properties.MergerName))
1268 1 : }
1269 : }
1270 1 : if r.err != nil {
1271 1 : return nil, r.Close()
1272 1 : }
1273 :
1274 1 : return r, nil
1275 : }
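
// exampleOpenReader is a minimal usage sketch (hypothetical path; error
// handling abbreviated): open an sstable via NewSimpleReadable and NewReader.
// ReaderOptions must name the comparer/merger the table was written with;
// unset fields are filled in by ensureDefaults.
func exampleOpenReader() (*Reader, error) {
	f, err := os.Open("table.sst") // hypothetical path
	if err != nil {
		return nil, err
	}
	readable, err := NewSimpleReadable(f)
	if err != nil {
		return nil, err
	}
	return NewReader(readable, ReaderOptions{})
}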
1276 :
1277 : // ReadableFile describes the smallest subset of vfs.File that is required for
1278 : // reading SSTs.
1279 : type ReadableFile interface {
1280 : io.ReaderAt
1281 : io.Closer
1282 : Stat() (os.FileInfo, error)
1283 : }
1284 :
1285 : // NewSimpleReadable wraps a ReadableFile in an objstorage.Readable
1286 : // implementation (which does not support read-ahead).
1287 1 : func NewSimpleReadable(r ReadableFile) (objstorage.Readable, error) {
1288 1 : info, err := r.Stat()
1289 1 : if err != nil {
1290 1 : return nil, err
1291 1 : }
1292 1 : res := &simpleReadable{
1293 1 : f: r,
1294 1 : size: info.Size(),
1295 1 : }
1296 1 : res.rh = objstorage.MakeNoopReadHandle(res)
1297 1 : return res, nil
1298 : }
1299 :
1300 : // simpleReadable wraps a ReadableFile to implement objstorage.Readable.
1301 : type simpleReadable struct {
1302 : f ReadableFile
1303 : size int64
1304 : rh objstorage.NoopReadHandle
1305 : }
1306 :
1307 : var _ objstorage.Readable = (*simpleReadable)(nil)
1308 :
1309 : // ReadAt is part of the objstorage.Readable interface.
1310 1 : func (s *simpleReadable) ReadAt(_ context.Context, p []byte, off int64) error {
1311 1 : n, err := s.f.ReadAt(p, off)
1312 1 : if invariants.Enabled && err == nil && n != len(p) {
1313 0 : panic("short read")
1314 : }
1315 1 : return err
1316 : }
1317 :
1318 : // Close is part of the objstorage.Readable interface.
1319 1 : func (s *simpleReadable) Close() error {
1320 1 : return s.f.Close()
1321 1 : }
1322 :
1323 : // Size is part of the objstorage.Readable interface.
1324 1 : func (s *simpleReadable) Size() int64 {
1325 1 : return s.size
1326 1 : }
1327 :
1328 : // NewReadHandle is part of the objstorage.Readable interface.
1329 1 : func (s *simpleReadable) NewReadHandle(_ context.Context) objstorage.ReadHandle {
1330 1 : return &s.rh
1331 1 : }
1332 :
1333 0 : func errCorruptIndexEntry(err error) error {
1334 0 : err = base.CorruptionErrorf("pebble/table: corrupt index entry: %v", err)
1335 0 : if invariants.Enabled {
1336 0 : panic(err)
1337 : }
1338 0 : return err
1339 : }
|