Line data Source code
1 : // Copyright 2011 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package sstable
6 :
7 : import (
8 : "bytes"
9 : "cmp"
10 : "context"
11 : "encoding/binary"
12 : "io"
13 : "os"
14 : "slices"
15 : "time"
16 :
17 : "github.com/cespare/xxhash/v2"
18 : "github.com/cockroachdb/errors"
19 : "github.com/cockroachdb/pebble/internal/base"
20 : "github.com/cockroachdb/pebble/internal/bytealloc"
21 : "github.com/cockroachdb/pebble/internal/cache"
22 : "github.com/cockroachdb/pebble/internal/crc"
23 : "github.com/cockroachdb/pebble/internal/invariants"
24 : "github.com/cockroachdb/pebble/internal/keyspan"
25 : "github.com/cockroachdb/pebble/internal/private"
26 : "github.com/cockroachdb/pebble/objstorage"
27 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
28 : )
29 :
30 : var errCorruptIndexEntry = base.CorruptionErrorf("pebble/table: corrupt index entry")
31 : var errReaderClosed = errors.New("pebble/table: reader is closed")
32 :
33 : // decodeBlockHandle returns the block handle encoded at the start of src, as
34 : // well as the number of bytes it occupies. It returns a zero handle and count
35 : // if given invalid input. A block handle for a data block or a first/lower
36 : // level index block should not be decoded using decodeBlockHandle: such
37 : // handles are followed by encoded properties, so a caller that validates that
38 : // the number of bytes decoded equals len(src) would spuriously fail. In those
39 : // cases the caller should use decodeBlockHandleWithProperties.
40 1 : func decodeBlockHandle(src []byte) (BlockHandle, int) {
41 1 : offset, n := binary.Uvarint(src)
42 1 : length, m := binary.Uvarint(src[n:])
43 1 : if n == 0 || m == 0 {
44 0 : return BlockHandle{}, 0
45 0 : }
46 1 : return BlockHandle{offset, length}, n + m
47 : }
48 :
49 : // decodeBlockHandleWithProperties returns the block handle and properties
50 : // encoded in src. src needs to be exactly the length that was encoded. This
51 : // method must be used for data block and first/lower level index blocks. The
52 : // properties in the block handle point to the bytes in src.
53 1 : func decodeBlockHandleWithProperties(src []byte) (BlockHandleWithProperties, error) {
54 1 : bh, n := decodeBlockHandle(src)
55 1 : if n == 0 {
56 0 : return BlockHandleWithProperties{}, errors.Errorf("invalid BlockHandle")
57 0 : }
58 1 : return BlockHandleWithProperties{
59 1 : BlockHandle: bh,
60 1 : Props: src[n:],
61 1 : }, nil
62 : }
63 :
64 1 : func encodeBlockHandle(dst []byte, b BlockHandle) int {
65 1 : n := binary.PutUvarint(dst, b.Offset)
66 1 : m := binary.PutUvarint(dst[n:], b.Length)
67 1 : return n + m
68 1 : }
69 :
70 1 : func encodeBlockHandleWithProperties(dst []byte, b BlockHandleWithProperties) []byte {
71 1 : n := encodeBlockHandle(dst, b.BlockHandle)
72 1 : dst = append(dst[:n], b.Props...)
73 1 : return dst
74 1 : }
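
For reference, a block handle is nothing more than two uvarints (offset, then length), so it round-trips with only encoding/binary. A minimal stand-alone sketch (not part of this file):

	package main

	import (
		"encoding/binary"
		"fmt"
	)

	func main() {
		// Encode a handle for a block at offset 1024 with length 512,
		// mirroring encodeBlockHandle above.
		var buf [2 * binary.MaxVarintLen64]byte
		n := binary.PutUvarint(buf[:], 1024)
		n += binary.PutUvarint(buf[n:], 512)

		// Decode it again, mirroring decodeBlockHandle above.
		offset, a := binary.Uvarint(buf[:n])
		length, b := binary.Uvarint(buf[a:n])
		fmt.Println(offset, length, a+b == n) // 1024 512 true
	}
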
75 :
76 : // block is a []byte that holds a sequence of key/value pairs plus an index
77 : // over those pairs.
78 : type block []byte
79 :
80 : type loadBlockResult int8
81 :
82 : const (
83 : loadBlockOK loadBlockResult = iota
84 : // Could be due to an error or because there is no block left to load.
85 : loadBlockFailed
86 : loadBlockIrrelevant
87 : )
88 :
89 : type blockTransform func([]byte) ([]byte, error)
90 :
91 : // ReaderOption provides an interface to do work on a Reader while it is being
92 : // opened.
93 : type ReaderOption interface {
94 : // readerApply is called on the reader during opening in order to set internal
95 : // parameters.
96 : readerApply(*Reader)
97 : }
98 :
99 : // Comparers is a map from comparer name to comparer. It is used for debugging
100 : // tools which may be used on multiple databases configured with different
101 : // comparers. Comparers implements the ReaderOption interface and can be passed
102 : // as a parameter to NewReader.
103 : type Comparers map[string]*Comparer
104 :
105 0 : func (c Comparers) readerApply(r *Reader) {
106 0 : if r.Compare != nil || r.Properties.ComparerName == "" {
107 0 : return
108 0 : }
109 0 : if comparer, ok := c[r.Properties.ComparerName]; ok {
110 0 : r.Compare = comparer.Compare
111 0 : r.FormatKey = comparer.FormatKey
112 0 : r.Split = comparer.Split
113 0 : }
114 : }
115 :
116 : // Mergers is a map from merger name to merger. It is used for debugging tools
117 : // which may be used on multiple databases configured with different
118 : // mergers. Mergers implements the ReaderOption interface and can be passed as
119 : // a parameter to NewReader.
120 : type Mergers map[string]*Merger
121 :
122 0 : func (m Mergers) readerApply(r *Reader) {
123 0 : if r.mergerOK || r.Properties.MergerName == "" {
124 0 : return
125 0 : }
126 0 : _, r.mergerOK = m[r.Properties.MergerName]
127 : }
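
Together these allow a debug tool to open tables from databases whose options are not known statically, by offering candidate comparers and mergers. A hedged sketch (openForDebug is a hypothetical helper; readable is an already-open handle):

	import (
		"github.com/cockroachdb/pebble/objstorage"
		"github.com/cockroachdb/pebble/sstable"
	)

	// openForDebug opens a table when the producing database's comparer and
	// merger are not known statically; candidates are supplied via the
	// Comparers/Mergers options and matched against the table's properties.
	func openForDebug(
		readable objstorage.Readable, cmp *sstable.Comparer, merger *sstable.Merger,
	) (*sstable.Reader, error) {
		return sstable.NewReader(readable, sstable.ReaderOptions{},
			sstable.Comparers{cmp.Name: cmp},
			sstable.Mergers{merger.Name: merger})
	}
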
128 :
129 : // cacheOpts is a Reader open option for specifying the cache ID and sstable file
130 : // number. If not specified, a unique cache ID will be used.
131 : type cacheOpts struct {
132 : cacheID uint64
133 : fileNum base.DiskFileNum
134 : }
135 :
136 : // Marker function to indicate the option should be applied before reading the
137 : // sstable properties and, in the write path, before writing the default
138 : // sstable properties.
139 0 : func (c *cacheOpts) preApply() {}
140 :
141 1 : func (c *cacheOpts) readerApply(r *Reader) {
142 1 : if r.cacheID == 0 {
143 1 : r.cacheID = c.cacheID
144 1 : }
145 1 : if r.fileNum.FileNum() == 0 {
146 1 : r.fileNum = c.fileNum
147 1 : }
148 : }
149 :
150 1 : func (c *cacheOpts) writerApply(w *Writer) {
151 1 : if w.cacheID == 0 {
152 1 : w.cacheID = c.cacheID
153 1 : }
154 1 : if w.fileNum.FileNum() == 0 {
155 1 : w.fileNum = c.fileNum
156 1 : }
157 : }
158 :
159 : // rawTombstonesOpt is a Reader open option for specifying that range
160 : // tombstones returned by Reader.NewRangeDelIter() should not be
161 : // fragmented. Used by debug tools to get a raw view of the tombstones
162 : // contained in an sstable.
163 : type rawTombstonesOpt struct{}
164 :
165 0 : func (rawTombstonesOpt) preApply() {}
166 :
167 0 : func (rawTombstonesOpt) readerApply(r *Reader) {
168 0 : r.rawTombstones = true
169 0 : }
170 :
171 1 : func init() {
172 1 : private.SSTableCacheOpts = func(cacheID uint64, fileNum base.DiskFileNum) interface{} {
173 1 : return &cacheOpts{cacheID, fileNum}
174 1 : }
175 1 : private.SSTableRawTombstonesOpt = rawTombstonesOpt{}
176 : }
177 :
178 : // CommonReader abstracts functionality over a Reader or a VirtualReader. This
179 : // can be used by code which doesn't care to distinguish between a reader and a
180 : // virtual reader.
181 : type CommonReader interface {
182 : NewRawRangeKeyIter() (keyspan.FragmentIterator, error)
183 : NewRawRangeDelIter() (keyspan.FragmentIterator, error)
184 : NewIterWithBlockPropertyFiltersAndContextEtc(
185 : ctx context.Context, lower, upper []byte,
186 : filterer *BlockPropertiesFilterer,
187 : hideObsoletePoints, useFilterBlock bool,
188 : stats *base.InternalIteratorStats,
189 : categoryAndQoS CategoryAndQoS,
190 : statsCollector *CategoryStatsCollector,
191 : rp ReaderProvider,
192 : ) (Iterator, error)
193 : NewCompactionIter(
194 : bytesIterated *uint64,
195 : categoryAndQoS CategoryAndQoS,
196 : statsCollector *CategoryStatsCollector,
197 : rp ReaderProvider,
198 : bufferPool *BufferPool,
199 : ) (Iterator, error)
200 : EstimateDiskUsage(start, end []byte) (uint64, error)
201 : CommonProperties() *CommonProperties
202 : }
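
Code that needs only this shared surface can be written once against CommonReader rather than against a concrete reader type; a small sketch (the helper name is hypothetical):

	// overlapSize accepts either a *sstable.Reader or a virtual reader via the
	// shared CommonReader surface.
	func overlapSize(cr sstable.CommonReader, start, end []byte) (uint64, error) {
		return cr.EstimateDiskUsage(start, end)
	}
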
203 :
204 : // Reader is a table reader.
205 : type Reader struct {
206 : readable objstorage.Readable
207 : cacheID uint64
208 : fileNum base.DiskFileNum
209 : err error
210 : indexBH BlockHandle
211 : filterBH BlockHandle
212 : rangeDelBH BlockHandle
213 : rangeKeyBH BlockHandle
214 : rangeDelTransform blockTransform
215 : valueBIH valueBlocksIndexHandle
216 : propertiesBH BlockHandle
217 : metaIndexBH BlockHandle
218 : footerBH BlockHandle
219 : opts ReaderOptions
220 : Compare Compare
221 : FormatKey base.FormatKey
222 : Split Split
223 : tableFilter *tableFilterReader
224 : // Keep types that are not multiples of 8 bytes at the end and with
225 : // decreasing size.
226 : Properties Properties
227 : tableFormat TableFormat
228 : rawTombstones bool
229 : mergerOK bool
230 : checksumType ChecksumType
231 : // metaBufferPool is a buffer pool used exclusively when opening a table and
232 : // loading its meta blocks. metaBufferPoolAlloc is used to batch-allocate
233 : // the BufferPool.pool slice as a part of the Reader allocation. Its
234 : // capacity is 3 to accommodate the meta block (1), and both the compressed
235 : // properties block (1) and decompressed properties block (1)
236 : // simultaneously.
237 : metaBufferPool BufferPool
238 : metaBufferPoolAlloc [3]allocedBuffer
239 : }
240 :
241 : // Close releases the resources held by the Reader, unreferencing the block
// cache and closing the underlying readable.
242 1 : func (r *Reader) Close() error {
243 1 : r.opts.Cache.Unref()
244 1 :
245 1 : if r.readable != nil {
246 1 : r.err = firstError(r.err, r.readable.Close())
247 1 : r.readable = nil
248 1 : }
249 :
250 1 : if r.err != nil {
251 0 : return r.err
252 0 : }
253 : // Make any future calls to Get, NewIter or Close return an error.
254 1 : r.err = errReaderClosed
255 1 : return nil
256 : }
257 :
258 : // NewIterWithBlockPropertyFilters returns an iterator for the contents of the
259 : // table. If an error occurs, NewIterWithBlockPropertyFilters cleans up after
260 : // itself and returns a nil iterator.
261 : func (r *Reader) NewIterWithBlockPropertyFilters(
262 : lower, upper []byte,
263 : filterer *BlockPropertiesFilterer,
264 : useFilterBlock bool,
265 : stats *base.InternalIteratorStats,
266 : categoryAndQoS CategoryAndQoS,
267 : statsCollector *CategoryStatsCollector,
268 : rp ReaderProvider,
269 1 : ) (Iterator, error) {
270 1 : return r.newIterWithBlockPropertyFiltersAndContext(
271 1 : context.Background(), lower, upper, filterer, false, useFilterBlock, stats,
272 1 : categoryAndQoS, statsCollector, rp, nil)
273 1 : }
274 :
275 : // NewIterWithBlockPropertyFiltersAndContextEtc is similar to
276 : // NewIterWithBlockPropertyFilters and additionally accepts a context for
277 : // tracing.
278 : //
279 : // If hideObsoletePoints, the callee assumes that filterer already includes
280 : // obsoleteKeyBlockPropertyFilter. The caller can satisfy this contract by
281 : // first calling TryAddBlockPropertyFilterForHideObsoletePoints.
282 : func (r *Reader) NewIterWithBlockPropertyFiltersAndContextEtc(
283 : ctx context.Context,
284 : lower, upper []byte,
285 : filterer *BlockPropertiesFilterer,
286 : hideObsoletePoints, useFilterBlock bool,
287 : stats *base.InternalIteratorStats,
288 : categoryAndQoS CategoryAndQoS,
289 : statsCollector *CategoryStatsCollector,
290 : rp ReaderProvider,
291 1 : ) (Iterator, error) {
292 1 : return r.newIterWithBlockPropertyFiltersAndContext(
293 1 : ctx, lower, upper, filterer, hideObsoletePoints, useFilterBlock, stats, categoryAndQoS,
294 1 : statsCollector, rp, nil)
295 1 : }
296 :
297 : // TryAddBlockPropertyFilterForHideObsoletePoints is expected to be called
298 : // before the call to NewIterWithBlockPropertyFiltersAndContextEtc, to get the
299 : // value of hideObsoletePoints and potentially add a block property filter.
300 : func (r *Reader) TryAddBlockPropertyFilterForHideObsoletePoints(
301 : snapshotForHideObsoletePoints uint64,
302 : fileLargestSeqNum uint64,
303 : pointKeyFilters []BlockPropertyFilter,
304 1 : ) (hideObsoletePoints bool, filters []BlockPropertyFilter) {
305 1 : hideObsoletePoints = r.tableFormat >= TableFormatPebblev4 &&
306 1 : snapshotForHideObsoletePoints > fileLargestSeqNum
307 1 : if hideObsoletePoints {
308 1 : pointKeyFilters = append(pointKeyFilters, obsoleteKeyBlockPropertyFilter{})
309 1 : }
310 1 : return hideObsoletePoints, pointKeyFilters
311 : }
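
The intended call order, sketched below; buildFilterer is a stand-in for however the caller assembles a *BlockPropertiesFilterer from the returned filter list, and is hypothetical:

	// buildFilterer is hypothetical: it represents the caller's construction
	// of a filterer that includes the filters returned above.
	var buildFilterer func([]sstable.BlockPropertyFilter) *sstable.BlockPropertiesFilterer

	func newIterHidingObsolete(
		ctx context.Context, r *sstable.Reader, lower, upper []byte,
		snapshot, fileLargestSeqNum uint64, filters []sstable.BlockPropertyFilter,
	) (sstable.Iterator, error) {
		// First derive hideObsoletePoints and the augmented filter list...
		hide, filters := r.TryAddBlockPropertyFilterForHideObsoletePoints(
			snapshot, fileLargestSeqNum, filters)
		// ...then build the filterer from those filters, then open the iterator.
		filterer := buildFilterer(filters)
		return r.NewIterWithBlockPropertyFiltersAndContextEtc(
			ctx, lower, upper, filterer, hide, true /* useFilterBlock */,
			nil /* stats */, sstable.CategoryAndQoS{}, nil /* statsCollector */,
			sstable.TrivialReaderProvider{Reader: r})
	}
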
312 :
313 : func (r *Reader) newIterWithBlockPropertyFiltersAndContext(
314 : ctx context.Context,
315 : lower, upper []byte,
316 : filterer *BlockPropertiesFilterer,
317 : hideObsoletePoints bool,
318 : useFilterBlock bool,
319 : stats *base.InternalIteratorStats,
320 : categoryAndQoS CategoryAndQoS,
321 : statsCollector *CategoryStatsCollector,
322 : rp ReaderProvider,
323 : v *virtualState,
324 1 : ) (Iterator, error) {
325 1 : // NB: pebble.tableCache wraps the returned iterator with one which performs
326 1 : // reference counting on the Reader, preventing the Reader from being closed
327 1 : // until the final iterator closes.
328 1 : if r.Properties.IndexType == twoLevelIndex {
329 1 : i := twoLevelIterPool.Get().(*twoLevelIterator)
330 1 : err := i.init(ctx, r, v, lower, upper, filterer, useFilterBlock, hideObsoletePoints, stats,
331 1 : categoryAndQoS, statsCollector, rp, nil /* bufferPool */)
332 1 : if err != nil {
333 0 : return nil, err
334 0 : }
335 1 : return i, nil
336 : }
337 :
338 1 : i := singleLevelIterPool.Get().(*singleLevelIterator)
339 1 : err := i.init(ctx, r, v, lower, upper, filterer, useFilterBlock, hideObsoletePoints, stats,
340 1 : categoryAndQoS, statsCollector, rp, nil /* bufferPool */)
341 1 : if err != nil {
342 0 : return nil, err
343 0 : }
344 1 : return i, nil
345 : }
346 :
347 : // NewIter returns an iterator for the contents of the table. If an error
348 : // occurs, NewIter cleans up after itself and returns a nil iterator. NewIter
349 : // must only be used when the Reader is guaranteed to outlive any LazyValues
350 : // returned from the iter.
351 1 : func (r *Reader) NewIter(lower, upper []byte) (Iterator, error) {
352 1 : return r.NewIterWithBlockPropertyFilters(
353 1 : lower, upper, nil, true /* useFilterBlock */, nil, /* stats */
354 1 : CategoryAndQoS{}, nil /*statsCollector */, TrivialReaderProvider{Reader: r})
355 1 : }
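
End to end, opening a file and scanning it with NewIter looks roughly like the following sketch (assumes a table written with the default comparer; NewSimpleReadable is defined at the bottom of this file):

	package main

	import (
		"fmt"
		"os"

		"github.com/cockroachdb/pebble/sstable"
	)

	func dumpTable(path string) error {
		f, err := os.Open(path)
		if err != nil {
			return err
		}
		readable, err := sstable.NewSimpleReadable(f)
		if err != nil {
			f.Close()
			return err
		}
		r, err := sstable.NewReader(readable, sstable.ReaderOptions{})
		if err != nil {
			return err // NewReader closes the readable on error.
		}
		defer r.Close()

		it, err := r.NewIter(nil /* lower */, nil /* upper */)
		if err != nil {
			return err
		}
		defer it.Close()
		for key, lazyVal := it.First(); key != nil; key, lazyVal = it.Next() {
			// The value is consumed before the next iterator step, satisfying
			// NewIter's LazyValue lifetime requirement.
			val, _, err := lazyVal.Value(nil)
			if err != nil {
				return err
			}
			fmt.Printf("%s = %d bytes\n", key, len(val))
		}
		return it.Error()
	}
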
356 :
357 : // NewCompactionIter returns an iterator similar to NewIter, but one that also
358 : // tracks bytes iterated via *bytesIterated. If an error occurs, NewCompactionIter cleans up
359 : // after itself and returns a nil iterator.
360 : func (r *Reader) NewCompactionIter(
361 : bytesIterated *uint64,
362 : categoryAndQoS CategoryAndQoS,
363 : statsCollector *CategoryStatsCollector,
364 : rp ReaderProvider,
365 : bufferPool *BufferPool,
366 1 : ) (Iterator, error) {
367 1 : return r.newCompactionIter(bytesIterated, categoryAndQoS, statsCollector, rp, nil, bufferPool)
368 1 : }
369 :
370 : func (r *Reader) newCompactionIter(
371 : bytesIterated *uint64,
372 : categoryAndQoS CategoryAndQoS,
373 : statsCollector *CategoryStatsCollector,
374 : rp ReaderProvider,
375 : v *virtualState,
376 : bufferPool *BufferPool,
377 1 : ) (Iterator, error) {
378 1 : if r.Properties.IndexType == twoLevelIndex {
379 1 : i := twoLevelIterPool.Get().(*twoLevelIterator)
380 1 : err := i.init(
381 1 : context.Background(),
382 1 : r, v, nil /* lower */, nil /* upper */, nil,
383 1 : false /* useFilter */, v != nil && v.isForeign, /* hideObsoletePoints */
384 1 : nil /* stats */, categoryAndQoS, statsCollector, rp, bufferPool,
385 1 : )
386 1 : if err != nil {
387 0 : return nil, err
388 0 : }
389 1 : i.setupForCompaction()
390 1 : return &twoLevelCompactionIterator{
391 1 : twoLevelIterator: i,
392 1 : bytesIterated: bytesIterated,
393 1 : }, nil
394 : }
395 1 : i := singleLevelIterPool.Get().(*singleLevelIterator)
396 1 : err := i.init(
397 1 : context.Background(), r, v, nil /* lower */, nil, /* upper */
398 1 : nil, false /* useFilter */, v != nil && v.isForeign, /* hideObsoletePoints */
399 1 : nil /* stats */, categoryAndQoS, statsCollector, rp, bufferPool,
400 1 : )
401 1 : if err != nil {
402 0 : return nil, err
403 0 : }
404 1 : i.setupForCompaction()
405 1 : return &compactionIterator{
406 1 : singleLevelIterator: i,
407 1 : bytesIterated: bytesIterated,
408 1 : }, nil
409 : }
410 :
411 : // NewRawRangeDelIter returns an internal iterator for the contents of the
412 : // range-del block for the table. Returns nil if the table does not contain
413 : // any range deletions.
414 : //
415 : // TODO(sumeer): plumb context.Context since this path is relevant in the user-facing
416 : // iterator. Add WithContext methods since the existing ones are public.
417 1 : func (r *Reader) NewRawRangeDelIter() (keyspan.FragmentIterator, error) {
418 1 : if r.rangeDelBH.Length == 0 {
419 1 : return nil, nil
420 1 : }
421 1 : h, err := r.readRangeDel(nil /* stats */, nil /* iterStats */)
422 1 : if err != nil {
423 0 : return nil, err
424 0 : }
425 1 : i := &fragmentBlockIter{elideSameSeqnum: true}
426 1 : if err := i.blockIter.initHandle(r.Compare, h, r.Properties.GlobalSeqNum, false); err != nil {
427 0 : return nil, err
428 0 : }
429 1 : return i, nil
430 : }
431 :
432 : // NewRawRangeKeyIter returns an internal iterator for the contents of the
433 : // range-key block for the table. Returns nil if the table does not contain any
434 : // range keys.
435 : //
436 : // TODO(sumeer): plumb context.Context since this path is relevant in the user-facing
437 : // iterator. Add WithContext methods since the existing ones are public.
438 1 : func (r *Reader) NewRawRangeKeyIter() (keyspan.FragmentIterator, error) {
439 1 : if r.rangeKeyBH.Length == 0 {
440 1 : return nil, nil
441 1 : }
442 1 : h, err := r.readRangeKey(nil /* stats */, nil /* iterStats */)
443 1 : if err != nil {
444 0 : return nil, err
445 0 : }
446 1 : i := rangeKeyFragmentBlockIterPool.Get().(*rangeKeyFragmentBlockIter)
447 1 : if err := i.blockIter.initHandle(r.Compare, h, r.Properties.GlobalSeqNum, false); err != nil {
448 0 : return nil, err
449 0 : }
450 1 : return i, nil
451 : }
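
Both constructors return a nil iterator (and a nil error) when the table has no spans of that kind, so callers must check both. A usage sketch, assuming this vintage of keyspan.FragmentIterator, whose First/Next return *keyspan.Span:

	func dumpRangeDels(r *sstable.Reader) error {
		iter, err := r.NewRawRangeDelIter()
		if err != nil || iter == nil {
			return err // a nil iterator means no range deletions in the table
		}
		defer iter.Close()
		for s := iter.First(); s != nil; s = iter.Next() {
			fmt.Printf("[%s, %s): %d keys\n", s.Start, s.End, len(s.Keys))
		}
		return iter.Error()
	}
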
452 :
453 : type rangeKeyFragmentBlockIter struct {
454 : fragmentBlockIter
455 : }
456 :
457 1 : func (i *rangeKeyFragmentBlockIter) Close() error {
458 1 : err := i.fragmentBlockIter.Close()
459 1 : i.fragmentBlockIter = i.fragmentBlockIter.resetForReuse()
460 1 : rangeKeyFragmentBlockIterPool.Put(i)
461 1 : return err
462 1 : }
463 :
464 : func (r *Reader) readIndex(
465 : ctx context.Context, stats *base.InternalIteratorStats, iterStats *iterStatsAccumulator,
466 1 : ) (bufferHandle, error) {
467 1 : ctx = objiotracing.WithBlockType(ctx, objiotracing.MetadataBlock)
468 1 : return r.readBlock(ctx, r.indexBH, nil, nil, stats, iterStats, nil /* buffer pool */)
469 1 : }
470 :
471 : func (r *Reader) readFilter(
472 : ctx context.Context, stats *base.InternalIteratorStats, iterStats *iterStatsAccumulator,
473 1 : ) (bufferHandle, error) {
474 1 : ctx = objiotracing.WithBlockType(ctx, objiotracing.FilterBlock)
475 1 : return r.readBlock(ctx, r.filterBH, nil /* transform */, nil /* readHandle */, stats, iterStats, nil /* buffer pool */)
476 1 : }
477 :
478 : func (r *Reader) readRangeDel(
479 : stats *base.InternalIteratorStats, iterStats *iterStatsAccumulator,
480 1 : ) (bufferHandle, error) {
481 1 : ctx := objiotracing.WithBlockType(context.Background(), objiotracing.MetadataBlock)
482 1 : return r.readBlock(ctx, r.rangeDelBH, r.rangeDelTransform, nil /* readHandle */, stats, iterStats, nil /* buffer pool */)
483 1 : }
484 :
485 : func (r *Reader) readRangeKey(
486 : stats *base.InternalIteratorStats, iterStats *iterStatsAccumulator,
487 1 : ) (bufferHandle, error) {
488 1 : ctx := objiotracing.WithBlockType(context.Background(), objiotracing.MetadataBlock)
489 1 : return r.readBlock(ctx, r.rangeKeyBH, nil /* transform */, nil /* readHandle */, stats, iterStats, nil /* buffer pool */)
490 1 : }
491 :
492 : func checkChecksum(
493 : checksumType ChecksumType, b []byte, bh BlockHandle, fileNum base.FileNum,
494 1 : ) error {
495 1 : expectedChecksum := binary.LittleEndian.Uint32(b[bh.Length+1:])
496 1 : var computedChecksum uint32
497 1 : switch checksumType {
498 1 : case ChecksumTypeCRC32c:
499 1 : computedChecksum = crc.New(b[:bh.Length+1]).Value()
500 0 : case ChecksumTypeXXHash64:
501 0 : computedChecksum = uint32(xxhash.Sum64(b[:bh.Length+1]))
502 0 : default:
503 0 : return errors.Errorf("unsupported checksum type: %d", checksumType)
504 : }
505 :
506 1 : if expectedChecksum != computedChecksum {
507 0 : return base.CorruptionErrorf(
508 0 : "pebble/table: invalid table %s (checksum mismatch at %d/%d)",
509 0 : errors.Safe(fileNum), errors.Safe(bh.Offset), errors.Safe(bh.Length))
510 0 : }
511 1 : return nil
512 : }
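
As the slicing above implies, a physical block is bh.Length bytes of (possibly compressed) data followed by a 1-byte block-type tag and a 4-byte little-endian checksum, i.e. a 5-byte trailer. For ChecksumTypeCRC32c, pebble uses the LevelDB/RocksDB-style masked CRC-32C; a stand-alone sketch, assuming internal/crc's crc.New(p).Value() is equivalent to this masking:

	package main

	import (
		"encoding/binary"
		"fmt"
		"hash/crc32"
	)

	var castagnoli = crc32.MakeTable(crc32.Castagnoli)

	// mask rotates the CRC right by 15 bits and adds a constant, per the
	// LevelDB/RocksDB convention.
	func mask(c uint32) uint32 {
		return ((c >> 15) | (c << 17)) + 0xa282ead8
	}

	func main() {
		data := []byte("block payload")
		blockType := byte(0) // assumption: the no-compression block type
		b := append(data, blockType)

		// Checksum covers the data plus the type byte, as in checkChecksum.
		var trailer [4]byte
		binary.LittleEndian.PutUint32(trailer[:], mask(crc32.Checksum(b, castagnoli)))
		fmt.Printf("block trailer checksum: % x\n", trailer)
	}
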
513 :
514 : type cacheValueOrBuf struct {
515 : // buf.Valid() returns true if backed by a BufferPool.
516 : buf Buf
517 : // v is non-nil if backed by the block cache.
518 : v *cache.Value
519 : }
520 :
521 1 : func (b cacheValueOrBuf) get() []byte {
522 1 : if b.buf.Valid() {
523 1 : return b.buf.p.pool[b.buf.i].b
524 1 : }
525 1 : return b.v.Buf()
526 : }
527 :
528 1 : func (b cacheValueOrBuf) release() {
529 1 : if b.buf.Valid() {
530 1 : b.buf.Release()
531 1 : } else {
532 1 : cache.Free(b.v)
533 1 : }
534 : }
535 :
536 1 : func (b cacheValueOrBuf) truncate(n int) {
537 1 : if b.buf.Valid() {
538 1 : b.buf.p.pool[b.buf.i].b = b.buf.p.pool[b.buf.i].b[:n]
539 1 : } else {
540 1 : b.v.Truncate(n)
541 1 : }
542 : }
543 :
544 : func (r *Reader) readBlock(
545 : ctx context.Context,
546 : bh BlockHandle,
547 : transform blockTransform,
548 : readHandle objstorage.ReadHandle,
549 : stats *base.InternalIteratorStats,
550 : iterStats *iterStatsAccumulator,
551 : bufferPool *BufferPool,
552 1 : ) (handle bufferHandle, _ error) {
553 1 : if h := r.opts.Cache.Get(r.cacheID, r.fileNum, bh.Offset); h.Get() != nil {
554 1 : // Cache hit.
555 1 : if readHandle != nil {
556 1 : readHandle.RecordCacheHit(ctx, int64(bh.Offset), int64(bh.Length+blockTrailerLen))
557 1 : }
558 1 : if stats != nil {
559 1 : stats.BlockBytes += bh.Length
560 1 : stats.BlockBytesInCache += bh.Length
561 1 : }
562 1 : if iterStats != nil {
563 1 : iterStats.reportStats(bh.Length, bh.Length)
564 1 : }
565 : // This block is already in the cache; return a handle to the existing
566 : // value in the cache.
567 1 : return bufferHandle{h: h}, nil
568 : }
569 :
570 : // Cache miss.
571 1 : var compressed cacheValueOrBuf
572 1 : if bufferPool != nil {
573 1 : compressed = cacheValueOrBuf{
574 1 : buf: bufferPool.Alloc(int(bh.Length + blockTrailerLen)),
575 1 : }
576 1 : } else {
577 1 : compressed = cacheValueOrBuf{
578 1 : v: cache.Alloc(int(bh.Length + blockTrailerLen)),
579 1 : }
580 1 : }
581 :
582 1 : readStartTime := time.Now()
583 1 : var err error
584 1 : if readHandle != nil {
585 1 : err = readHandle.ReadAt(ctx, compressed.get(), int64(bh.Offset))
586 1 : } else {
587 1 : err = r.readable.ReadAt(ctx, compressed.get(), int64(bh.Offset))
588 1 : }
589 1 : readDuration := time.Since(readStartTime)
590 1 : // TODO(sumeer): should the threshold be configurable?
591 1 : const slowReadTracingThreshold = 5 * time.Millisecond
592 1 : // The invariants.Enabled path is for deterministic testing.
593 1 : if invariants.Enabled {
594 1 : readDuration = slowReadTracingThreshold
595 1 : }
596 : // Call IsTracingEnabled first to avoid the allocations from boxing integers
597 : // into an interface{} unless tracing is actually enabled.
598 1 : if readDuration >= slowReadTracingThreshold && r.opts.LoggerAndTracer.IsTracingEnabled(ctx) {
599 0 : r.opts.LoggerAndTracer.Eventf(ctx, "reading %d bytes took %s",
600 0 : int(bh.Length+blockTrailerLen), readDuration.String())
601 0 : }
602 1 : if stats != nil {
603 1 : stats.BlockReadDuration += readDuration
604 1 : }
605 1 : if err != nil {
606 0 : compressed.release()
607 0 : return bufferHandle{}, err
608 0 : }
609 1 : if err := checkChecksum(r.checksumType, compressed.get(), bh, r.fileNum.FileNum()); err != nil {
610 0 : compressed.release()
611 0 : return bufferHandle{}, err
612 0 : }
613 :
614 1 : typ := blockType(compressed.get()[bh.Length])
615 1 : compressed.truncate(int(bh.Length))
616 1 :
617 1 : var decompressed cacheValueOrBuf
618 1 : if typ == noCompressionBlockType {
619 1 : decompressed = compressed
620 1 : } else {
621 1 : // Decode the length of the decompressed value.
622 1 : decodedLen, prefixLen, err := decompressedLen(typ, compressed.get())
623 1 : if err != nil {
624 0 : compressed.release()
625 0 : return bufferHandle{}, err
626 0 : }
627 :
628 1 : if bufferPool != nil {
629 1 : decompressed = cacheValueOrBuf{buf: bufferPool.Alloc(decodedLen)}
630 1 : } else {
631 1 : decompressed = cacheValueOrBuf{v: cache.Alloc(decodedLen)}
632 1 : }
633 1 : if _, err := decompressInto(typ, compressed.get()[prefixLen:], decompressed.get()); err != nil {
634 0 : compressed.release()
635 0 : return bufferHandle{}, err
636 0 : }
637 1 : compressed.release()
638 : }
639 :
640 1 : if transform != nil {
641 0 : // Transforming blocks is very rare, so the extra copy of the
642 0 : // transformed data is not problematic.
643 0 : tmpTransformed, err := transform(decompressed.get())
644 0 : if err != nil {
645 0 : decompressed.release()
646 0 : return bufferHandle{}, err
647 0 : }
648 :
649 0 : var transformed cacheValueOrBuf
650 0 : if bufferPool != nil {
651 0 : transformed = cacheValueOrBuf{buf: bufferPool.Alloc(len(tmpTransformed))}
652 0 : } else {
653 0 : transformed = cacheValueOrBuf{v: cache.Alloc(len(tmpTransformed))}
654 0 : }
655 0 : copy(transformed.get(), tmpTransformed)
656 0 : decompressed.release()
657 0 : decompressed = transformed
658 : }
659 :
660 1 : if stats != nil {
661 1 : stats.BlockBytes += bh.Length
662 1 : }
663 1 : if iterStats != nil {
664 1 : iterStats.reportStats(bh.Length, 0)
665 1 : }
666 1 : if decompressed.buf.Valid() {
667 1 : return bufferHandle{b: decompressed.buf}, nil
668 1 : }
669 1 : h := r.opts.Cache.Set(r.cacheID, r.fileNum, bh.Offset, decompressed.v)
670 1 : return bufferHandle{h: h}, nil
671 : }
672 :
673 0 : func (r *Reader) transformRangeDelV1(b []byte) ([]byte, error) {
674 0 : // Convert v1 (RocksDB format) range-del blocks to v2 blocks on the fly. The
675 0 : // v1 format range-del blocks have unfragmented and unsorted range
676 0 : // tombstones. We need properly fragmented and sorted range tombstones in
677 0 : // order to serve from them directly.
678 0 : iter := &blockIter{}
679 0 : if err := iter.init(r.Compare, b, r.Properties.GlobalSeqNum, false); err != nil {
680 0 : return nil, err
681 0 : }
682 0 : var tombstones []keyspan.Span
683 0 : for key, value := iter.First(); key != nil; key, value = iter.Next() {
684 0 : t := keyspan.Span{
685 0 : Start: key.UserKey,
686 0 : End: value.InPlaceValue(),
687 0 : Keys: []keyspan.Key{{Trailer: key.Trailer}},
688 0 : }
689 0 : tombstones = append(tombstones, t)
690 0 : }
691 0 : keyspan.Sort(r.Compare, tombstones)
692 0 :
693 0 : // Fragment the tombstones, outputting them directly to a block writer.
694 0 : rangeDelBlock := blockWriter{
695 0 : restartInterval: 1,
696 0 : }
697 0 : frag := keyspan.Fragmenter{
698 0 : Cmp: r.Compare,
699 0 : Format: r.FormatKey,
700 0 : Emit: func(s keyspan.Span) {
701 0 : for _, k := range s.Keys {
702 0 : startIK := InternalKey{UserKey: s.Start, Trailer: k.Trailer}
703 0 : rangeDelBlock.add(startIK, s.End)
704 0 : }
705 : },
706 : }
707 0 : for i := range tombstones {
708 0 : frag.Add(tombstones[i])
709 0 : }
710 0 : frag.Finish()
711 0 :
712 0 : // Return the contents of the constructed v2 format range-del block.
713 0 : return rangeDelBlock.finish(), nil
714 : }
715 :
716 1 : func (r *Reader) readMetaindex(metaindexBH BlockHandle) error {
717 1 : // We use a BufferPool when reading metaindex blocks in order to avoid
718 1 : // populating the block cache with these blocks. In heavy-write workloads,
719 1 : // especially with high compaction concurrency, new tables may be created
720 1 : // frequently. Populating the block cache with these metaindex blocks adds
721 1 : // additional contention on the block cache mutexes (see #1997).
722 1 : // Additionally, these blocks are exceedingly unlikely to be read again
723 1 : // while they're still in the block cache except in misconfigurations with
724 1 : // excessive sstable counts or a table cache that's far too small.
725 1 : r.metaBufferPool.initPreallocated(r.metaBufferPoolAlloc[:0])
726 1 : // When we're finished, release the buffers we've allocated back to memory
727 1 : // allocator. We don't expect to use metaBufferPool again.
728 1 : defer r.metaBufferPool.Release()
729 1 :
730 1 : b, err := r.readBlock(
731 1 : context.Background(), metaindexBH, nil /* transform */, nil /* readHandle */, nil, /* stats */
732 1 : nil /* iterStats */, &r.metaBufferPool)
733 1 : if err != nil {
734 0 : return err
735 0 : }
736 1 : data := b.Get()
737 1 : defer b.Release()
738 1 :
739 1 : if uint64(len(data)) != metaindexBH.Length {
740 0 : return base.CorruptionErrorf("pebble/table: unexpected metaindex block size: %d vs %d",
741 0 : errors.Safe(len(data)), errors.Safe(metaindexBH.Length))
742 0 : }
743 :
744 1 : i, err := newRawBlockIter(bytes.Compare, data)
745 1 : if err != nil {
746 0 : return err
747 0 : }
748 :
749 1 : meta := map[string]BlockHandle{}
750 1 : for valid := i.First(); valid; valid = i.Next() {
751 1 : value := i.Value()
752 1 : if bytes.Equal(i.Key().UserKey, []byte(metaValueIndexName)) {
753 1 : vbih, n, err := decodeValueBlocksIndexHandle(i.Value())
754 1 : if err != nil {
755 0 : return err
756 0 : }
757 1 : if n == 0 || n != len(value) {
758 0 : return base.CorruptionErrorf("pebble/table: invalid table (bad value blocks index handle)")
759 0 : }
760 1 : r.valueBIH = vbih
761 1 : } else {
762 1 : bh, n := decodeBlockHandle(value)
763 1 : if n == 0 || n != len(value) {
764 0 : return base.CorruptionErrorf("pebble/table: invalid table (bad block handle)")
765 0 : }
766 1 : meta[string(i.Key().UserKey)] = bh
767 : }
768 : }
769 1 : if err := i.Close(); err != nil {
770 0 : return err
771 0 : }
772 :
773 1 : if bh, ok := meta[metaPropertiesName]; ok {
774 1 : b, err = r.readBlock(
775 1 : context.Background(), bh, nil /* transform */, nil /* readHandle */, nil, /* stats */
776 1 : nil /* iterStats */, nil /* buffer pool */)
777 1 : if err != nil {
778 0 : return err
779 0 : }
780 1 : r.propertiesBH = bh
781 1 : err := r.Properties.load(b.Get(), bh.Offset, r.opts.DeniedUserProperties)
782 1 : b.Release()
783 1 : if err != nil {
784 0 : return err
785 0 : }
786 : }
787 :
788 1 : if bh, ok := meta[metaRangeDelV2Name]; ok {
789 1 : r.rangeDelBH = bh
790 1 : } else if bh, ok := meta[metaRangeDelName]; ok {
791 0 : r.rangeDelBH = bh
792 0 : if !r.rawTombstones {
793 0 : r.rangeDelTransform = r.transformRangeDelV1
794 0 : }
795 : }
796 :
797 1 : if bh, ok := meta[metaRangeKeyName]; ok {
798 1 : r.rangeKeyBH = bh
799 1 : }
800 :
801 1 : for name, fp := range r.opts.Filters {
802 1 : types := []struct {
803 1 : ftype FilterType
804 1 : prefix string
805 1 : }{
806 1 : {TableFilter, "fullfilter."},
807 1 : }
808 1 : var done bool
809 1 : for _, t := range types {
810 1 : if bh, ok := meta[t.prefix+name]; ok {
811 1 : r.filterBH = bh
812 1 :
813 1 : switch t.ftype {
814 1 : case TableFilter:
815 1 : r.tableFilter = newTableFilterReader(fp)
816 0 : default:
817 0 : return base.CorruptionErrorf("unknown filter type: %v", errors.Safe(t.ftype))
818 : }
819 :
820 1 : done = true
821 1 : break
822 : }
823 : }
824 1 : if done {
825 1 : break
826 : }
827 : }
828 1 : return nil
829 : }
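
For orientation, the decoded metaindex is a small map from meta block name to handle. Its shape is roughly as follows (the literal key names are assumptions matching the conventional values of the constants referenced above; the offsets are invented):

	func exampleMetaindex() map[string]sstable.BlockHandle {
		return map[string]sstable.BlockHandle{
			"rocksdb.properties": {Offset: 100, Length: 500}, // metaPropertiesName (assumed value)
			"rocksdb.range_del2": {Offset: 600, Length: 80},  // metaRangeDelV2Name (assumed value)
			"pebble.range_key":   {Offset: 700, Length: 120}, // metaRangeKeyName (assumed value)
		}
	}
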
830 :
831 : // Layout returns the layout (block organization) for an sstable.
832 1 : func (r *Reader) Layout() (*Layout, error) {
833 1 : if r.err != nil {
834 0 : return nil, r.err
835 0 : }
836 :
837 1 : l := &Layout{
838 1 : Data: make([]BlockHandleWithProperties, 0, r.Properties.NumDataBlocks),
839 1 : Filter: r.filterBH,
840 1 : RangeDel: r.rangeDelBH,
841 1 : RangeKey: r.rangeKeyBH,
842 1 : ValueIndex: r.valueBIH.h,
843 1 : Properties: r.propertiesBH,
844 1 : MetaIndex: r.metaIndexBH,
845 1 : Footer: r.footerBH,
846 1 : Format: r.tableFormat,
847 1 : }
848 1 :
849 1 : indexH, err := r.readIndex(context.Background(), nil, nil)
850 1 : if err != nil {
851 0 : return nil, err
852 0 : }
853 1 : defer indexH.Release()
854 1 :
855 1 : var alloc bytealloc.A
856 1 :
857 1 : if r.Properties.IndexPartitions == 0 {
858 1 : l.Index = append(l.Index, r.indexBH)
859 1 : iter, _ := newBlockIter(r.Compare, indexH.Get())
860 1 : for key, value := iter.First(); key != nil; key, value = iter.Next() {
861 1 : dataBH, err := decodeBlockHandleWithProperties(value.InPlaceValue())
862 1 : if err != nil {
863 0 : return nil, errCorruptIndexEntry
864 0 : }
865 1 : if len(dataBH.Props) > 0 {
866 1 : alloc, dataBH.Props = alloc.Copy(dataBH.Props)
867 1 : }
868 1 : l.Data = append(l.Data, dataBH)
869 : }
870 1 : } else {
871 1 : l.TopIndex = r.indexBH
872 1 : topIter, _ := newBlockIter(r.Compare, indexH.Get())
873 1 : iter := &blockIter{}
874 1 : for key, value := topIter.First(); key != nil; key, value = topIter.Next() {
875 1 : indexBH, err := decodeBlockHandleWithProperties(value.InPlaceValue())
876 1 : if err != nil {
877 0 : return nil, errCorruptIndexEntry
878 0 : }
879 1 : l.Index = append(l.Index, indexBH.BlockHandle)
880 1 :
881 1 : subIndex, err := r.readBlock(context.Background(), indexBH.BlockHandle,
882 1 : nil /* transform */, nil /* readHandle */, nil /* stats */, nil /* iterStats */, nil /* buffer pool */)
883 1 : if err != nil {
884 0 : return nil, err
885 0 : }
886 1 : if err := iter.init(r.Compare, subIndex.Get(), 0, /* globalSeqNum */
887 1 : false /* hideObsoletePoints */); err != nil {
888 0 : return nil, err
889 0 : }
890 1 : for key, value := iter.First(); key != nil; key, value = iter.Next() {
891 1 : dataBH, err := decodeBlockHandleWithProperties(value.InPlaceValue())
892 1 : if len(dataBH.Props) > 0 {
893 1 : alloc, dataBH.Props = alloc.Copy(dataBH.Props)
894 1 : }
895 1 : if err != nil {
896 0 : return nil, errCorruptIndexEntry
897 0 : }
898 1 : l.Data = append(l.Data, dataBH)
899 : }
900 1 : subIndex.Release()
901 1 : *iter = iter.resetForReuse()
902 : }
903 : }
904 1 : if r.valueBIH.h.Length != 0 {
905 1 : vbiH, err := r.readBlock(context.Background(), r.valueBIH.h, nil, nil, nil, nil, nil /* buffer pool */)
906 1 : if err != nil {
907 0 : return nil, err
908 0 : }
909 1 : defer vbiH.Release()
910 1 : vbiBlock := vbiH.Get()
911 1 : indexEntryLen := int(r.valueBIH.blockNumByteLength + r.valueBIH.blockOffsetByteLength +
912 1 : r.valueBIH.blockLengthByteLength)
913 1 : i := 0
914 1 : for len(vbiBlock) != 0 {
915 1 : if len(vbiBlock) < indexEntryLen {
916 0 : return nil, errors.Errorf(
917 0 : "remaining value index block %d does not contain a full entry of length %d",
918 0 : len(vbiBlock), indexEntryLen)
919 0 : }
920 1 : n := int(r.valueBIH.blockNumByteLength)
921 1 : bn := int(littleEndianGet(vbiBlock, n))
922 1 : if bn != i {
923 0 : return nil, errors.Errorf("unexpected block num %d, expected %d",
924 0 : bn, i)
925 0 : }
926 1 : i++
927 1 : vbiBlock = vbiBlock[n:]
928 1 : n = int(r.valueBIH.blockOffsetByteLength)
929 1 : blockOffset := littleEndianGet(vbiBlock, n)
930 1 : vbiBlock = vbiBlock[n:]
931 1 : n = int(r.valueBIH.blockLengthByteLength)
932 1 : blockLen := littleEndianGet(vbiBlock, n)
933 1 : vbiBlock = vbiBlock[n:]
934 1 : l.ValueBlock = append(l.ValueBlock, BlockHandle{Offset: blockOffset, Length: blockLen})
935 : }
936 : }
937 :
938 1 : return l, nil
939 : }
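
Layout is handy for offline inspection tooling; a sketch that lists a table's data blocks:

	func printDataBlocks(r *sstable.Reader) error {
		l, err := r.Layout()
		if err != nil {
			return err
		}
		for i, bh := range l.Data {
			fmt.Printf("data[%d]: offset=%d length=%d props=%d bytes\n",
				i, bh.Offset, bh.Length, len(bh.Props))
		}
		return nil
	}
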
940 :
941 : // ValidateBlockChecksums validates the checksums for each block in the SSTable.
942 1 : func (r *Reader) ValidateBlockChecksums() error {
943 1 : // Pre-compute the BlockHandles for the underlying file.
944 1 : l, err := r.Layout()
945 1 : if err != nil {
946 0 : return err
947 0 : }
948 :
949 : // Construct the set of blocks to check. Note that the footer is not checked
950 : // as it is not a block with a checksum.
951 1 : blocks := make([]BlockHandle, len(l.Data))
952 1 : for i := range l.Data {
953 1 : blocks[i] = l.Data[i].BlockHandle
954 1 : }
955 1 : blocks = append(blocks, l.Index...)
956 1 : blocks = append(blocks, l.TopIndex, l.Filter, l.RangeDel, l.RangeKey, l.Properties, l.MetaIndex)
957 1 :
958 1 : // Sorting by offset ensures we are performing a sequential scan of the
959 1 : // file.
960 1 : slices.SortFunc(blocks, func(a, b BlockHandle) int {
961 1 : return cmp.Compare(a.Offset, b.Offset)
962 1 : })
963 :
964 : // Check all blocks sequentially. Make use of read-ahead, given we are
965 : // scanning the entire file from start to end.
966 1 : rh := r.readable.NewReadHandle(context.TODO())
967 1 : defer rh.Close()
968 1 :
969 1 : for _, bh := range blocks {
970 1 : // Certain blocks may not be present, in which case we skip them.
971 1 : if bh.Length == 0 {
972 1 : continue
973 : }
974 :
975 : // Read the block, which validates the checksum.
976 1 : h, err := r.readBlock(context.Background(), bh, nil, rh, nil, nil /* iterStats */, nil /* buffer pool */)
977 1 : if err != nil {
978 0 : return err
979 0 : }
980 1 : h.Release()
981 : }
982 :
983 1 : return nil
984 : }
985 :
986 : // CommonProperties implements the CommonReader interface.
987 1 : func (r *Reader) CommonProperties() *CommonProperties {
988 1 : return &r.Properties.CommonProperties
989 1 : }
990 :
991 : // EstimateDiskUsage returns the total size of data blocks overlapping the range
992 : // `[start, end]`. Even if a data block partially overlaps, or we cannot
993 : // determine overlap due to abbreviated index keys, the full data block size is
994 : // included in the estimation.
995 : //
996 : // This function does not account for any metablock space usage. Assumes there
997 : // is at least partial overlap, i.e., `[start, end]` falls neither completely
998 : // before nor completely after the file's range.
999 : //
1000 : // Only blocks containing point keys are considered. Range deletion and range
1001 : // key blocks are not considered.
1002 : //
1003 : // TODO(ajkr): account for metablock space usage. Perhaps look at the fraction of
1004 : // data blocks overlapped and add that same fraction of the metadata blocks to the
1005 : // estimate.
1006 1 : func (r *Reader) EstimateDiskUsage(start, end []byte) (uint64, error) {
1007 1 : if r.err != nil {
1008 0 : return 0, r.err
1009 0 : }
1010 :
1011 1 : indexH, err := r.readIndex(context.Background(), nil, nil)
1012 1 : if err != nil {
1013 0 : return 0, err
1014 0 : }
1015 1 : defer indexH.Release()
1016 1 :
1017 1 : // Iterators over the bottom-level index blocks containing start and end.
1018 1 : // These may be different in case of partitioned index but will both point
1019 1 : // to the same blockIter over the single index in the unpartitioned case.
1020 1 : var startIdxIter, endIdxIter *blockIter
1021 1 : if r.Properties.IndexPartitions == 0 {
1022 1 : iter, err := newBlockIter(r.Compare, indexH.Get())
1023 1 : if err != nil {
1024 0 : return 0, err
1025 0 : }
1026 1 : startIdxIter = iter
1027 1 : endIdxIter = iter
1028 1 : } else {
1029 1 : topIter, err := newBlockIter(r.Compare, indexH.Get())
1030 1 : if err != nil {
1031 0 : return 0, err
1032 0 : }
1033 :
1034 1 : key, val := topIter.SeekGE(start, base.SeekGEFlagsNone)
1035 1 : if key == nil {
1036 1 : // The range falls completely after this file, or an error occurred.
1037 1 : return 0, topIter.Error()
1038 1 : }
1039 1 : startIdxBH, err := decodeBlockHandleWithProperties(val.InPlaceValue())
1040 1 : if err != nil {
1041 0 : return 0, errCorruptIndexEntry
1042 0 : }
1043 1 : startIdxBlock, err := r.readBlock(context.Background(), startIdxBH.BlockHandle,
1044 1 : nil /* transform */, nil /* readHandle */, nil /* stats */, nil /* iterStats */, nil /* buffer pool */)
1045 1 : if err != nil {
1046 0 : return 0, err
1047 0 : }
1048 1 : defer startIdxBlock.Release()
1049 1 : startIdxIter, err = newBlockIter(r.Compare, startIdxBlock.Get())
1050 1 : if err != nil {
1051 0 : return 0, err
1052 0 : }
1053 :
1054 1 : key, val = topIter.SeekGE(end, base.SeekGEFlagsNone)
1055 1 : if key == nil {
1056 1 : if err := topIter.Error(); err != nil {
1057 0 : return 0, err
1058 0 : }
1059 1 : } else {
1060 1 : endIdxBH, err := decodeBlockHandleWithProperties(val.InPlaceValue())
1061 1 : if err != nil {
1062 0 : return 0, errCorruptIndexEntry
1063 0 : }
1064 1 : endIdxBlock, err := r.readBlock(context.Background(),
1065 1 : endIdxBH.BlockHandle, nil /* transform */, nil /* readHandle */, nil /* stats */, nil /* iterStats */, nil /* buffer pool */)
1066 1 : if err != nil {
1067 0 : return 0, err
1068 0 : }
1069 1 : defer endIdxBlock.Release()
1070 1 : endIdxIter, err = newBlockIter(r.Compare, endIdxBlock.Get())
1071 1 : if err != nil {
1072 0 : return 0, err
1073 0 : }
1074 : }
1075 : }
1076 : // startIdxIter should not be nil at this point, while endIdxIter can be nil
1077 : // if the range spans past the end of the file.
1078 :
1079 1 : key, val := startIdxIter.SeekGE(start, base.SeekGEFlagsNone)
1080 1 : if key == nil {
1081 1 : // The range falls completely after this file, or an error occurred.
1082 1 : return 0, startIdxIter.Error()
1083 1 : }
1084 1 : startBH, err := decodeBlockHandleWithProperties(val.InPlaceValue())
1085 1 : if err != nil {
1086 0 : return 0, errCorruptIndexEntry
1087 0 : }
1088 :
1089 1 : includeInterpolatedValueBlocksSize := func(dataBlockSize uint64) uint64 {
1090 1 : // INVARIANT: r.Properties.DataSize > 0 since startIdxIter is not nil.
1091 1 : // Linearly interpolate what is stored in value blocks.
1092 1 : //
1093 1 : // TODO(sumeer): if we need more accuracy, without loading any data blocks
1094 1 : // (which contain the value handles, and which may also be insufficient if
1095 1 : // the values are in separate files), we will need to accumulate the
1096 1 : // logical size of the key-value pairs and store the cumulative value for
1097 1 : // each data block in the index block entry. This increases the size of
1098 1 : // the BlockHandle, so wait until this becomes necessary.
1099 1 : return dataBlockSize +
1100 1 : uint64((float64(dataBlockSize)/float64(r.Properties.DataSize))*
1101 1 : float64(r.Properties.ValueBlocksSize))
1102 1 : }
1103 1 : if endIdxIter == nil {
1104 1 : // The range spans beyond this file. Include data blocks through the last.
1105 1 : return includeInterpolatedValueBlocksSize(r.Properties.DataSize - startBH.Offset), nil
1106 1 : }
1107 1 : key, val = endIdxIter.SeekGE(end, base.SeekGEFlagsNone)
1108 1 : if key == nil {
1109 1 : if err := endIdxIter.Error(); err != nil {
1110 0 : return 0, err
1111 0 : }
1112 : // The range spans beyond this file. Include data blocks through the last.
1113 1 : return includeInterpolatedValueBlocksSize(r.Properties.DataSize - startBH.Offset), nil
1114 : }
1115 1 : endBH, err := decodeBlockHandleWithProperties(val.InPlaceValue())
1116 1 : if err != nil {
1117 0 : return 0, errCorruptIndexEntry
1118 0 : }
1119 1 : return includeInterpolatedValueBlocksSize(
1120 1 : endBH.Offset + endBH.Length + blockTrailerLen - startBH.Offset), nil
1121 : }
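
A worked example of the interpolation above: if the overlapping data blocks span 10 MB of a table whose data blocks total 100 MB and whose value blocks total 50 MB, the estimate charges a proportional share of the value blocks: 10 MB + (10/100) * 50 MB = 15 MB.
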
1122 :
1123 : // TableFormat returns the format version for the table.
1124 1 : func (r *Reader) TableFormat() (TableFormat, error) {
1125 1 : if r.err != nil {
1126 0 : return TableFormatUnspecified, r.err
1127 0 : }
1128 1 : return r.tableFormat, nil
1129 : }
1130 :
1131 : // NewReader returns a new table reader for the file. Closing the reader will
1132 : // close the file.
1133 1 : func NewReader(f objstorage.Readable, o ReaderOptions, extraOpts ...ReaderOption) (*Reader, error) {
1134 1 : o = o.ensureDefaults()
1135 1 : r := &Reader{
1136 1 : readable: f,
1137 1 : opts: o,
1138 1 : }
1139 1 : if r.opts.Cache == nil {
1140 0 : r.opts.Cache = cache.New(0)
1141 1 : } else {
1142 1 : r.opts.Cache.Ref()
1143 1 : }
1144 :
1145 1 : if f == nil {
1146 0 : r.err = errors.New("pebble/table: nil file")
1147 0 : return nil, r.Close()
1148 0 : }
1149 :
1150 : // Note that the extra options are applied twice. First here for pre-apply
1151 : // options, and then below for post-apply options. Pre and post refer to
1152 : // before and after reading the metaindex and properties.
1153 1 : type preApply interface{ preApply() }
1154 1 : for _, opt := range extraOpts {
1155 1 : if _, ok := opt.(preApply); ok {
1156 1 : opt.readerApply(r)
1157 1 : }
1158 : }
1159 1 : if r.cacheID == 0 {
1160 0 : r.cacheID = r.opts.Cache.NewID()
1161 0 : }
1162 :
1163 1 : footer, err := readFooter(f)
1164 1 : if err != nil {
1165 0 : r.err = err
1166 0 : return nil, r.Close()
1167 0 : }
1168 1 : r.checksumType = footer.checksum
1169 1 : r.tableFormat = footer.format
1170 1 : // Read the metaindex.
1171 1 : if err := r.readMetaindex(footer.metaindexBH); err != nil {
1172 0 : r.err = err
1173 0 : return nil, r.Close()
1174 0 : }
1175 1 : r.indexBH = footer.indexBH
1176 1 : r.metaIndexBH = footer.metaindexBH
1177 1 : r.footerBH = footer.footerBH
1178 1 :
1179 1 : if r.Properties.ComparerName == "" || o.Comparer.Name == r.Properties.ComparerName {
1180 1 : r.Compare = o.Comparer.Compare
1181 1 : r.FormatKey = o.Comparer.FormatKey
1182 1 : r.Split = o.Comparer.Split
1183 1 : }
1184 :
1185 1 : if o.MergerName == r.Properties.MergerName {
1186 1 : r.mergerOK = true
1187 1 : }
1188 :
1189 : // Apply the extra options again now that the comparer and merger names are
1190 : // known.
1191 1 : for _, opt := range extraOpts {
1192 1 : if _, ok := opt.(preApply); !ok {
1193 1 : opt.readerApply(r)
1194 1 : }
1195 : }
1196 :
1197 1 : if r.Compare == nil {
1198 0 : r.err = errors.Errorf("pebble/table: %d: unknown comparer %s",
1199 0 : errors.Safe(r.fileNum), errors.Safe(r.Properties.ComparerName))
1200 0 : }
1201 1 : if !r.mergerOK {
1202 0 : if name := r.Properties.MergerName; name != "" && name != "nullptr" {
1203 0 : r.err = errors.Errorf("pebble/table: %d: unknown merger %s",
1204 0 : errors.Safe(r.fileNum), errors.Safe(r.Properties.MergerName))
1205 0 : }
1206 : }
1207 1 : if r.err != nil {
1208 0 : return nil, r.Close()
1209 0 : }
1210 :
1211 1 : return r, nil
1212 : }
1213 :
1214 : // ReadableFile describes the smallest subset of vfs.File that is required for
1215 : // reading SSTs.
1216 : type ReadableFile interface {
1217 : io.ReaderAt
1218 : io.Closer
1219 : Stat() (os.FileInfo, error)
1220 : }
1221 :
1222 : // NewSimpleReadable wraps a ReadableFile in an objstorage.Readable
1223 : // implementation (which does not support read-ahead).
1224 1 : func NewSimpleReadable(r ReadableFile) (objstorage.Readable, error) {
1225 1 : info, err := r.Stat()
1226 1 : if err != nil {
1227 0 : return nil, err
1228 0 : }
1229 1 : res := &simpleReadable{
1230 1 : f: r,
1231 1 : size: info.Size(),
1232 1 : }
1233 1 : res.rh = objstorage.MakeNoopReadHandle(res)
1234 1 : return res, nil
1235 : }
1236 :
1237 : // simpleReadable wraps a ReadableFile to implement objstorage.Readable.
1238 : type simpleReadable struct {
1239 : f ReadableFile
1240 : size int64
1241 : rh objstorage.NoopReadHandle
1242 : }
1243 :
1244 : var _ objstorage.Readable = (*simpleReadable)(nil)
1245 :
1246 : // ReadAt is part of the objstorage.Readable interface.
1247 1 : func (s *simpleReadable) ReadAt(_ context.Context, p []byte, off int64) error {
1248 1 : n, err := s.f.ReadAt(p, off)
1249 1 : if invariants.Enabled && err == nil && n != len(p) {
1250 0 : panic("short read")
1251 : }
1252 1 : return err
1253 : }
1254 :
1255 : // Close is part of the objstorage.Readable interface.
1256 1 : func (s *simpleReadable) Close() error {
1257 1 : return s.f.Close()
1258 1 : }
1259 :
1260 : // Size is part of the objstorage.Readable interface.
1261 1 : func (s *simpleReadable) Size() int64 {
1262 1 : return s.size
1263 1 : }
1264 :
1265 : // NewReadHandle is part of the objstorage.Readable interface.
1266 1 : func (s *simpleReadable) NewReadHandle(_ context.Context) objstorage.ReadHandle {
1267 1 : return &s.rh
1268 1 : }
|