Line data Source code
1 : // Copyright 2011 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package sstable
6 :
7 : import (
8 : "bytes"
9 : "cmp"
10 : "context"
11 : "encoding/binary"
12 : "io"
13 : "os"
14 : "slices"
15 : "time"
16 :
17 : "github.com/cespare/xxhash/v2"
18 : "github.com/cockroachdb/errors"
19 : "github.com/cockroachdb/pebble/internal/base"
20 : "github.com/cockroachdb/pebble/internal/bytealloc"
21 : "github.com/cockroachdb/pebble/internal/cache"
22 : "github.com/cockroachdb/pebble/internal/crc"
23 : "github.com/cockroachdb/pebble/internal/invariants"
24 : "github.com/cockroachdb/pebble/internal/keyspan"
25 : "github.com/cockroachdb/pebble/internal/private"
26 : "github.com/cockroachdb/pebble/objstorage"
27 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider"
28 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
29 : "github.com/cockroachdb/pebble/sstable/block"
30 : "github.com/cockroachdb/pebble/sstable/rowblk"
31 : )
32 :
33 : var errReaderClosed = errors.New("pebble/table: reader is closed")
34 :
35 : // decodeBlockHandle returns the block handle encoded at the start of src, as
36 : // well as the number of bytes it occupies. It returns zero if given invalid
37 : // input. A block handle for a data block or a first/lower level index block
38 : // should not be decoded using decodeBlockHandle since the caller may validate
39 : // that the number of bytes decoded is equal to the length of src, which will
40 : // be false if the properties are not decoded. In those cases the caller
41 : // should use decodeBlockHandleWithProperties.
42 2 : func decodeBlockHandle(src []byte) (block.Handle, int) {
43 2 : offset, n := binary.Uvarint(src)
44 2 : length, m := binary.Uvarint(src[n:])
45 2 : if n == 0 || m == 0 {
46 0 : return block.Handle{}, 0
47 0 : }
48 2 : return block.Handle{Offset: offset, Length: length}, n + m
49 : }
50 :
51 : // decodeBlockHandleWithProperties returns the block handle and properties
52 : // encoded in src. src needs to be exactly the length that was encoded. This
53 : // method must be used for data block and first/lower level index blocks. The
54 : // properties in the block handle point to the bytes in src.
55 2 : func decodeBlockHandleWithProperties(src []byte) (BlockHandleWithProperties, error) {
56 2 : bh, n := decodeBlockHandle(src)
57 2 : if n == 0 {
58 0 : return BlockHandleWithProperties{}, errors.Errorf("invalid BlockHandle")
59 0 : }
60 2 : return BlockHandleWithProperties{
61 2 : Handle: bh,
62 2 : Props: src[n:],
63 2 : }, nil
64 : }
65 :
66 2 : func encodeBlockHandle(dst []byte, b block.Handle) int {
67 2 : n := binary.PutUvarint(dst, b.Offset)
68 2 : m := binary.PutUvarint(dst[n:], b.Length)
69 2 : return n + m
70 2 : }
71 :
72 2 : func encodeBlockHandleWithProperties(dst []byte, b BlockHandleWithProperties) []byte {
73 2 : n := encodeBlockHandle(dst, b.Handle)
74 2 : dst = append(dst[:n], b.Props...)
75 2 : return dst
76 2 : }
77 :
// loadBlockResult is the outcome reported by an attempt to load a block.
type loadBlockResult int8

const (
	// loadBlockOK is the zero value of loadBlockResult.
	// NOTE(review): presumably means the block was loaded successfully;
	// confirm against callers.
	loadBlockOK loadBlockResult = iota
	// Could be due to error or because no block left to load.
	loadBlockFailed
	// NOTE(review): presumably the block exists but was skipped because it is
	// not relevant to the caller; confirm against callers.
	loadBlockIrrelevant
)

// blockTransform takes the contents of a block and returns the transformed
// contents, or an error. readBlock applies it to the decompressed block
// contents when a non-nil transform is supplied.
type blockTransform func([]byte) ([]byte, error)
88 :
// ReaderOption provides an interface to do work on a Reader while it is being
// opened.
type ReaderOption interface {
	// readerApply is called on the reader during opening in order to set internal
	// parameters.
	readerApply(*Reader)
}
96 :
97 : // Comparers is a map from comparer name to comparer. It is used for debugging
98 : // tools which may be used on multiple databases configured with different
99 : // comparers. Comparers implements the OpenOption interface and can be passed
100 : // as a parameter to NewReader.
101 : type Comparers map[string]*Comparer
102 :
103 1 : func (c Comparers) readerApply(r *Reader) {
104 1 : if r.Compare != nil || r.Properties.ComparerName == "" {
105 1 : return
106 1 : }
107 1 : if comparer, ok := c[r.Properties.ComparerName]; ok {
108 1 : r.Compare = comparer.Compare
109 1 : r.Equal = comparer.Equal
110 1 : r.FormatKey = comparer.FormatKey
111 1 : r.Split = comparer.Split
112 1 : }
113 : }
114 :
115 : // Mergers is a map from merger name to merger. It is used for debugging tools
116 : // which may be used on multiple databases configured with different
117 : // mergers. Mergers implements the OpenOption interface and can be passed as
118 : // a parameter to NewReader.
119 : type Mergers map[string]*Merger
120 :
121 1 : func (m Mergers) readerApply(r *Reader) {
122 1 : if r.mergerOK || r.Properties.MergerName == "" {
123 1 : return
124 1 : }
125 1 : _, r.mergerOK = m[r.Properties.MergerName]
126 : }
127 :
// cacheOpts is a Reader open option for specifying the cache ID and sstable file
// number. If not specified, a unique cache ID will be used.
type cacheOpts struct {
	// cacheID and fileNum are copied onto the Reader (or the Writer's layout)
	// by readerApply/writerApply when the destination field is still zero.
	cacheID uint64
	fileNum base.DiskFileNum
}

// Marker function to indicate the option should be applied before reading the
// sstable properties and, in the write path, before writing the default
// sstable properties.
func (c *cacheOpts) preApply() {}

// readerApply copies c's cache ID and file number into r. A field of r that
// already holds a non-zero value is left untouched.
func (c *cacheOpts) readerApply(r *Reader) {
	if r.cacheID == 0 {
		r.cacheID = c.cacheID
	}
	if r.fileNum == 0 {
		r.fileNum = c.fileNum
	}
}

// writerApply copies c's cache ID and file number into w.layout. A field of
// w.layout that already holds a non-zero value is left untouched.
func (c *cacheOpts) writerApply(w *Writer) {
	if w.layout.cacheID == 0 {
		w.layout.cacheID = c.cacheID
	}
	if w.layout.fileNum == 0 {
		w.layout.fileNum = c.fileNum
	}
}
157 :
// rawTombstonesOpt is a Reader open option for specifying that range
// tombstones returned by Reader.NewRangeDelIter() should not be
// fragmented. Used by debug tools to get a raw view of the tombstones
// contained in an sstable.
type rawTombstonesOpt struct{}

// preApply is an empty marker method.
// NOTE(review): presumably marks the option as one to apply before the
// sstable properties are read, like cacheOpts.preApply; confirm.
func (rawTombstonesOpt) preApply() {}

// readerApply sets r.rawTombstones.
func (rawTombstonesOpt) readerApply(r *Reader) {
	r.rawTombstones = true
}
169 :
170 2 : func init() {
171 2 : private.SSTableCacheOpts = func(cacheID uint64, fileNum base.DiskFileNum) interface{} {
172 2 : return &cacheOpts{cacheID, fileNum}
173 2 : }
174 2 : private.SSTableRawTombstonesOpt = rawTombstonesOpt{}
175 : }
176 :
// Reader is a table reader.
type Reader struct {
	// readable is the underlying object being read. Close closes it and sets
	// the field to nil.
	readable objstorage.Readable
	// cacheID and fileNum, together with a block's offset, form the key used
	// to look blocks up in the block cache (see readBlock).
	cacheID uint64
	fileNum base.DiskFileNum
	// err, when non-nil, is returned by Close and Layout. A successful Close
	// sets it to errReaderClosed.
	err error
	// Handles for the table's blocks. filterBH, rangeDelBH, rangeKeyBH,
	// valueBIH and propertiesBH are populated by readMetaindex when the
	// corresponding metaindex entry is present; a zero Length means the block
	// is absent.
	indexBH      block.Handle
	filterBH     block.Handle
	rangeDelBH   block.Handle
	rangeKeyBH   block.Handle
	valueBIH     valueBlocksIndexHandle
	propertiesBH block.Handle
	metaIndexBH  block.Handle
	footerBH     block.Handle
	opts         ReaderOptions
	// Compare, Equal, FormatKey and Split may be filled in from a Comparers
	// option when Compare is nil at open time.
	Compare   Compare
	Equal     Equal
	FormatKey base.FormatKey
	Split     Split
	// tableFilter is set by readMetaindex when a matching filter block is
	// found for one of opts.Filters.
	tableFilter *tableFilterReader
	// Keep types that are not multiples of 8 bytes at the end and with
	// decreasing size.
	Properties    Properties
	tableFormat   TableFormat
	rawTombstones bool
	mergerOK      bool
	checksumType  block.ChecksumType
	// metaBufferPool is a buffer pool used exclusively when opening a table and
	// loading its meta blocks. metaBufferPoolAlloc is used to batch-allocate
	// the BufferPool.pool slice as a part of the Reader allocation. Its
	// capacity is 3 to accommodate the meta block (1), and both the compressed
	// properties block (1) and decompressed properties block (1)
	// simultaneously.
	metaBufferPool      block.BufferPool
	metaBufferPoolAlloc [3]block.AllocedBuffer
}

// Compile-time check that *Reader satisfies CommonReader.
var _ CommonReader = (*Reader)(nil)
215 :
216 : // Close the reader and the underlying objstorage.Readable.
217 2 : func (r *Reader) Close() error {
218 2 : r.opts.Cache.Unref()
219 2 :
220 2 : if r.readable != nil {
221 2 : r.err = firstError(r.err, r.readable.Close())
222 2 : r.readable = nil
223 2 : }
224 :
225 2 : if r.err != nil {
226 1 : return r.err
227 1 : }
228 : // Make any future calls to Get, NewIter or Close return an error.
229 2 : r.err = errReaderClosed
230 2 : return nil
231 : }
232 :
233 : // NewIterWithBlockPropertyFilters returns an iterator for the contents of the
234 : // table. If an error occurs, NewIterWithBlockPropertyFilters cleans up after
235 : // itself and returns a nil iterator.
236 : func (r *Reader) NewIterWithBlockPropertyFilters(
237 : transforms IterTransforms,
238 : lower, upper []byte,
239 : filterer *BlockPropertiesFilterer,
240 : useFilterBlock bool,
241 : stats *base.InternalIteratorStats,
242 : categoryAndQoS CategoryAndQoS,
243 : statsCollector *CategoryStatsCollector,
244 : rp ReaderProvider,
245 2 : ) (Iterator, error) {
246 2 : return r.newIterWithBlockPropertyFiltersAndContext(
247 2 : context.Background(), transforms, lower, upper, filterer, useFilterBlock,
248 2 : stats, categoryAndQoS, statsCollector, rp, nil)
249 2 : }
250 :
251 : // NewIterWithBlockPropertyFiltersAndContextEtc is similar to
252 : // NewIterWithBlockPropertyFilters and additionally accepts a context for
253 : // tracing.
254 : //
255 : // If transform.HideObsoletePoints is set, the callee assumes that filterer
256 : // already includes obsoleteKeyBlockPropertyFilter. The caller can satisfy this
257 : // contract by first calling TryAddBlockPropertyFilterForHideObsoletePoints.
258 : func (r *Reader) NewIterWithBlockPropertyFiltersAndContextEtc(
259 : ctx context.Context,
260 : transforms IterTransforms,
261 : lower, upper []byte,
262 : filterer *BlockPropertiesFilterer,
263 : useFilterBlock bool,
264 : stats *base.InternalIteratorStats,
265 : categoryAndQoS CategoryAndQoS,
266 : statsCollector *CategoryStatsCollector,
267 : rp ReaderProvider,
268 2 : ) (Iterator, error) {
269 2 : return r.newIterWithBlockPropertyFiltersAndContext(
270 2 : ctx, transforms, lower, upper, filterer, useFilterBlock,
271 2 : stats, categoryAndQoS, statsCollector, rp, nil)
272 2 : }
273 :
274 : // TryAddBlockPropertyFilterForHideObsoletePoints is expected to be called
275 : // before the call to NewIterWithBlockPropertyFiltersAndContextEtc, to get the
276 : // value of hideObsoletePoints and potentially add a block property filter.
277 : func (r *Reader) TryAddBlockPropertyFilterForHideObsoletePoints(
278 : snapshotForHideObsoletePoints base.SeqNum,
279 : fileLargestSeqNum base.SeqNum,
280 : pointKeyFilters []BlockPropertyFilter,
281 2 : ) (hideObsoletePoints bool, filters []BlockPropertyFilter) {
282 2 : hideObsoletePoints = r.tableFormat >= TableFormatPebblev4 &&
283 2 : snapshotForHideObsoletePoints > fileLargestSeqNum
284 2 : if hideObsoletePoints {
285 2 : pointKeyFilters = append(pointKeyFilters, obsoleteKeyBlockPropertyFilter{})
286 2 : }
287 2 : return hideObsoletePoints, pointKeyFilters
288 : }
289 :
290 : func (r *Reader) newIterWithBlockPropertyFiltersAndContext(
291 : ctx context.Context,
292 : transforms IterTransforms,
293 : lower, upper []byte,
294 : filterer *BlockPropertiesFilterer,
295 : useFilterBlock bool,
296 : stats *base.InternalIteratorStats,
297 : categoryAndQoS CategoryAndQoS,
298 : statsCollector *CategoryStatsCollector,
299 : rp ReaderProvider,
300 : vState *virtualState,
301 2 : ) (Iterator, error) {
302 2 : // NB: pebble.tableCache wraps the returned iterator with one which performs
303 2 : // reference counting on the Reader, preventing the Reader from being closed
304 2 : // until the final iterator closes.
305 2 : if r.Properties.IndexType == twoLevelIndex {
306 2 : i := twoLevelIterPool.Get().(*twoLevelIterator)
307 2 : err := i.init(ctx, r, vState, transforms, lower, upper, filterer, useFilterBlock,
308 2 : stats, categoryAndQoS, statsCollector, rp, nil /* bufferPool */)
309 2 : if err != nil {
310 1 : return nil, err
311 1 : }
312 2 : return i, nil
313 : }
314 :
315 2 : i := singleLevelIterPool.Get().(*singleLevelIterator)
316 2 : err := i.init(ctx, r, vState, transforms, lower, upper, filterer, useFilterBlock,
317 2 : stats, categoryAndQoS, statsCollector, rp, nil /* bufferPool */)
318 2 : if err != nil {
319 1 : return nil, err
320 1 : }
321 2 : return i, nil
322 : }
323 :
324 : // NewIter returns an iterator for the contents of the table. If an error
325 : // occurs, NewIter cleans up after itself and returns a nil iterator. NewIter
326 : // must only be used when the Reader is guaranteed to outlive any LazyValues
327 : // returned from the iter.
328 2 : func (r *Reader) NewIter(transforms IterTransforms, lower, upper []byte) (Iterator, error) {
329 2 : return r.NewIterWithBlockPropertyFilters(
330 2 : transforms, lower, upper, nil, true, /* useFilterBlock */
331 2 : nil /* stats */, CategoryAndQoS{}, nil /* statsCollector */, TrivialReaderProvider{Reader: r})
332 2 : }
333 :
334 : // NewCompactionIter returns an iterator similar to NewIter but it also increments
335 : // the number of bytes iterated. If an error occurs, NewCompactionIter cleans up
336 : // after itself and returns a nil iterator.
337 : func (r *Reader) NewCompactionIter(
338 : transforms IterTransforms,
339 : categoryAndQoS CategoryAndQoS,
340 : statsCollector *CategoryStatsCollector,
341 : rp ReaderProvider,
342 : bufferPool *block.BufferPool,
343 2 : ) (Iterator, error) {
344 2 : return r.newCompactionIter(transforms, categoryAndQoS, statsCollector, rp, nil, bufferPool)
345 2 : }
346 :
347 : func (r *Reader) newCompactionIter(
348 : transforms IterTransforms,
349 : categoryAndQoS CategoryAndQoS,
350 : statsCollector *CategoryStatsCollector,
351 : rp ReaderProvider,
352 : vState *virtualState,
353 : bufferPool *block.BufferPool,
354 2 : ) (Iterator, error) {
355 2 : if vState != nil && vState.isSharedIngested {
356 2 : transforms.HideObsoletePoints = true
357 2 : }
358 2 : if r.Properties.IndexType == twoLevelIndex {
359 2 : i := twoLevelIterPool.Get().(*twoLevelIterator)
360 2 : err := i.init(
361 2 : context.Background(),
362 2 : r, vState, transforms, nil /* lower */, nil /* upper */, nil,
363 2 : false /* useFilter */, nil /* stats */, categoryAndQoS, statsCollector, rp, bufferPool,
364 2 : )
365 2 : if err != nil {
366 0 : return nil, err
367 0 : }
368 2 : i.setupForCompaction()
369 2 : return &twoLevelCompactionIterator{twoLevelIterator: i}, nil
370 : }
371 2 : i := singleLevelIterPool.Get().(*singleLevelIterator)
372 2 : err := i.init(
373 2 : context.Background(), r, vState, transforms, nil /* lower */, nil, /* upper */
374 2 : nil, false /* useFilter */, nil /* stats */, categoryAndQoS, statsCollector, rp, bufferPool,
375 2 : )
376 2 : if err != nil {
377 0 : return nil, err
378 0 : }
379 2 : i.setupForCompaction()
380 2 : return &compactionIterator{singleLevelIterator: i}, nil
381 : }
382 :
383 : // NewRawRangeDelIter returns an internal iterator for the contents of the
384 : // range-del block for the table. Returns nil if the table does not contain
385 : // any range deletions.
386 : //
387 : // TODO(sumeer): plumb context.Context since this path is relevant in the user-facing
388 : // iterator. Add WithContext methods since the existing ones are public.
389 : func (r *Reader) NewRawRangeDelIter(
390 : transforms FragmentIterTransforms,
391 2 : ) (keyspan.FragmentIterator, error) {
392 2 : if r.rangeDelBH.Length == 0 {
393 2 : return nil, nil
394 2 : }
395 2 : h, err := r.readRangeDel(nil /* stats */, nil /* iterStats */)
396 2 : if err != nil {
397 1 : return nil, err
398 1 : }
399 2 : transforms.ElideSameSeqNum = true
400 2 : i, err := rowblk.NewFragmentIter(r.Compare, r.Split, h, transforms)
401 2 : if err != nil {
402 0 : return nil, err
403 0 : }
404 2 : return keyspan.MaybeAssert(i, r.Compare), nil
405 : }
406 :
407 : // NewRawRangeKeyIter returns an internal iterator for the contents of the
408 : // range-key block for the table. Returns nil if the table does not contain any
409 : // range keys.
410 : //
411 : // TODO(sumeer): plumb context.Context since this path is relevant in the user-facing
412 : // iterator. Add WithContext methods since the existing ones are public.
413 : func (r *Reader) NewRawRangeKeyIter(
414 : transforms FragmentIterTransforms,
415 2 : ) (keyspan.FragmentIterator, error) {
416 2 : if r.rangeKeyBH.Length == 0 {
417 2 : return nil, nil
418 2 : }
419 2 : h, err := r.readRangeKey(nil /* stats */, nil /* iterStats */)
420 2 : if err != nil {
421 1 : return nil, err
422 1 : }
423 2 : i, err := rowblk.NewFragmentIter(r.Compare, r.Split, h, transforms)
424 2 : if err != nil {
425 0 : return nil, err
426 0 : }
427 2 : return keyspan.MaybeAssert(i, r.Compare), nil
428 : }
429 :
430 : func (r *Reader) readIndex(
431 : ctx context.Context,
432 : readHandle objstorage.ReadHandle,
433 : stats *base.InternalIteratorStats,
434 : iterStats *iterStatsAccumulator,
435 2 : ) (block.BufferHandle, error) {
436 2 : ctx = objiotracing.WithBlockType(ctx, objiotracing.MetadataBlock)
437 2 : return r.readBlock(ctx, r.indexBH, nil, readHandle, stats, iterStats, nil /* buffer pool */)
438 2 : }
439 :
440 : func (r *Reader) readFilter(
441 : ctx context.Context,
442 : readHandle objstorage.ReadHandle,
443 : stats *base.InternalIteratorStats,
444 : iterStats *iterStatsAccumulator,
445 2 : ) (block.BufferHandle, error) {
446 2 : ctx = objiotracing.WithBlockType(ctx, objiotracing.FilterBlock)
447 2 : return r.readBlock(ctx, r.filterBH, nil /* transform */, readHandle, stats, iterStats, nil /* buffer pool */)
448 2 : }
449 :
450 : func (r *Reader) readRangeDel(
451 : stats *base.InternalIteratorStats, iterStats *iterStatsAccumulator,
452 2 : ) (block.BufferHandle, error) {
453 2 : ctx := objiotracing.WithBlockType(context.Background(), objiotracing.MetadataBlock)
454 2 : return r.readBlock(ctx, r.rangeDelBH, nil /* transform */, nil /* readHandle */, stats, iterStats, nil /* buffer pool */)
455 2 : }
456 :
457 : func (r *Reader) readRangeKey(
458 : stats *base.InternalIteratorStats, iterStats *iterStatsAccumulator,
459 2 : ) (block.BufferHandle, error) {
460 2 : ctx := objiotracing.WithBlockType(context.Background(), objiotracing.MetadataBlock)
461 2 : return r.readBlock(ctx, r.rangeKeyBH, nil /* transform */, nil /* readHandle */, stats, iterStats, nil /* buffer pool */)
462 2 : }
463 :
464 : func checkChecksum(
465 : checksumType block.ChecksumType, b []byte, bh block.Handle, fileNum base.DiskFileNum,
466 2 : ) error {
467 2 : expectedChecksum := binary.LittleEndian.Uint32(b[bh.Length+1:])
468 2 : var computedChecksum uint32
469 2 : switch checksumType {
470 2 : case block.ChecksumTypeCRC32c:
471 2 : computedChecksum = crc.New(b[:bh.Length+1]).Value()
472 1 : case block.ChecksumTypeXXHash64:
473 1 : computedChecksum = uint32(xxhash.Sum64(b[:bh.Length+1]))
474 0 : default:
475 0 : return errors.Errorf("unsupported checksum type: %d", checksumType)
476 : }
477 :
478 2 : if expectedChecksum != computedChecksum {
479 1 : return base.CorruptionErrorf(
480 1 : "pebble/table: invalid table %s (checksum mismatch at %d/%d)",
481 1 : fileNum, errors.Safe(bh.Offset), errors.Safe(bh.Length))
482 1 : }
483 2 : return nil
484 : }
485 :
// deterministicReadBlockDurationForTesting, when true, makes readBlock report
// a fixed read duration instead of the measured one.
var deterministicReadBlockDurationForTesting bool

// DeterministicReadBlockDurationForTesting is for tests that want a
// deterministic value of the time to read a block (that is not in the cache).
// It turns the deterministic mode on and returns a function that restores the
// previous setting; that function must be called before the test exits.
func DeterministicReadBlockDurationForTesting() func() {
	previous := deterministicReadBlockDurationForTesting
	deterministicReadBlockDurationForTesting = true
	return func() { deterministicReadBlockDurationForTesting = previous }
}
498 :
499 : func (r *Reader) readBlock(
500 : ctx context.Context,
501 : bh block.Handle,
502 : transform blockTransform,
503 : readHandle objstorage.ReadHandle,
504 : stats *base.InternalIteratorStats,
505 : iterStats *iterStatsAccumulator,
506 : bufferPool *block.BufferPool,
507 2 : ) (handle block.BufferHandle, _ error) {
508 2 : if h := r.opts.Cache.Get(r.cacheID, r.fileNum, bh.Offset); h.Get() != nil {
509 2 : // Cache hit.
510 2 : if readHandle != nil {
511 2 : readHandle.RecordCacheHit(ctx, int64(bh.Offset), int64(bh.Length+block.TrailerLen))
512 2 : }
513 2 : if stats != nil {
514 2 : stats.BlockBytes += bh.Length
515 2 : stats.BlockBytesInCache += bh.Length
516 2 : }
517 2 : if iterStats != nil {
518 2 : iterStats.reportStats(bh.Length, bh.Length, 0)
519 2 : }
520 : // This block is already in the cache; return a handle to existing vlaue
521 : // in the cache.
522 2 : return block.CacheBufferHandle(h), nil
523 : }
524 :
525 : // Cache miss.
526 :
527 2 : if sema := r.opts.LoadBlockSema; sema != nil {
528 1 : if err := sema.Acquire(ctx, 1); err != nil {
529 0 : // An error here can only come from the context.
530 0 : return block.BufferHandle{}, err
531 0 : }
532 1 : defer sema.Release(1)
533 : }
534 :
535 2 : compressed := block.Alloc(int(bh.Length+block.TrailerLen), bufferPool)
536 2 : readStartTime := time.Now()
537 2 : var err error
538 2 : if readHandle != nil {
539 2 : err = readHandle.ReadAt(ctx, compressed.Get(), int64(bh.Offset))
540 2 : } else {
541 2 : err = r.readable.ReadAt(ctx, compressed.Get(), int64(bh.Offset))
542 2 : }
543 2 : readDuration := time.Since(readStartTime)
544 2 : // TODO(sumeer): should the threshold be configurable.
545 2 : const slowReadTracingThreshold = 5 * time.Millisecond
546 2 : // For deterministic testing.
547 2 : if deterministicReadBlockDurationForTesting {
548 1 : readDuration = slowReadTracingThreshold
549 1 : }
550 : // Call IsTracingEnabled to avoid the allocations of boxing integers into an
551 : // interface{}, unless necessary.
552 2 : if readDuration >= slowReadTracingThreshold && r.opts.LoggerAndTracer.IsTracingEnabled(ctx) {
553 1 : r.opts.LoggerAndTracer.Eventf(ctx, "reading %d bytes took %s",
554 1 : int(bh.Length+block.TrailerLen), readDuration.String())
555 1 : }
556 2 : if stats != nil {
557 2 : stats.BlockBytes += bh.Length
558 2 : stats.BlockReadDuration += readDuration
559 2 : }
560 2 : if err != nil {
561 1 : compressed.Release()
562 1 : return block.BufferHandle{}, err
563 1 : }
564 2 : if err := checkChecksum(r.checksumType, compressed.Get(), bh, r.fileNum); err != nil {
565 1 : compressed.Release()
566 1 : return block.BufferHandle{}, err
567 1 : }
568 :
569 2 : typ := blockType(compressed.Get()[bh.Length])
570 2 : compressed.Truncate(int(bh.Length))
571 2 :
572 2 : var decompressed block.Value
573 2 : if typ == noCompressionBlockType {
574 2 : decompressed = compressed
575 2 : } else {
576 2 : // Decode the length of the decompressed value.
577 2 : decodedLen, prefixLen, err := decompressedLen(typ, compressed.Get())
578 2 : if err != nil {
579 0 : compressed.Release()
580 0 : return block.BufferHandle{}, err
581 0 : }
582 :
583 2 : decompressed = block.Alloc(decodedLen, bufferPool)
584 2 : if err := decompressInto(typ, compressed.Get()[prefixLen:], decompressed.Get()); err != nil {
585 0 : compressed.Release()
586 0 : return block.BufferHandle{}, err
587 0 : }
588 2 : compressed.Release()
589 : }
590 :
591 2 : if transform != nil {
592 0 : // Transforming blocks is very rare, so the extra copy of the
593 0 : // transformed data is not problematic.
594 0 : tmpTransformed, err := transform(decompressed.Get())
595 0 : if err != nil {
596 0 : decompressed.Release()
597 0 : return block.BufferHandle{}, err
598 0 : }
599 :
600 0 : transformed := block.Alloc(len(tmpTransformed), bufferPool)
601 0 : copy(transformed.Get(), tmpTransformed)
602 0 : decompressed.Release()
603 0 : decompressed = transformed
604 : }
605 :
606 2 : if iterStats != nil {
607 2 : iterStats.reportStats(bh.Length, 0, readDuration)
608 2 : }
609 2 : h := decompressed.MakeHandle(r.opts.Cache, r.cacheID, r.fileNum, bh.Offset)
610 2 : return h, nil
611 : }
612 :
613 2 : func (r *Reader) readMetaindex(metaindexBH block.Handle, readHandle objstorage.ReadHandle) error {
614 2 : // We use a BufferPool when reading metaindex blocks in order to avoid
615 2 : // populating the block cache with these blocks. In heavy-write workloads,
616 2 : // especially with high compaction concurrency, new tables may be created
617 2 : // frequently. Populating the block cache with these metaindex blocks adds
618 2 : // additional contention on the block cache mutexes (see #1997).
619 2 : // Additionally, these blocks are exceedingly unlikely to be read again
620 2 : // while they're still in the block cache except in misconfigurations with
621 2 : // excessive sstables counts or a table cache that's far too small.
622 2 : r.metaBufferPool.InitPreallocated(r.metaBufferPoolAlloc[:0])
623 2 : // When we're finished, release the buffers we've allocated back to memory
624 2 : // allocator. We don't expect to use metaBufferPool again.
625 2 : defer r.metaBufferPool.Release()
626 2 :
627 2 : b, err := r.readBlock(
628 2 : context.Background(), metaindexBH, nil /* transform */, readHandle, nil, /* stats */
629 2 : nil /* iterStats */, &r.metaBufferPool)
630 2 : if err != nil {
631 1 : return err
632 1 : }
633 2 : data := b.Get()
634 2 : defer b.Release()
635 2 :
636 2 : if uint64(len(data)) != metaindexBH.Length {
637 0 : return base.CorruptionErrorf("pebble/table: unexpected metaindex block size: %d vs %d",
638 0 : errors.Safe(len(data)), errors.Safe(metaindexBH.Length))
639 0 : }
640 :
641 2 : i, err := rowblk.NewRawIter(bytes.Compare, data)
642 2 : if err != nil {
643 0 : return err
644 0 : }
645 :
646 2 : meta := map[string]block.Handle{}
647 2 : for valid := i.First(); valid; valid = i.Next() {
648 2 : value := i.Value()
649 2 : if bytes.Equal(i.Key().UserKey, []byte(metaValueIndexName)) {
650 2 : vbih, n, err := decodeValueBlocksIndexHandle(i.Value())
651 2 : if err != nil {
652 0 : return err
653 0 : }
654 2 : if n == 0 || n != len(value) {
655 0 : return base.CorruptionErrorf("pebble/table: invalid table (bad value blocks index handle)")
656 0 : }
657 2 : r.valueBIH = vbih
658 2 : } else {
659 2 : bh, n := decodeBlockHandle(value)
660 2 : if n == 0 || n != len(value) {
661 0 : return base.CorruptionErrorf("pebble/table: invalid table (bad block handle)")
662 0 : }
663 2 : meta[string(i.Key().UserKey)] = bh
664 : }
665 : }
666 2 : if err := i.Close(); err != nil {
667 0 : return err
668 0 : }
669 :
670 2 : if bh, ok := meta[metaPropertiesName]; ok {
671 2 : b, err = r.readBlock(
672 2 : context.Background(), bh, nil /* transform */, readHandle, nil, /* stats */
673 2 : nil /* iterStats */, nil /* buffer pool */)
674 2 : if err != nil {
675 1 : return err
676 1 : }
677 2 : r.propertiesBH = bh
678 2 : err := r.Properties.load(b.Get(), r.opts.DeniedUserProperties)
679 2 : b.Release()
680 2 : if err != nil {
681 0 : return err
682 0 : }
683 : }
684 :
685 2 : if bh, ok := meta[metaRangeDelV2Name]; ok {
686 2 : r.rangeDelBH = bh
687 2 : } else if _, ok := meta[metaRangeDelV1Name]; ok {
688 0 : // This version of Pebble requires a format major version at least as
689 0 : // high as FormatFlushableIngest (see pebble.FormatMinSupported). In
690 0 : // this format major verison, we have a guarantee that we've compacted
691 0 : // away all RocksDB sstables. It should not be possible to encounter an
692 0 : // sstable with a v1 range deletion block but not a v2 range deletion
693 0 : // block.
694 0 : err := errors.Newf("pebble/table: unexpected range-del block type: %s", metaRangeDelV1Name)
695 0 : return errors.Mark(err, base.ErrCorruption)
696 0 : }
697 :
698 2 : if bh, ok := meta[metaRangeKeyName]; ok {
699 2 : r.rangeKeyBH = bh
700 2 : }
701 :
702 2 : for name, fp := range r.opts.Filters {
703 2 : types := []struct {
704 2 : ftype FilterType
705 2 : prefix string
706 2 : }{
707 2 : {TableFilter, "fullfilter."},
708 2 : }
709 2 : var done bool
710 2 : for _, t := range types {
711 2 : if bh, ok := meta[t.prefix+name]; ok {
712 2 : r.filterBH = bh
713 2 :
714 2 : switch t.ftype {
715 2 : case TableFilter:
716 2 : r.tableFilter = newTableFilterReader(fp)
717 0 : default:
718 0 : return base.CorruptionErrorf("unknown filter type: %v", errors.Safe(t.ftype))
719 : }
720 :
721 2 : done = true
722 2 : break
723 : }
724 : }
725 2 : if done {
726 2 : break
727 : }
728 : }
729 2 : return nil
730 : }
731 :
732 : // Layout returns the layout (block organization) for an sstable.
733 2 : func (r *Reader) Layout() (*Layout, error) {
734 2 : if r.err != nil {
735 0 : return nil, r.err
736 0 : }
737 :
738 2 : l := &Layout{
739 2 : Data: make([]BlockHandleWithProperties, 0, r.Properties.NumDataBlocks),
740 2 : Filter: r.filterBH,
741 2 : RangeDel: r.rangeDelBH,
742 2 : RangeKey: r.rangeKeyBH,
743 2 : ValueIndex: r.valueBIH.h,
744 2 : Properties: r.propertiesBH,
745 2 : MetaIndex: r.metaIndexBH,
746 2 : Footer: r.footerBH,
747 2 : Format: r.tableFormat,
748 2 : }
749 2 :
750 2 : indexH, err := r.readIndex(context.Background(), nil, nil, nil)
751 2 : if err != nil {
752 1 : return nil, err
753 1 : }
754 2 : defer indexH.Release()
755 2 :
756 2 : var alloc bytealloc.A
757 2 :
758 2 : if r.Properties.IndexPartitions == 0 {
759 2 : l.Index = append(l.Index, r.indexBH)
760 2 : iter, _ := rowblk.NewIter(r.Compare, r.Split, indexH.Get(), NoTransforms)
761 2 : for kv := iter.First(); kv != nil; kv = iter.Next() {
762 2 : dataBH, err := decodeBlockHandleWithProperties(kv.InPlaceValue())
763 2 : if err != nil {
764 0 : return nil, errCorruptIndexEntry(err)
765 0 : }
766 2 : if len(dataBH.Props) > 0 {
767 2 : alloc, dataBH.Props = alloc.Copy(dataBH.Props)
768 2 : }
769 2 : l.Data = append(l.Data, dataBH)
770 : }
771 2 : } else {
772 2 : l.TopIndex = r.indexBH
773 2 : topIter, _ := rowblk.NewIter(r.Compare, r.Split, indexH.Get(), NoTransforms)
774 2 : iter := &rowblk.Iter{}
775 2 : for kv := topIter.First(); kv != nil; kv = topIter.Next() {
776 2 : indexBH, err := decodeBlockHandleWithProperties(kv.InPlaceValue())
777 2 : if err != nil {
778 0 : return nil, errCorruptIndexEntry(err)
779 0 : }
780 2 : l.Index = append(l.Index, indexBH.Handle)
781 2 :
782 2 : subIndex, err := r.readBlock(context.Background(), indexBH.Handle,
783 2 : nil /* transform */, nil /* readHandle */, nil /* stats */, nil /* iterStats */, nil /* buffer pool */)
784 2 : if err != nil {
785 1 : return nil, err
786 1 : }
787 : // TODO(msbutler): figure out how to pass virtualState to layout call.
788 2 : if err := iter.Init(r.Compare, r.Split, subIndex.Get(), NoTransforms); err != nil {
789 0 : return nil, err
790 0 : }
791 2 : for kv := iter.First(); kv != nil; kv = iter.Next() {
792 2 : dataBH, err := decodeBlockHandleWithProperties(kv.InPlaceValue())
793 2 : if len(dataBH.Props) > 0 {
794 2 : alloc, dataBH.Props = alloc.Copy(dataBH.Props)
795 2 : }
796 2 : if err != nil {
797 0 : return nil, errCorruptIndexEntry(err)
798 0 : }
799 2 : l.Data = append(l.Data, dataBH)
800 : }
801 2 : subIndex.Release()
802 2 : *iter = iter.ResetForReuse()
803 : }
804 : }
805 2 : if r.valueBIH.h.Length != 0 {
806 2 : vbiH, err := r.readBlock(context.Background(), r.valueBIH.h, nil, nil, nil, nil, nil /* buffer pool */)
807 2 : if err != nil {
808 0 : return nil, err
809 0 : }
810 2 : defer vbiH.Release()
811 2 : vbiBlock := vbiH.Get()
812 2 : indexEntryLen := int(r.valueBIH.blockNumByteLength + r.valueBIH.blockOffsetByteLength +
813 2 : r.valueBIH.blockLengthByteLength)
814 2 : i := 0
815 2 : for len(vbiBlock) != 0 {
816 2 : if len(vbiBlock) < indexEntryLen {
817 0 : return nil, errors.Errorf(
818 0 : "remaining value index block %d does not contain a full entry of length %d",
819 0 : len(vbiBlock), indexEntryLen)
820 0 : }
821 2 : n := int(r.valueBIH.blockNumByteLength)
822 2 : bn := int(littleEndianGet(vbiBlock, n))
823 2 : if bn != i {
824 0 : return nil, errors.Errorf("unexpected block num %d, expected %d",
825 0 : bn, i)
826 0 : }
827 2 : i++
828 2 : vbiBlock = vbiBlock[n:]
829 2 : n = int(r.valueBIH.blockOffsetByteLength)
830 2 : blockOffset := littleEndianGet(vbiBlock, n)
831 2 : vbiBlock = vbiBlock[n:]
832 2 : n = int(r.valueBIH.blockLengthByteLength)
833 2 : blockLen := littleEndianGet(vbiBlock, n)
834 2 : vbiBlock = vbiBlock[n:]
835 2 : l.ValueBlock = append(l.ValueBlock, block.Handle{Offset: blockOffset, Length: blockLen})
836 : }
837 : }
838 :
839 2 : return l, nil
840 : }
841 :
842 : // ValidateBlockChecksums validates the checksums for each block in the SSTable.
843 2 : func (r *Reader) ValidateBlockChecksums() error {
844 2 : // Pre-compute the BlockHandles for the underlying file.
845 2 : l, err := r.Layout()
846 2 : if err != nil {
847 1 : return err
848 1 : }
849 :
850 : // Construct the set of blocks to check. Note that the footer is not checked
851 : // as it is not a block with a checksum.
852 2 : blocks := make([]block.Handle, len(l.Data))
853 2 : for i := range l.Data {
854 2 : blocks[i] = l.Data[i].Handle
855 2 : }
856 2 : blocks = append(blocks, l.Index...)
857 2 : blocks = append(blocks, l.TopIndex, l.Filter, l.RangeDel, l.RangeKey, l.Properties, l.MetaIndex)
858 2 :
859 2 : // Sorting by offset ensures we are performing a sequential scan of the
860 2 : // file.
861 2 : slices.SortFunc(blocks, func(a, b block.Handle) int {
862 2 : return cmp.Compare(a.Offset, b.Offset)
863 2 : })
864 :
865 : // Check all blocks sequentially. Make use of read-ahead, given we are
866 : // scanning the entire file from start to end.
867 2 : rh := r.readable.NewReadHandle(context.TODO(), objstorage.NoReadBefore)
868 2 : defer rh.Close()
869 2 :
870 2 : for _, bh := range blocks {
871 2 : // Certain blocks may not be present, in which case we skip them.
872 2 : if bh.Length == 0 {
873 2 : continue
874 : }
875 :
876 : // Read the block, which validates the checksum.
877 2 : h, err := r.readBlock(context.Background(), bh, nil, rh, nil, nil /* iterStats */, nil /* buffer pool */)
878 2 : if err != nil {
879 1 : return err
880 1 : }
881 2 : h.Release()
882 : }
883 :
884 2 : return nil
885 : }
886 :
887 : // CommonProperties implemented the CommonReader interface.
888 2 : func (r *Reader) CommonProperties() *CommonProperties {
889 2 : return &r.Properties.CommonProperties
890 2 : }
891 :
892 : // EstimateDiskUsage returns the total size of data blocks overlapping the range
893 : // `[start, end]`. Even if a data block partially overlaps, or we cannot
894 : // determine overlap due to abbreviated index keys, the full data block size is
895 : // included in the estimation.
896 : //
897 : // This function does not account for any metablock space usage. Assumes there
898 : // is at least partial overlap, i.e., `[start, end]` falls neither completely
899 : // before nor completely after the file's range.
900 : //
901 : // Only blocks containing point keys are considered. Range deletion and range
902 : // key blocks are not considered.
903 : //
904 : // TODO(ajkr): account for metablock space usage. Perhaps look at the fraction of
905 : // data blocks overlapped and add that same fraction of the metadata blocks to the
906 : // estimate.
907 2 : func (r *Reader) EstimateDiskUsage(start, end []byte) (uint64, error) {
908 2 : if r.err != nil {
909 0 : return 0, r.err
910 0 : }
911 :
912 2 : indexH, err := r.readIndex(context.Background(), nil, nil, nil)
913 2 : if err != nil {
914 1 : return 0, err
915 1 : }
916 2 : defer indexH.Release()
917 2 :
918 2 : // Iterators over the bottom-level index blocks containing start and end.
919 2 : // These may be different in case of partitioned index but will both point
920 2 : // to the same blockIter over the single index in the unpartitioned case.
921 2 : var startIdxIter, endIdxIter *rowblk.Iter
922 2 : if r.Properties.IndexPartitions == 0 {
923 2 : iter, err := rowblk.NewIter(r.Compare, r.Split, indexH.Get(), NoTransforms)
924 2 : if err != nil {
925 0 : return 0, err
926 0 : }
927 2 : startIdxIter = iter
928 2 : endIdxIter = iter
929 2 : } else {
930 2 : topIter, err := rowblk.NewIter(r.Compare, r.Split, indexH.Get(), NoTransforms)
931 2 : if err != nil {
932 0 : return 0, err
933 0 : }
934 :
935 2 : kv := topIter.SeekGE(start, base.SeekGEFlagsNone)
936 2 : if kv == nil {
937 1 : // The range falls completely after this file, or an error occurred.
938 1 : return 0, topIter.Error()
939 1 : }
940 2 : startIdxBH, err := decodeBlockHandleWithProperties(kv.InPlaceValue())
941 2 : if err != nil {
942 0 : return 0, errCorruptIndexEntry(err)
943 0 : }
944 2 : startIdxBlock, err := r.readBlock(context.Background(), startIdxBH.Handle,
945 2 : nil /* transform */, nil /* readHandle */, nil /* stats */, nil /* iterStats */, nil /* buffer pool */)
946 2 : if err != nil {
947 1 : return 0, err
948 1 : }
949 2 : defer startIdxBlock.Release()
950 2 : startIdxIter, err = rowblk.NewIter(r.Compare, r.Split, startIdxBlock.Get(), NoTransforms)
951 2 : if err != nil {
952 0 : return 0, err
953 0 : }
954 :
955 2 : kv = topIter.SeekGE(end, base.SeekGEFlagsNone)
956 2 : if kv == nil {
957 1 : if err := topIter.Error(); err != nil {
958 0 : return 0, err
959 0 : }
960 2 : } else {
961 2 : endIdxBH, err := decodeBlockHandleWithProperties(kv.InPlaceValue())
962 2 : if err != nil {
963 0 : return 0, errCorruptIndexEntry(err)
964 0 : }
965 2 : endIdxBlock, err := r.readBlock(context.Background(),
966 2 : endIdxBH.Handle, nil /* transform */, nil /* readHandle */, nil /* stats */, nil /* iterStats */, nil /* buffer pool */)
967 2 : if err != nil {
968 1 : return 0, err
969 1 : }
970 2 : defer endIdxBlock.Release()
971 2 : endIdxIter, err = rowblk.NewIter(r.Compare, r.Split, endIdxBlock.Get(), NoTransforms)
972 2 : if err != nil {
973 0 : return 0, err
974 0 : }
975 : }
976 : }
977 : // startIdxIter should not be nil at this point, while endIdxIter can be if the
978 : // range spans past the end of the file.
979 :
980 2 : kv := startIdxIter.SeekGE(start, base.SeekGEFlagsNone)
981 2 : if kv == nil {
982 2 : // The range falls completely after this file, or an error occurred.
983 2 : return 0, startIdxIter.Error()
984 2 : }
985 2 : startBH, err := decodeBlockHandleWithProperties(kv.InPlaceValue())
986 2 : if err != nil {
987 0 : return 0, errCorruptIndexEntry(err)
988 0 : }
989 :
990 2 : includeInterpolatedValueBlocksSize := func(dataBlockSize uint64) uint64 {
991 2 : // INVARIANT: r.Properties.DataSize > 0 since startIdxIter is not nil.
992 2 : // Linearly interpolate what is stored in value blocks.
993 2 : //
994 2 : // TODO(sumeer): if we need more accuracy, without loading any data blocks
995 2 : // (which contain the value handles, and which may also be insufficient if
996 2 : // the values are in separate files), we will need to accumulate the
997 2 : // logical size of the key-value pairs and store the cumulative value for
998 2 : // each data block in the index block entry. This increases the size of
999 2 : // the BlockHandle, so wait until this becomes necessary.
1000 2 : return dataBlockSize +
1001 2 : uint64((float64(dataBlockSize)/float64(r.Properties.DataSize))*
1002 2 : float64(r.Properties.ValueBlocksSize))
1003 2 : }
1004 2 : if endIdxIter == nil {
1005 1 : // The range spans beyond this file. Include data blocks through the last.
1006 1 : return includeInterpolatedValueBlocksSize(r.Properties.DataSize - startBH.Offset), nil
1007 1 : }
1008 2 : kv = endIdxIter.SeekGE(end, base.SeekGEFlagsNone)
1009 2 : if kv == nil {
1010 2 : if err := endIdxIter.Error(); err != nil {
1011 0 : return 0, err
1012 0 : }
1013 : // The range spans beyond this file. Include data blocks through the last.
1014 2 : return includeInterpolatedValueBlocksSize(r.Properties.DataSize - startBH.Offset), nil
1015 : }
1016 2 : endBH, err := decodeBlockHandleWithProperties(kv.InPlaceValue())
1017 2 : if err != nil {
1018 0 : return 0, errCorruptIndexEntry(err)
1019 0 : }
1020 2 : return includeInterpolatedValueBlocksSize(
1021 2 : endBH.Offset + endBH.Length + block.TrailerLen - startBH.Offset), nil
1022 : }
1023 :
1024 : // TableFormat returns the format version for the table.
1025 2 : func (r *Reader) TableFormat() (TableFormat, error) {
1026 2 : if r.err != nil {
1027 0 : return TableFormatUnspecified, r.err
1028 0 : }
1029 2 : return r.tableFormat, nil
1030 : }
1031 :
1032 : // NewReader returns a new table reader for the file. Closing the reader will
1033 : // close the file.
1034 2 : func NewReader(f objstorage.Readable, o ReaderOptions, extraOpts ...ReaderOption) (*Reader, error) {
1035 2 : o = o.ensureDefaults()
1036 2 : r := &Reader{
1037 2 : readable: f,
1038 2 : opts: o,
1039 2 : }
1040 2 : if r.opts.Cache == nil {
1041 2 : r.opts.Cache = cache.New(0)
1042 2 : } else {
1043 2 : r.opts.Cache.Ref()
1044 2 : }
1045 :
1046 2 : if f == nil {
1047 1 : r.err = errors.New("pebble/table: nil file")
1048 1 : return nil, r.Close()
1049 1 : }
1050 :
1051 : // Note that the extra options are applied twice. First here for pre-apply
1052 : // options, and then below for post-apply options. Pre and post refer to
1053 : // before and after reading the metaindex and properties.
1054 2 : type preApply interface{ preApply() }
1055 2 : for _, opt := range extraOpts {
1056 2 : if _, ok := opt.(preApply); ok {
1057 2 : opt.readerApply(r)
1058 2 : }
1059 : }
1060 2 : if r.cacheID == 0 {
1061 2 : r.cacheID = r.opts.Cache.NewID()
1062 2 : }
1063 :
1064 2 : var preallocRH objstorageprovider.PreallocatedReadHandle
1065 2 : ctx := context.TODO()
1066 2 : rh := objstorageprovider.UsePreallocatedReadHandle(
1067 2 : ctx, r.readable, objstorage.ReadBeforeForNewReader, &preallocRH)
1068 2 : defer rh.Close()
1069 2 :
1070 2 : footer, err := readFooter(ctx, f, rh)
1071 2 : if err != nil {
1072 1 : r.err = err
1073 1 : return nil, r.Close()
1074 1 : }
1075 2 : r.checksumType = footer.checksum
1076 2 : r.tableFormat = footer.format
1077 2 : // Read the metaindex and properties blocks.
1078 2 : if err := r.readMetaindex(footer.metaindexBH, rh); err != nil {
1079 1 : r.err = err
1080 1 : return nil, r.Close()
1081 1 : }
1082 2 : r.indexBH = footer.indexBH
1083 2 : r.metaIndexBH = footer.metaindexBH
1084 2 : r.footerBH = footer.footerBH
1085 2 :
1086 2 : if r.Properties.ComparerName == "" || o.Comparer.Name == r.Properties.ComparerName {
1087 2 : r.Compare = o.Comparer.Compare
1088 2 : r.Equal = o.Comparer.Equal
1089 2 : r.FormatKey = o.Comparer.FormatKey
1090 2 : r.Split = o.Comparer.Split
1091 2 : }
1092 :
1093 2 : if o.MergerName == r.Properties.MergerName {
1094 2 : r.mergerOK = true
1095 2 : }
1096 :
1097 : // Apply the extra options again now that the comparer and merger names are
1098 : // known.
1099 2 : for _, opt := range extraOpts {
1100 2 : if _, ok := opt.(preApply); !ok {
1101 2 : opt.readerApply(r)
1102 2 : }
1103 : }
1104 :
1105 2 : if r.Compare == nil {
1106 1 : r.err = errors.Errorf("pebble/table: %d: unknown comparer %s",
1107 1 : errors.Safe(r.fileNum), errors.Safe(r.Properties.ComparerName))
1108 1 : }
1109 2 : if !r.mergerOK {
1110 1 : if name := r.Properties.MergerName; name != "" && name != "nullptr" {
1111 1 : r.err = errors.Errorf("pebble/table: %d: unknown merger %s",
1112 1 : errors.Safe(r.fileNum), errors.Safe(r.Properties.MergerName))
1113 1 : }
1114 : }
1115 2 : if r.err != nil {
1116 1 : return nil, r.Close()
1117 1 : }
1118 :
1119 2 : return r, nil
1120 : }
1121 :
// ReadableFile is the smallest subset of vfs.File needed to read SSTs:
// positioned reads, closing, and stat.
type ReadableFile interface {
	// ReaderAt provides positioned reads of the file contents.
	io.ReaderAt
	// Closer releases the file.
	io.Closer
	// Stat returns the file's metadata.
	Stat() (os.FileInfo, error)
}
1129 :
1130 : // NewSimpleReadable wraps a ReadableFile in a objstorage.Readable
1131 : // implementation (which does not support read-ahead)
1132 2 : func NewSimpleReadable(r ReadableFile) (objstorage.Readable, error) {
1133 2 : info, err := r.Stat()
1134 2 : if err != nil {
1135 1 : return nil, err
1136 1 : }
1137 2 : res := &simpleReadable{
1138 2 : f: r,
1139 2 : size: info.Size(),
1140 2 : }
1141 2 : res.rh = objstorage.MakeNoopReadHandle(res)
1142 2 : return res, nil
1143 : }
1144 :
1145 : // simpleReadable wraps a ReadableFile to implement objstorage.Readable.
1146 : type simpleReadable struct {
1147 : f ReadableFile
1148 : size int64
1149 : rh objstorage.NoopReadHandle
1150 : }
1151 :
1152 : var _ objstorage.Readable = (*simpleReadable)(nil)
1153 :
1154 : // ReadAt is part of the objstorage.Readable interface.
1155 2 : func (s *simpleReadable) ReadAt(_ context.Context, p []byte, off int64) error {
1156 2 : n, err := s.f.ReadAt(p, off)
1157 2 : if invariants.Enabled && err == nil && n != len(p) {
1158 0 : panic("short read")
1159 : }
1160 2 : return err
1161 : }
1162 :
1163 : // Close is part of the objstorage.Readable interface.
1164 2 : func (s *simpleReadable) Close() error {
1165 2 : return s.f.Close()
1166 2 : }
1167 :
1168 : // Size is part of the objstorage.Readable interface.
1169 2 : func (s *simpleReadable) Size() int64 {
1170 2 : return s.size
1171 2 : }
1172 :
1173 : // NewReaddHandle is part of the objstorage.Readable interface.
1174 : func (s *simpleReadable) NewReadHandle(
1175 : ctx context.Context, readBeforeSize objstorage.ReadBeforeSize,
1176 2 : ) objstorage.ReadHandle {
1177 2 : return &s.rh
1178 2 : }
1179 :
1180 0 : func errCorruptIndexEntry(err error) error {
1181 0 : err = base.CorruptionErrorf("pebble/table: corrupt index entry: %v", err)
1182 0 : if invariants.Enabled {
1183 0 : panic(err)
1184 : }
1185 0 : return err
1186 : }
|