Line data Source code
1 : // Copyright 2011 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package sstable
6 :
7 : import (
8 : "cmp"
9 : "context"
10 : "encoding/binary"
11 : "fmt"
12 : "io"
13 : "path/filepath"
14 : "runtime"
15 : "slices"
16 : "time"
17 :
18 : "github.com/cespare/xxhash/v2"
19 : "github.com/cockroachdb/crlib/crtime"
20 : "github.com/cockroachdb/crlib/fifo"
21 : "github.com/cockroachdb/errors"
22 : "github.com/cockroachdb/pebble/internal/base"
23 : "github.com/cockroachdb/pebble/internal/bytealloc"
24 : "github.com/cockroachdb/pebble/internal/cache"
25 : "github.com/cockroachdb/pebble/internal/crc"
26 : "github.com/cockroachdb/pebble/internal/invariants"
27 : "github.com/cockroachdb/pebble/internal/keyspan"
28 : "github.com/cockroachdb/pebble/internal/sstableinternal"
29 : "github.com/cockroachdb/pebble/objstorage"
30 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider"
31 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
32 : "github.com/cockroachdb/pebble/sstable/block"
33 : "github.com/cockroachdb/pebble/sstable/colblk"
34 : "github.com/cockroachdb/pebble/sstable/rowblk"
35 : "github.com/cockroachdb/pebble/sstable/valblk"
36 : "github.com/cockroachdb/pebble/vfs"
37 : )
38 :
// errReaderClosed is the error recorded on a Reader once Close has completed
// without any other error, so that later checks of the Reader's error field
// observe that the reader is closed.
var errReaderClosed = errors.New("pebble/table: reader is closed")

// loadBlockResult enumerates the possible outcomes of an attempt to load a
// block.
type loadBlockResult int8

const (
	// loadBlockOK indicates the block was loaded.
	loadBlockOK = loadBlockResult(iota)
	// loadBlockFailed could be due to error or because no block left to load.
	loadBlockFailed
	// loadBlockIrrelevant presumably means the block was skipped rather than
	// loaded -- NOTE(review): confirm against the iterators that return it.
	loadBlockIrrelevant
)
49 :
50 : // Reader is a table reader.
51 : type Reader struct {
52 : readable objstorage.Readable
53 :
54 : // The following fields are copied from the ReadOptions.
55 : cacheOpts sstableinternal.CacheOptions
56 : keySchema *colblk.KeySchema
57 : loadBlockSema *fifo.Semaphore
58 : deniedUserProperties map[string]struct{}
59 : filterMetricsTracker *FilterMetricsTracker
60 : logger base.LoggerAndTracer
61 :
62 : Comparer *base.Comparer
63 : Compare Compare
64 : Equal Equal
65 : Split Split
66 :
67 : tableFilter *tableFilterReader
68 :
69 : err error
70 :
71 : indexBH block.Handle
72 : filterBH block.Handle
73 : rangeDelBH block.Handle
74 : rangeKeyBH block.Handle
75 : valueBIH valblk.IndexHandle
76 : propertiesBH block.Handle
77 : metaindexBH block.Handle
78 : footerBH block.Handle
79 :
80 : Properties Properties
81 : tableFormat TableFormat
82 : checksumType block.ChecksumType
83 :
84 : // metaBufferPool is a buffer pool used exclusively when opening a table and
85 : // loading its meta blocks. metaBufferPoolAlloc is used to batch-allocate
86 : // the BufferPool.pool slice as a part of the Reader allocation. It's
87 : // capacity 3 to accommodate the meta block (1), and both the compressed
88 : // properties block (1) and decompressed properties block (1)
89 : // simultaneously.
90 : metaBufferPool block.BufferPool
91 : metaBufferPoolAlloc [3]block.AllocedBuffer
92 : }
93 :
94 : var _ CommonReader = (*Reader)(nil)
95 :
96 : // Close the reader and the underlying objstorage.Readable.
97 1 : func (r *Reader) Close() error {
98 1 : r.cacheOpts.Cache.Unref()
99 1 :
100 1 : if r.readable != nil {
101 1 : r.err = firstError(r.err, r.readable.Close())
102 1 : r.readable = nil
103 1 : }
104 :
105 1 : if r.err != nil {
106 1 : return r.err
107 1 : }
108 : // Make any future calls to Get, NewIter or Close return an error.
109 1 : r.err = errReaderClosed
110 1 : return nil
111 : }
112 :
113 : // NewPointIter returns an iterator for the point keys in the table.
114 : //
115 : // If transform.HideObsoletePoints is set, the callee assumes that filterer
116 : // already includes obsoleteKeyBlockPropertyFilter. The caller can satisfy this
117 : // contract by first calling TryAddBlockPropertyFilterForHideObsoletePoints.
118 : func (r *Reader) NewPointIter(
119 : ctx context.Context,
120 : transforms IterTransforms,
121 : lower, upper []byte,
122 : filterer *BlockPropertiesFilterer,
123 : filterBlockSizeLimit FilterBlockSizeLimit,
124 : stats *base.InternalIteratorStats,
125 : statsAccum IterStatsAccumulator,
126 : rp valblk.ReaderProvider,
127 1 : ) (Iterator, error) {
128 1 : return r.newPointIter(
129 1 : ctx, transforms, lower, upper, filterer, filterBlockSizeLimit,
130 1 : stats, statsAccum, rp, nil)
131 1 : }
132 :
133 : // TryAddBlockPropertyFilterForHideObsoletePoints is expected to be called
134 : // before the call to NewPointIter, to get the value of hideObsoletePoints and
135 : // potentially add a block property filter.
136 : func (r *Reader) TryAddBlockPropertyFilterForHideObsoletePoints(
137 : snapshotForHideObsoletePoints base.SeqNum,
138 : fileLargestSeqNum base.SeqNum,
139 : pointKeyFilters []BlockPropertyFilter,
140 1 : ) (hideObsoletePoints bool, filters []BlockPropertyFilter) {
141 1 : hideObsoletePoints = r.tableFormat >= TableFormatPebblev4 &&
142 1 : snapshotForHideObsoletePoints > fileLargestSeqNum
143 1 : if hideObsoletePoints {
144 1 : pointKeyFilters = append(pointKeyFilters, obsoleteKeyBlockPropertyFilter{})
145 1 : }
146 1 : return hideObsoletePoints, pointKeyFilters
147 : }
148 :
149 : func (r *Reader) newPointIter(
150 : ctx context.Context,
151 : transforms IterTransforms,
152 : lower, upper []byte,
153 : filterer *BlockPropertiesFilterer,
154 : filterBlockSizeLimit FilterBlockSizeLimit,
155 : stats *base.InternalIteratorStats,
156 : statsAccum IterStatsAccumulator,
157 : rp valblk.ReaderProvider,
158 : vState *virtualState,
159 1 : ) (Iterator, error) {
160 1 : // NB: pebble.fileCache wraps the returned iterator with one which performs
161 1 : // reference counting on the Reader, preventing the Reader from being closed
162 1 : // until the final iterator closes.
163 1 : var res Iterator
164 1 : var err error
165 1 : if r.Properties.IndexType == twoLevelIndex {
166 1 : if r.tableFormat.BlockColumnar() {
167 1 : res, err = newColumnBlockTwoLevelIterator(
168 1 : ctx, r, vState, transforms, lower, upper, filterer, filterBlockSizeLimit,
169 1 : stats, statsAccum, rp, nil /* bufferPool */)
170 1 : } else {
171 1 : res, err = newRowBlockTwoLevelIterator(
172 1 : ctx, r, vState, transforms, lower, upper, filterer, filterBlockSizeLimit,
173 1 : stats, statsAccum, rp, nil /* bufferPool */)
174 1 : }
175 1 : } else {
176 1 : if r.tableFormat.BlockColumnar() {
177 1 : res, err = newColumnBlockSingleLevelIterator(
178 1 : ctx, r, vState, transforms, lower, upper, filterer, filterBlockSizeLimit,
179 1 : stats, statsAccum, rp, nil /* bufferPool */)
180 1 : } else {
181 1 : res, err = newRowBlockSingleLevelIterator(
182 1 : ctx, r, vState, transforms, lower, upper, filterer, filterBlockSizeLimit,
183 1 : stats, statsAccum, rp, nil /* bufferPool */)
184 1 : }
185 : }
186 1 : if err != nil {
187 1 : // Note: we don't want to return res here - it will be a nil
188 1 : // single/twoLevelIterator, not a nil Iterator.
189 1 : return nil, err
190 1 : }
191 1 : return res, nil
192 : }
193 :
194 : // NewIter returns an iterator for the point keys in the table. It is a
195 : // simplified version of NewPointIter and should only be used for tests and
196 : // tooling.
197 : //
198 : // NewIter must only be used when the Reader is guaranteed to outlive any
199 : // LazyValues returned from the iter.
200 1 : func (r *Reader) NewIter(transforms IterTransforms, lower, upper []byte) (Iterator, error) {
201 1 : // TODO(radu): we should probably not use bloom filters in this case, as there
202 1 : // likely isn't a cache set up.
203 1 : return r.NewPointIter(
204 1 : context.TODO(), transforms, lower, upper, nil, AlwaysUseFilterBlock,
205 1 : nil /* stats */, nil /* statsAccum */, MakeTrivialReaderProvider(r))
206 1 : }
207 :
208 : // NewCompactionIter returns an iterator similar to NewIter but it also increments
209 : // the number of bytes iterated. If an error occurs, NewCompactionIter cleans up
210 : // after itself and returns a nil iterator.
211 : func (r *Reader) NewCompactionIter(
212 : transforms IterTransforms,
213 : statsAccum IterStatsAccumulator,
214 : rp valblk.ReaderProvider,
215 : bufferPool *block.BufferPool,
216 1 : ) (Iterator, error) {
217 1 : return r.newCompactionIter(transforms, statsAccum, rp, nil, bufferPool)
218 1 : }
219 :
220 : func (r *Reader) newCompactionIter(
221 : transforms IterTransforms,
222 : statsAccum IterStatsAccumulator,
223 : rp valblk.ReaderProvider,
224 : vState *virtualState,
225 : bufferPool *block.BufferPool,
226 1 : ) (Iterator, error) {
227 1 : if vState != nil && vState.isSharedIngested {
228 1 : transforms.HideObsoletePoints = true
229 1 : }
230 1 : if r.Properties.IndexType == twoLevelIndex {
231 1 : if !r.tableFormat.BlockColumnar() {
232 1 : i, err := newRowBlockTwoLevelIterator(
233 1 : context.Background(),
234 1 : r, vState, transforms, nil /* lower */, nil /* upper */, nil,
235 1 : NeverUseFilterBlock, nil /* stats */, statsAccum, rp, bufferPool)
236 1 : if err != nil {
237 0 : return nil, err
238 0 : }
239 1 : i.SetupForCompaction()
240 1 : return i, nil
241 : }
242 1 : i, err := newColumnBlockTwoLevelIterator(
243 1 : context.Background(),
244 1 : r, vState, transforms, nil /* lower */, nil /* upper */, nil,
245 1 : NeverUseFilterBlock, nil /* stats */, statsAccum, rp, bufferPool)
246 1 : if err != nil {
247 0 : return nil, err
248 0 : }
249 1 : i.SetupForCompaction()
250 1 : return i, nil
251 : }
252 1 : if !r.tableFormat.BlockColumnar() {
253 1 : i, err := newRowBlockSingleLevelIterator(
254 1 : context.Background(), r, vState, transforms, nil /* lower */, nil, /* upper */
255 1 : nil, NeverUseFilterBlock, nil /* stats */, statsAccum, rp, bufferPool)
256 1 : if err != nil {
257 0 : return nil, err
258 0 : }
259 1 : i.SetupForCompaction()
260 1 : return i, nil
261 : }
262 1 : i, err := newColumnBlockSingleLevelIterator(
263 1 : context.Background(), r, vState, transforms, nil /* lower */, nil, /* upper */
264 1 : nil, NeverUseFilterBlock, nil /* stats */, statsAccum, rp, bufferPool)
265 1 : if err != nil {
266 0 : return nil, err
267 0 : }
268 1 : i.SetupForCompaction()
269 1 : return i, nil
270 : }
271 :
272 : // NewRawRangeDelIter returns an internal iterator for the contents of the
273 : // range-del block for the table. Returns nil if the table does not contain
274 : // any range deletions.
275 : func (r *Reader) NewRawRangeDelIter(
276 : ctx context.Context, transforms FragmentIterTransforms,
277 1 : ) (iter keyspan.FragmentIterator, err error) {
278 1 : if r.rangeDelBH.Length == 0 {
279 1 : return nil, nil
280 1 : }
281 : // TODO(radu): plumb stats here.
282 1 : h, err := r.readRangeDelBlock(ctx, noEnv, noReadHandle, r.rangeDelBH)
283 1 : if err != nil {
284 1 : return nil, err
285 1 : }
286 1 : if r.tableFormat.BlockColumnar() {
287 1 : iter = colblk.NewKeyspanIter(r.Compare, h, transforms)
288 1 : } else {
289 1 : iter, err = rowblk.NewFragmentIter(r.cacheOpts.FileNum, r.Comparer, h, transforms)
290 1 : if err != nil {
291 0 : return nil, err
292 0 : }
293 : }
294 1 : return keyspan.MaybeAssert(iter, r.Compare), nil
295 : }
296 :
297 : // NewRawRangeKeyIter returns an internal iterator for the contents of the
298 : // range-key block for the table. Returns nil if the table does not contain any
299 : // range keys.
300 : func (r *Reader) NewRawRangeKeyIter(
301 : ctx context.Context, transforms FragmentIterTransforms,
302 1 : ) (iter keyspan.FragmentIterator, err error) {
303 1 : if r.rangeKeyBH.Length == 0 {
304 1 : return nil, nil
305 1 : }
306 : // TODO(radu): plumb stats here.
307 1 : h, err := r.readRangeKeyBlock(ctx, noEnv, noReadHandle, r.rangeKeyBH)
308 1 : if err != nil {
309 1 : return nil, err
310 1 : }
311 1 : if r.tableFormat.BlockColumnar() {
312 1 : iter = colblk.NewKeyspanIter(r.Compare, h, transforms)
313 1 : } else {
314 1 : iter, err = rowblk.NewFragmentIter(r.cacheOpts.FileNum, r.Comparer, h, transforms)
315 1 : if err != nil {
316 0 : return nil, err
317 0 : }
318 : }
319 1 : return keyspan.MaybeAssert(iter, r.Compare), nil
320 : }
321 :
322 : // readBlockEnv contains arguments used when reading a block which apply to all
323 : // the block reads performed by a higher-level operation.
324 : type readBlockEnv struct {
325 : // stats and iterStats are slightly different. stats is a shared struct
326 : // supplied from the outside, and represents stats for the whole iterator
327 : // tree and can be reset from the outside (e.g. when the pebble.Iterator is
328 : // being reused). It is currently only provided when the iterator tree is
329 : // rooted at pebble.Iterator. iterStats contains an sstable iterator's
330 : // private stats that are reported to a CategoryStatsCollector when this
331 : // iterator is closed. In the important code paths, the CategoryStatsCollector
332 : // is managed by the fileCacheContainer.
333 : Stats *base.InternalIteratorStats
334 : IterStats *iterStatsAccumulator
335 :
336 : // BufferPool is not-nil if we read blocks into a buffer pool and not into the
337 : // cache. This is used during compactions.
338 : BufferPool *block.BufferPool
339 : }
340 :
341 : // BlockServedFromCache updates the stats when a block was found in the cache.
342 1 : func (env *readBlockEnv) BlockServedFromCache(blockLength uint64) {
343 1 : if env.Stats != nil {
344 1 : env.Stats.BlockBytes += blockLength
345 1 : env.Stats.BlockBytesInCache += blockLength
346 1 : }
347 1 : if env.IterStats != nil {
348 1 : env.IterStats.reportStats(blockLength, blockLength, 0)
349 1 : }
350 : }
351 :
352 : // BlockRead updates the stats when a block had to be read.
353 1 : func (env *readBlockEnv) BlockRead(blockLength uint64, readDuration time.Duration) {
354 1 : if env.Stats != nil {
355 1 : env.Stats.BlockBytes += blockLength
356 1 : env.Stats.BlockReadDuration += readDuration
357 1 : }
358 1 : if env.IterStats != nil {
359 1 : env.IterStats.reportStats(blockLength, 0, readDuration)
360 1 : }
361 : }
362 :
363 : // noEnv is the empty readBlockEnv which reports no stats and does not use a
364 : // buffer pool.
365 : var noEnv = readBlockEnv{}
366 :
367 : // noReadHandle is used when we don't want to pass a ReadHandle to one of the
368 : // read block methods.
369 : var noReadHandle objstorage.ReadHandle = nil
370 :
371 1 : var noInitBlockMetadataFn = func(*block.Metadata, []byte) error { return nil }
372 :
373 : // readMetaindexBlock reads the metaindex block.
374 : func (r *Reader) readMetaindexBlock(
375 : ctx context.Context, env readBlockEnv, readHandle objstorage.ReadHandle,
376 1 : ) (block.BufferHandle, error) {
377 1 : ctx = objiotracing.WithBlockType(ctx, objiotracing.MetadataBlock)
378 1 : return r.readBlockInternal(ctx, env, readHandle, r.metaindexBH, noInitBlockMetadataFn)
379 1 : }
380 :
381 : // readTopLevelIndexBlock reads the top-level index block.
382 : func (r *Reader) readTopLevelIndexBlock(
383 : ctx context.Context, env readBlockEnv, readHandle objstorage.ReadHandle,
384 1 : ) (block.BufferHandle, error) {
385 1 : return r.readIndexBlock(ctx, env, readHandle, r.indexBH)
386 1 : }
387 :
388 : // readIndexBlock reads a top-level or second-level index block.
389 : func (r *Reader) readIndexBlock(
390 : ctx context.Context, env readBlockEnv, readHandle objstorage.ReadHandle, bh block.Handle,
391 1 : ) (block.BufferHandle, error) {
392 1 : ctx = objiotracing.WithBlockType(ctx, objiotracing.MetadataBlock)
393 1 : return r.readBlockInternal(ctx, env, readHandle, bh, r.initIndexBlockMetadata)
394 1 : }
395 :
396 : // initIndexBlockMetadata initializes the Metadata for a data block. This will
397 : // later be used (and reused) when reading from the block.
398 1 : func (r *Reader) initIndexBlockMetadata(metadata *block.Metadata, data []byte) error {
399 1 : if r.tableFormat.BlockColumnar() {
400 1 : return colblk.InitIndexBlockMetadata(metadata, data)
401 1 : }
402 1 : return nil
403 : }
404 :
405 : func (r *Reader) readDataBlock(
406 : ctx context.Context, env readBlockEnv, readHandle objstorage.ReadHandle, bh block.Handle,
407 1 : ) (block.BufferHandle, error) {
408 1 : ctx = objiotracing.WithBlockType(ctx, objiotracing.DataBlock)
409 1 : return r.readBlockInternal(ctx, env, readHandle, bh, r.initDataBlockMetadata)
410 1 : }
411 :
412 : // initDataBlockMetadata initializes the Metadata for a data block. This will
413 : // later be used (and reused) when reading from the block.
414 1 : func (r *Reader) initDataBlockMetadata(metadata *block.Metadata, data []byte) error {
415 1 : if r.tableFormat.BlockColumnar() {
416 1 : return colblk.InitDataBlockMetadata(r.keySchema, metadata, data)
417 1 : }
418 1 : return nil
419 : }
420 :
421 : func (r *Reader) readFilterBlock(
422 : ctx context.Context, env readBlockEnv, readHandle objstorage.ReadHandle, bh block.Handle,
423 1 : ) (block.BufferHandle, error) {
424 1 : ctx = objiotracing.WithBlockType(ctx, objiotracing.FilterBlock)
425 1 : return r.readBlockInternal(ctx, env, readHandle, bh, noInitBlockMetadataFn)
426 1 : }
427 :
428 : func (r *Reader) readRangeDelBlock(
429 : ctx context.Context, env readBlockEnv, readHandle objstorage.ReadHandle, bh block.Handle,
430 1 : ) (block.BufferHandle, error) {
431 1 : ctx = objiotracing.WithBlockType(ctx, objiotracing.MetadataBlock)
432 1 : return r.readBlockInternal(ctx, env, readHandle, bh, r.initKeyspanBlockMetadata)
433 1 : }
434 :
435 : func (r *Reader) readRangeKeyBlock(
436 : ctx context.Context, env readBlockEnv, readHandle objstorage.ReadHandle, bh block.Handle,
437 1 : ) (block.BufferHandle, error) {
438 1 : ctx = objiotracing.WithBlockType(ctx, objiotracing.MetadataBlock)
439 1 : return r.readBlockInternal(ctx, env, readHandle, bh, r.initKeyspanBlockMetadata)
440 1 : }
441 :
442 : // initKeyspanBlockMetadata initializes the Metadata for a rangedel or range key
443 : // block. This will later be used (and reused) when reading from the block.
444 1 : func (r *Reader) initKeyspanBlockMetadata(metadata *block.Metadata, data []byte) error {
445 1 : if r.tableFormat.BlockColumnar() {
446 1 : return colblk.InitKeyspanBlockMetadata(metadata, data)
447 1 : }
448 1 : return nil
449 : }
450 :
451 : // ReadValueBlockExternal implements valblk.ExternalBlockReader, allowing a
452 : // base.LazyValue to read a value block.
453 : func (r *Reader) ReadValueBlockExternal(
454 : ctx context.Context, bh block.Handle,
455 1 : ) (block.BufferHandle, error) {
456 1 : return r.readValueBlock(ctx, noEnv, noReadHandle, bh)
457 1 : }
458 :
459 : func (r *Reader) readValueBlock(
460 : ctx context.Context, env readBlockEnv, readHandle objstorage.ReadHandle, bh block.Handle,
461 1 : ) (block.BufferHandle, error) {
462 1 : ctx = objiotracing.WithBlockType(ctx, objiotracing.ValueBlock)
463 1 : return r.readBlockInternal(ctx, env, readHandle, bh, noInitBlockMetadataFn)
464 1 : }
465 :
466 : func checkChecksum(
467 : checksumType block.ChecksumType, b []byte, bh block.Handle, fileNum base.DiskFileNum,
468 1 : ) error {
469 1 : expectedChecksum := binary.LittleEndian.Uint32(b[bh.Length+1:])
470 1 : var computedChecksum uint32
471 1 : switch checksumType {
472 1 : case block.ChecksumTypeCRC32c:
473 1 : computedChecksum = crc.New(b[:bh.Length+1]).Value()
474 1 : case block.ChecksumTypeXXHash64:
475 1 : computedChecksum = uint32(xxhash.Sum64(b[:bh.Length+1]))
476 0 : default:
477 0 : return errors.Errorf("unsupported checksum type: %d", checksumType)
478 : }
479 :
480 1 : if expectedChecksum != computedChecksum {
481 1 : return base.CorruptionErrorf(
482 1 : "pebble/table: invalid table %s (checksum mismatch at %d/%d)",
483 1 : fileNum, errors.Safe(bh.Offset), errors.Safe(bh.Length))
484 1 : }
485 1 : return nil
486 : }
487 :
// DeterministicReadBlockDurationForTesting is for tests that want a
// deterministic value of the time to read a block (that is not in the cache).
// The return value is a function that must be called before the test exits;
// calling it restores the setting that was in effect before this call.
func DeterministicReadBlockDurationForTesting() func() {
	previous := deterministicReadBlockDurationForTesting
	deterministicReadBlockDurationForTesting = true
	restore := func() {
		deterministicReadBlockDurationForTesting = previous
	}
	return restore
}

// deterministicReadBlockDurationForTesting is the package-level switch toggled
// by DeterministicReadBlockDurationForTesting. It is off by default.
var deterministicReadBlockDurationForTesting bool
500 :
501 : // readBlockInternal should not be used directly; one of the read*Block methods
502 : // should be used instead.
503 : func (r *Reader) readBlockInternal(
504 : ctx context.Context,
505 : env readBlockEnv,
506 : readHandle objstorage.ReadHandle,
507 : bh block.Handle,
508 : initBlockMetadataFn func(*block.Metadata, []byte) error,
509 1 : ) (handle block.BufferHandle, _ error) {
510 1 : var ch cache.Handle
511 1 : var crh cache.ReadHandle
512 1 : hit := true
513 1 : if env.BufferPool == nil {
514 1 : var errorDuration time.Duration
515 1 : var err error
516 1 : ch, crh, errorDuration, hit, err = r.cacheOpts.Cache.GetWithReadHandle(
517 1 : ctx, r.cacheOpts.CacheID, r.cacheOpts.FileNum, bh.Offset)
518 1 : if errorDuration > 5*time.Millisecond && r.logger.IsTracingEnabled(ctx) {
519 0 : r.logger.Eventf(
520 0 : ctx, "waited for turn when %s time wasted by failed reads", errorDuration.String())
521 0 : }
522 : // TODO(sumeer): consider tracing when waited longer than some duration
523 : // for turn to do the read.
524 1 : if err != nil {
525 0 : return block.BufferHandle{}, err
526 0 : }
527 1 : } else {
528 1 : // The compaction path uses env.BufferPool, and does not coordinate read
529 1 : // using a cache.ReadHandle. This is ok since only a single compaction is
530 1 : // reading a block.
531 1 : ch = r.cacheOpts.Cache.Get(r.cacheOpts.CacheID, r.cacheOpts.FileNum, bh.Offset)
532 1 : if ch.Valid() {
533 1 : hit = true
534 1 : }
535 : }
536 : // INVARIANT: hit => ch.Valid()
537 1 : if ch.Valid() {
538 1 : if hit {
539 1 : // Cache hit.
540 1 : if readHandle != nil {
541 1 : readHandle.RecordCacheHit(ctx, int64(bh.Offset), int64(bh.Length+block.TrailerLen))
542 1 : }
543 1 : env.BlockServedFromCache(bh.Length)
544 : }
545 1 : if invariants.Enabled && crh.Valid() {
546 0 : panic("cache.ReadHandle must not be valid")
547 : }
548 1 : return block.CacheBufferHandle(ch), nil
549 : }
550 :
551 : // Need to read. First acquire loadBlockSema, if needed.
552 1 : if sema := r.loadBlockSema; sema != nil {
553 1 : if err := sema.Acquire(ctx, 1); err != nil {
554 0 : // An error here can only come from the context.
555 0 : return block.BufferHandle{}, err
556 0 : }
557 1 : defer sema.Release(1)
558 : }
559 1 : value, err := r.doRead(ctx, env, readHandle, bh, initBlockMetadataFn)
560 1 : if err != nil {
561 1 : if crh.Valid() {
562 1 : crh.SetReadError(err)
563 1 : }
564 1 : return block.BufferHandle{}, err
565 : }
566 1 : h := value.MakeHandle(crh, r.cacheOpts.CacheID, r.cacheOpts.FileNum, bh.Offset)
567 1 : return h, nil
568 : }
569 :
570 : // doRead is a helper for readBlockInternal that does the read, checksum
571 : // check, decompression, and returns either a block.Value or an error.
572 : func (r *Reader) doRead(
573 : ctx context.Context,
574 : env readBlockEnv,
575 : readHandle objstorage.ReadHandle,
576 : bh block.Handle,
577 : initBlockMetadataFn func(*block.Metadata, []byte) error,
578 1 : ) (block.Value, error) {
579 1 : compressed := block.Alloc(int(bh.Length+block.TrailerLen), env.BufferPool)
580 1 : readStopwatch := makeStopwatch()
581 1 : var err error
582 1 : if readHandle != nil {
583 1 : err = readHandle.ReadAt(ctx, compressed.BlockData(), int64(bh.Offset))
584 1 : } else {
585 1 : err = r.readable.ReadAt(ctx, compressed.BlockData(), int64(bh.Offset))
586 1 : }
587 1 : readDuration := readStopwatch.stop()
588 1 : // Call IsTracingEnabled to avoid the allocations of boxing integers into an
589 1 : // interface{}, unless necessary.
590 1 : if readDuration >= slowReadTracingThreshold && r.logger.IsTracingEnabled(ctx) {
591 1 : _, file1, line1, _ := runtime.Caller(1)
592 1 : _, file2, line2, _ := runtime.Caller(2)
593 1 : r.logger.Eventf(ctx, "reading block of %d bytes took %s (fileNum=%s; %s/%s:%d -> %s/%s:%d)",
594 1 : int(bh.Length+block.TrailerLen), readDuration.String(),
595 1 : r.cacheOpts.FileNum,
596 1 : filepath.Base(filepath.Dir(file2)), filepath.Base(file2), line2,
597 1 : filepath.Base(filepath.Dir(file1)), filepath.Base(file1), line1)
598 1 : }
599 1 : if err != nil {
600 1 : compressed.Release()
601 1 : return block.Value{}, err
602 1 : }
603 1 : env.BlockRead(bh.Length, readDuration)
604 1 : if err = checkChecksum(r.checksumType, compressed.BlockData(), bh, r.cacheOpts.FileNum); err != nil {
605 1 : compressed.Release()
606 1 : return block.Value{}, err
607 1 : }
608 1 : typ := block.CompressionIndicator(compressed.BlockData()[bh.Length])
609 1 : compressed.Truncate(int(bh.Length))
610 1 : var decompressed block.Value
611 1 : if typ == block.NoCompressionIndicator {
612 1 : decompressed = compressed
613 1 : } else {
614 1 : // Decode the length of the decompressed value.
615 1 : decodedLen, prefixLen, err := block.DecompressedLen(typ, compressed.BlockData())
616 1 : if err != nil {
617 0 : compressed.Release()
618 0 : return block.Value{}, err
619 0 : }
620 1 : decompressed = block.Alloc(decodedLen, env.BufferPool)
621 1 : err = block.DecompressInto(typ, compressed.BlockData()[prefixLen:], decompressed.BlockData())
622 1 : compressed.Release()
623 1 : if err != nil {
624 0 : decompressed.Release()
625 0 : return block.Value{}, err
626 0 : }
627 : }
628 1 : if err = initBlockMetadataFn(decompressed.BlockMetadata(), decompressed.BlockData()); err != nil {
629 0 : decompressed.Release()
630 0 : return block.Value{}, err
631 0 : }
632 1 : return decompressed, nil
633 : }
634 :
635 : func (r *Reader) readMetaindex(
636 : ctx context.Context, readHandle objstorage.ReadHandle, filters map[string]FilterPolicy,
637 1 : ) error {
638 1 : // We use a BufferPool when reading metaindex blocks in order to avoid
639 1 : // populating the block cache with these blocks. In heavy-write workloads,
640 1 : // especially with high compaction concurrency, new tables may be created
641 1 : // frequently. Populating the block cache with these metaindex blocks adds
642 1 : // additional contention on the block cache mutexes (see #1997).
643 1 : // Additionally, these blocks are exceedingly unlikely to be read again
644 1 : // while they're still in the block cache except in misconfigurations with
645 1 : // excessive sstables counts or a file cache that's far too small.
646 1 : r.metaBufferPool.InitPreallocated(r.metaBufferPoolAlloc[:0])
647 1 : // When we're finished, release the buffers we've allocated back to memory
648 1 : // allocator. We don't expect to use metaBufferPool again.
649 1 : defer r.metaBufferPool.Release()
650 1 : metaEnv := readBlockEnv{
651 1 : BufferPool: &r.metaBufferPool,
652 1 : }
653 1 :
654 1 : b, err := r.readMetaindexBlock(ctx, metaEnv, readHandle)
655 1 : if err != nil {
656 1 : return err
657 1 : }
658 1 : data := b.BlockData()
659 1 : defer b.Release()
660 1 :
661 1 : if uint64(len(data)) != r.metaindexBH.Length {
662 0 : return base.CorruptionErrorf("pebble/table: unexpected metaindex block size: %d vs %d",
663 0 : errors.Safe(len(data)), errors.Safe(r.metaindexBH.Length))
664 0 : }
665 :
666 1 : var meta map[string]block.Handle
667 1 : meta, r.valueBIH, err = decodeMetaindex(data)
668 1 : if err != nil {
669 0 : return err
670 0 : }
671 :
672 1 : if bh, ok := meta[metaPropertiesName]; ok {
673 1 : b, err = r.readBlockInternal(ctx, metaEnv, readHandle, bh, noInitBlockMetadataFn)
674 1 : if err != nil {
675 1 : return err
676 1 : }
677 1 : r.propertiesBH = bh
678 1 : err := r.Properties.load(b.BlockData(), r.deniedUserProperties)
679 1 : b.Release()
680 1 : if err != nil {
681 0 : return err
682 0 : }
683 : }
684 :
685 1 : if bh, ok := meta[metaRangeDelV2Name]; ok {
686 1 : r.rangeDelBH = bh
687 1 : } else if _, ok := meta[metaRangeDelV1Name]; ok {
688 0 : // This version of Pebble requires a format major version at least as
689 0 : // high as FormatFlushableIngest (see pebble.FormatMinSupported). In
690 0 : // this format major verison, we have a guarantee that we've compacted
691 0 : // away all RocksDB sstables. It should not be possible to encounter an
692 0 : // sstable with a v1 range deletion block but not a v2 range deletion
693 0 : // block.
694 0 : err := errors.Newf("pebble/table: unexpected range-del block type: %s", metaRangeDelV1Name)
695 0 : return errors.Mark(err, base.ErrCorruption)
696 0 : }
697 :
698 1 : if bh, ok := meta[metaRangeKeyName]; ok {
699 1 : r.rangeKeyBH = bh
700 1 : }
701 :
702 1 : for name, fp := range filters {
703 1 : if bh, ok := meta["fullfilter."+name]; ok {
704 1 : r.filterBH = bh
705 1 : r.tableFilter = newTableFilterReader(fp, r.filterMetricsTracker)
706 1 : break
707 : }
708 : }
709 1 : return nil
710 : }
711 :
712 : // Layout returns the layout (block organization) for an sstable.
713 1 : func (r *Reader) Layout() (*Layout, error) {
714 1 : if r.err != nil {
715 0 : return nil, r.err
716 0 : }
717 :
718 1 : l := &Layout{
719 1 : Data: make([]block.HandleWithProperties, 0, r.Properties.NumDataBlocks),
720 1 : RangeDel: r.rangeDelBH,
721 1 : RangeKey: r.rangeKeyBH,
722 1 : ValueIndex: r.valueBIH.Handle,
723 1 : Properties: r.propertiesBH,
724 1 : MetaIndex: r.metaindexBH,
725 1 : Footer: r.footerBH,
726 1 : Format: r.tableFormat,
727 1 : }
728 1 : if r.filterBH.Length > 0 {
729 1 : l.Filter = []NamedBlockHandle{{Name: "fullfilter." + r.tableFilter.policy.Name(), Handle: r.filterBH}}
730 1 : }
731 1 : ctx := context.TODO()
732 1 :
733 1 : indexH, err := r.readTopLevelIndexBlock(ctx, noEnv, noReadHandle)
734 1 : if err != nil {
735 1 : return nil, err
736 1 : }
737 1 : defer indexH.Release()
738 1 :
739 1 : var alloc bytealloc.A
740 1 :
741 1 : if r.Properties.IndexPartitions == 0 {
742 1 : l.Index = append(l.Index, r.indexBH)
743 1 : iter := r.tableFormat.newIndexIter()
744 1 : err := iter.Init(r.Comparer, indexH.BlockData(), NoTransforms)
745 1 : if err != nil {
746 0 : return nil, errors.Wrap(err, "reading index block")
747 0 : }
748 1 : for valid := iter.First(); valid; valid = iter.Next() {
749 1 : dataBH, err := iter.BlockHandleWithProperties()
750 1 : if err != nil {
751 0 : return nil, errCorruptIndexEntry(err)
752 0 : }
753 1 : if len(dataBH.Props) > 0 {
754 1 : alloc, dataBH.Props = alloc.Copy(dataBH.Props)
755 1 : }
756 1 : l.Data = append(l.Data, dataBH)
757 : }
758 1 : } else {
759 1 : l.TopIndex = r.indexBH
760 1 : topIter := r.tableFormat.newIndexIter()
761 1 : err := topIter.Init(r.Comparer, indexH.BlockData(), NoTransforms)
762 1 : if err != nil {
763 0 : return nil, errors.Wrap(err, "reading index block")
764 0 : }
765 1 : iter := r.tableFormat.newIndexIter()
766 1 : for valid := topIter.First(); valid; valid = topIter.Next() {
767 1 : indexBH, err := topIter.BlockHandleWithProperties()
768 1 : if err != nil {
769 0 : return nil, errCorruptIndexEntry(err)
770 0 : }
771 1 : l.Index = append(l.Index, indexBH.Handle)
772 1 :
773 1 : subIndex, err := r.readIndexBlock(ctx, noEnv, noReadHandle, indexBH.Handle)
774 1 : if err != nil {
775 1 : return nil, err
776 1 : }
777 1 : err = func() error {
778 1 : defer subIndex.Release()
779 1 : // TODO(msbutler): figure out how to pass virtualState to layout call.
780 1 : if err := iter.Init(r.Comparer, subIndex.BlockData(), NoTransforms); err != nil {
781 0 : return err
782 0 : }
783 1 : for valid := iter.First(); valid; valid = iter.Next() {
784 1 : dataBH, err := iter.BlockHandleWithProperties()
785 1 : if err != nil {
786 0 : return errCorruptIndexEntry(err)
787 0 : }
788 1 : if len(dataBH.Props) > 0 {
789 1 : alloc, dataBH.Props = alloc.Copy(dataBH.Props)
790 1 : }
791 1 : l.Data = append(l.Data, dataBH)
792 : }
793 1 : return nil
794 : }()
795 1 : if err != nil {
796 0 : return nil, err
797 0 : }
798 : }
799 : }
800 1 : if r.valueBIH.Handle.Length != 0 {
801 1 : vbiH, err := r.readValueBlock(context.Background(), noEnv, noReadHandle, r.valueBIH.Handle)
802 1 : if err != nil {
803 0 : return nil, err
804 0 : }
805 1 : defer vbiH.Release()
806 1 : l.ValueBlock, err = valblk.DecodeIndex(vbiH.BlockData(), r.valueBIH)
807 1 : if err != nil {
808 0 : return nil, err
809 0 : }
810 : }
811 :
812 1 : return l, nil
813 : }
814 :
815 : // ValidateBlockChecksums validates the checksums for each block in the SSTable.
816 1 : func (r *Reader) ValidateBlockChecksums() error {
817 1 : // Pre-compute the BlockHandles for the underlying file.
818 1 : l, err := r.Layout()
819 1 : if err != nil {
820 1 : return err
821 1 : }
822 :
823 1 : type blk struct {
824 1 : bh block.Handle
825 1 : readFn func(context.Context, readBlockEnv, objstorage.ReadHandle, block.Handle) (block.BufferHandle, error)
826 1 : }
827 1 : // Construct the set of blocks to check. Note that the footer is not checked
828 1 : // as it is not a block with a checksum.
829 1 : blocks := make([]blk, 0, len(l.Data)+6)
830 1 : for i := range l.Data {
831 1 : blocks = append(blocks, blk{
832 1 : bh: l.Data[i].Handle,
833 1 : readFn: r.readDataBlock,
834 1 : })
835 1 : }
836 1 : for _, h := range l.Index {
837 1 : blocks = append(blocks, blk{
838 1 : bh: h,
839 1 : readFn: r.readIndexBlock,
840 1 : })
841 1 : }
842 1 : blocks = append(blocks, blk{
843 1 : bh: l.TopIndex,
844 1 : readFn: r.readIndexBlock,
845 1 : })
846 1 : for _, bh := range l.Filter {
847 1 : blocks = append(blocks, blk{
848 1 : bh: bh.Handle,
849 1 : readFn: r.readFilterBlock,
850 1 : })
851 1 : }
852 1 : blocks = append(blocks, blk{
853 1 : bh: l.RangeDel,
854 1 : readFn: r.readRangeDelBlock,
855 1 : })
856 1 : blocks = append(blocks, blk{
857 1 : bh: l.RangeKey,
858 1 : readFn: r.readRangeKeyBlock,
859 1 : })
860 1 : readNoInit := func(ctx context.Context, env readBlockEnv, rh objstorage.ReadHandle, bh block.Handle) (block.BufferHandle, error) {
861 1 : return r.readBlockInternal(ctx, env, rh, bh, noInitBlockMetadataFn)
862 1 : }
863 1 : blocks = append(blocks, blk{
864 1 : bh: l.Properties,
865 1 : readFn: readNoInit,
866 1 : })
867 1 : blocks = append(blocks, blk{
868 1 : bh: l.MetaIndex,
869 1 : readFn: readNoInit,
870 1 : })
871 1 :
872 1 : // Sorting by offset ensures we are performing a sequential scan of the
873 1 : // file.
874 1 : slices.SortFunc(blocks, func(a, b blk) int {
875 1 : return cmp.Compare(a.bh.Offset, b.bh.Offset)
876 1 : })
877 :
878 1 : ctx := context.Background()
879 1 : for _, b := range blocks {
880 1 : // Certain blocks may not be present, in which case we skip them.
881 1 : if b.bh.Length == 0 {
882 1 : continue
883 : }
884 1 : h, err := b.readFn(ctx, noEnv, noReadHandle, b.bh)
885 1 : if err != nil {
886 1 : return err
887 1 : }
888 1 : h.Release()
889 : }
890 :
891 1 : return nil
892 : }
893 :
894 : // CommonProperties implemented the CommonReader interface.
895 1 : func (r *Reader) CommonProperties() *CommonProperties {
896 1 : return &r.Properties.CommonProperties
897 1 : }
898 :
899 : // EstimateDiskUsage returns the total size of data blocks overlapping the range
900 : // `[start, end]`. Even if a data block partially overlaps, or we cannot
901 : // determine overlap due to abbreviated index keys, the full data block size is
902 : // included in the estimation.
903 : //
904 : // This function does not account for any metablock space usage. Assumes there
905 : // is at least partial overlap, i.e., `[start, end]` falls neither completely
906 : // before nor completely after the file's range.
907 : //
908 : // Only blocks containing point keys are considered. Range deletion and range
909 : // key blocks are not considered.
910 : //
911 : // TODO(ajkr): account for metablock space usage. Perhaps look at the fraction of
912 : // data blocks overlapped and add that same fraction of the metadata blocks to the
913 : // estimate.
914 1 : func (r *Reader) EstimateDiskUsage(start, end []byte) (uint64, error) {
915 1 : if !r.tableFormat.BlockColumnar() {
916 1 : return estimateDiskUsage[rowblk.IndexIter, *rowblk.IndexIter](r, start, end)
917 1 : }
918 1 : return estimateDiskUsage[colblk.IndexIter, *colblk.IndexIter](r, start, end)
919 : }
920 :
921 : func estimateDiskUsage[I any, PI indexBlockIterator[I]](
922 : r *Reader, start, end []byte,
923 1 : ) (uint64, error) {
924 1 : if r.err != nil {
925 0 : return 0, r.err
926 0 : }
927 1 : ctx := context.TODO()
928 1 :
929 1 : indexH, err := r.readTopLevelIndexBlock(ctx, noEnv, noReadHandle)
930 1 : if err != nil {
931 1 : return 0, err
932 1 : }
933 : // We are using InitHandle below but we never Close those iterators, which
934 : // allows us to release the index handle ourselves.
935 : // TODO(radu): clean this up.
936 1 : defer indexH.Release()
937 1 :
938 1 : // Iterators over the bottom-level index blocks containing start and end.
939 1 : // These may be different in case of partitioned index but will both point
940 1 : // to the same blockIter over the single index in the unpartitioned case.
941 1 : var startIdxIter, endIdxIter PI
942 1 : if r.Properties.IndexPartitions == 0 {
943 1 : startIdxIter = new(I)
944 1 : if err := startIdxIter.InitHandle(r.Comparer, indexH, NoTransforms); err != nil {
945 0 : return 0, err
946 0 : }
947 1 : endIdxIter = startIdxIter
948 1 : } else {
949 1 : var topIter PI = new(I)
950 1 : if err := topIter.InitHandle(r.Comparer, indexH, NoTransforms); err != nil {
951 0 : return 0, err
952 0 : }
953 1 : if !topIter.SeekGE(start) {
954 0 : // The range falls completely after this file.
955 0 : return 0, nil
956 0 : }
957 1 : startIndexBH, err := topIter.BlockHandleWithProperties()
958 1 : if err != nil {
959 0 : return 0, errCorruptIndexEntry(err)
960 0 : }
961 1 : startIdxBlock, err := r.readIndexBlock(ctx, noEnv, noReadHandle, startIndexBH.Handle)
962 1 : if err != nil {
963 1 : return 0, err
964 1 : }
965 1 : defer startIdxBlock.Release()
966 1 : startIdxIter = new(I)
967 1 : err = startIdxIter.InitHandle(r.Comparer, startIdxBlock, NoTransforms)
968 1 : if err != nil {
969 0 : return 0, err
970 0 : }
971 :
972 1 : if topIter.SeekGE(end) {
973 1 : endIndexBH, err := topIter.BlockHandleWithProperties()
974 1 : if err != nil {
975 0 : return 0, errCorruptIndexEntry(err)
976 0 : }
977 1 : endIdxBlock, err := r.readIndexBlock(ctx, noEnv, noReadHandle, endIndexBH.Handle)
978 1 : if err != nil {
979 1 : return 0, err
980 1 : }
981 1 : defer endIdxBlock.Release()
982 1 : endIdxIter = new(I)
983 1 : err = endIdxIter.InitHandle(r.Comparer, endIdxBlock, NoTransforms)
984 1 : if err != nil {
985 0 : return 0, err
986 0 : }
987 : }
988 : }
989 : // startIdxIter should not be nil at this point, while endIdxIter can be if the
990 : // range spans past the end of the file.
991 :
992 1 : if !startIdxIter.SeekGE(start) {
993 1 : // The range falls completely after this file.
994 1 : return 0, nil
995 1 : }
996 1 : startBH, err := startIdxIter.BlockHandleWithProperties()
997 1 : if err != nil {
998 0 : return 0, errCorruptIndexEntry(err)
999 0 : }
1000 :
1001 1 : includeInterpolatedValueBlocksSize := func(dataBlockSize uint64) uint64 {
1002 1 : // INVARIANT: r.Properties.DataSize > 0 since startIdxIter is not nil.
1003 1 : // Linearly interpolate what is stored in value blocks.
1004 1 : //
1005 1 : // TODO(sumeer): if we need more accuracy, without loading any data blocks
1006 1 : // (which contain the value handles, and which may also be insufficient if
1007 1 : // the values are in separate files), we will need to accumulate the
1008 1 : // logical size of the key-value pairs and store the cumulative value for
1009 1 : // each data block in the index block entry. This increases the size of
1010 1 : // the BlockHandle, so wait until this becomes necessary.
1011 1 : return dataBlockSize +
1012 1 : uint64((float64(dataBlockSize)/float64(r.Properties.DataSize))*
1013 1 : float64(r.Properties.ValueBlocksSize))
1014 1 : }
1015 1 : if endIdxIter == nil {
1016 0 : // The range spans beyond this file. Include data blocks through the last.
1017 0 : return includeInterpolatedValueBlocksSize(r.Properties.DataSize - startBH.Offset), nil
1018 0 : }
1019 1 : if !endIdxIter.SeekGE(end) {
1020 1 : // The range spans beyond this file. Include data blocks through the last.
1021 1 : return includeInterpolatedValueBlocksSize(r.Properties.DataSize - startBH.Offset), nil
1022 1 : }
1023 1 : endBH, err := endIdxIter.BlockHandleWithProperties()
1024 1 : if err != nil {
1025 0 : return 0, errCorruptIndexEntry(err)
1026 0 : }
1027 1 : return includeInterpolatedValueBlocksSize(
1028 1 : endBH.Offset + endBH.Length + block.TrailerLen - startBH.Offset), nil
1029 : }
1030 :
1031 : // TableFormat returns the format version for the table.
1032 1 : func (r *Reader) TableFormat() (TableFormat, error) {
1033 1 : if r.err != nil {
1034 0 : return TableFormatUnspecified, r.err
1035 0 : }
1036 1 : return r.tableFormat, nil
1037 : }
1038 :
1039 : // NewReader returns a new table reader for the file. Closing the reader will
1040 : // close the file.
1041 : //
1042 : // The context is used for tracing any operations performed by NewReader; it is
1043 : // NOT stored for future use.
1044 1 : func NewReader(ctx context.Context, f objstorage.Readable, o ReaderOptions) (*Reader, error) {
1045 1 : if f == nil {
1046 1 : return nil, errors.New("pebble/table: nil file")
1047 1 : }
1048 1 : o = o.ensureDefaults()
1049 1 : r := &Reader{
1050 1 : readable: f,
1051 1 : cacheOpts: o.internal.CacheOpts,
1052 1 : loadBlockSema: o.LoadBlockSema,
1053 1 : deniedUserProperties: o.DeniedUserProperties,
1054 1 : filterMetricsTracker: o.FilterMetricsTracker,
1055 1 : logger: o.LoggerAndTracer,
1056 1 : }
1057 1 : if r.cacheOpts.Cache == nil {
1058 1 : r.cacheOpts.Cache = cache.New(0)
1059 1 : } else {
1060 1 : r.cacheOpts.Cache.Ref()
1061 1 : }
1062 1 : if r.cacheOpts.CacheID == 0 {
1063 1 : r.cacheOpts.CacheID = r.cacheOpts.Cache.NewID()
1064 1 : }
1065 :
1066 1 : var preallocRH objstorageprovider.PreallocatedReadHandle
1067 1 : rh := objstorageprovider.UsePreallocatedReadHandle(
1068 1 : r.readable, objstorage.ReadBeforeForNewReader, &preallocRH)
1069 1 : defer rh.Close()
1070 1 :
1071 1 : footer, err := readFooter(ctx, f, rh, r.logger, r.cacheOpts.FileNum)
1072 1 : if err != nil {
1073 1 : r.err = err
1074 1 : return nil, r.Close()
1075 1 : }
1076 1 : r.checksumType = footer.checksum
1077 1 : r.tableFormat = footer.format
1078 1 : r.indexBH = footer.indexBH
1079 1 : r.metaindexBH = footer.metaindexBH
1080 1 : r.footerBH = footer.footerBH
1081 1 : // Read the metaindex and properties blocks.
1082 1 : if err := r.readMetaindex(ctx, rh, o.Filters); err != nil {
1083 1 : r.err = err
1084 1 : return nil, r.Close()
1085 1 : }
1086 :
1087 1 : if r.Properties.ComparerName == "" || o.Comparer.Name == r.Properties.ComparerName {
1088 1 : r.Comparer = o.Comparer
1089 1 : r.Compare = o.Comparer.Compare
1090 1 : r.Equal = o.Comparer.Equal
1091 1 : r.Split = o.Comparer.Split
1092 1 : } else if comparer, ok := o.Comparers[r.Properties.ComparerName]; ok {
1093 1 : r.Comparer = o.Comparer
1094 1 : r.Compare = comparer.Compare
1095 1 : r.Equal = comparer.Equal
1096 1 : r.Split = comparer.Split
1097 1 : } else {
1098 1 : r.err = errors.Errorf("pebble/table: %d: unknown comparer %s",
1099 1 : errors.Safe(r.cacheOpts.FileNum), errors.Safe(r.Properties.ComparerName))
1100 1 : }
1101 :
1102 1 : if mergerName := r.Properties.MergerName; mergerName != "" && mergerName != "nullptr" {
1103 1 : if o.Merger != nil && o.Merger.Name == mergerName {
1104 1 : // opts.Merger matches.
1105 1 : } else if _, ok := o.Mergers[mergerName]; ok {
1106 1 : // Known merger.
1107 1 : } else {
1108 1 : r.err = errors.Errorf("pebble/table: %d: unknown merger %s",
1109 1 : errors.Safe(r.cacheOpts.FileNum), errors.Safe(r.Properties.MergerName))
1110 1 : }
1111 : }
1112 :
1113 1 : if r.tableFormat.BlockColumnar() {
1114 1 : if ks, ok := o.KeySchemas[r.Properties.KeySchemaName]; ok {
1115 1 : r.keySchema = ks
1116 1 : } else {
1117 0 : var known []string
1118 0 : for name := range o.KeySchemas {
1119 0 : known = append(known, fmt.Sprintf("%q", name))
1120 0 : }
1121 0 : slices.Sort(known)
1122 0 :
1123 0 : r.err = errors.Newf("pebble/table: %d: unknown key schema %q; known key schemas: %s",
1124 0 : errors.Safe(r.cacheOpts.FileNum), errors.Safe(r.Properties.KeySchemaName), errors.Safe(known))
1125 0 : panic(r.err)
1126 : }
1127 : }
1128 :
1129 1 : if r.err != nil {
1130 1 : return nil, r.Close()
1131 1 : }
1132 :
1133 1 : return r, nil
1134 : }
1135 :
1136 : // ReadableFile describes the smallest subset of vfs.File that is required for
1137 : // reading SSTs.
1138 : type ReadableFile interface {
1139 : io.ReaderAt
1140 : io.Closer
1141 : Stat() (vfs.FileInfo, error)
1142 : }
1143 :
1144 : // NewSimpleReadable wraps a ReadableFile in a objstorage.Readable
1145 : // implementation (which does not support read-ahead)
1146 1 : func NewSimpleReadable(r ReadableFile) (objstorage.Readable, error) {
1147 1 : info, err := r.Stat()
1148 1 : if err != nil {
1149 1 : return nil, err
1150 1 : }
1151 1 : res := &simpleReadable{
1152 1 : f: r,
1153 1 : size: info.Size(),
1154 1 : }
1155 1 : res.rh = objstorage.MakeNoopReadHandle(res)
1156 1 : return res, nil
1157 : }
1158 :
1159 : // simpleReadable wraps a ReadableFile to implement objstorage.Readable.
1160 : type simpleReadable struct {
1161 : f ReadableFile
1162 : size int64
1163 : rh objstorage.NoopReadHandle
1164 : }
1165 :
1166 : var _ objstorage.Readable = (*simpleReadable)(nil)
1167 :
1168 : // ReadAt is part of the objstorage.Readable interface.
1169 1 : func (s *simpleReadable) ReadAt(_ context.Context, p []byte, off int64) error {
1170 1 : n, err := s.f.ReadAt(p, off)
1171 1 : if invariants.Enabled && err == nil && n != len(p) {
1172 0 : panic("short read")
1173 : }
1174 1 : return err
1175 : }
1176 :
1177 : // Close is part of the objstorage.Readable interface.
1178 1 : func (s *simpleReadable) Close() error {
1179 1 : return s.f.Close()
1180 1 : }
1181 :
1182 : // Size is part of the objstorage.Readable interface.
1183 1 : func (s *simpleReadable) Size() int64 {
1184 1 : return s.size
1185 1 : }
1186 :
1187 : // NewReadHandle is part of the objstorage.Readable interface.
1188 : func (s *simpleReadable) NewReadHandle(
1189 : readBeforeSize objstorage.ReadBeforeSize,
1190 1 : ) objstorage.ReadHandle {
1191 1 : return &s.rh
1192 1 : }
1193 :
1194 0 : func errCorruptIndexEntry(err error) error {
1195 0 : err = base.CorruptionErrorf("pebble/table: corrupt index entry: %v", err)
1196 0 : if invariants.Enabled {
1197 0 : panic(err)
1198 : }
1199 0 : return err
1200 : }
1201 :
1202 : type deterministicStopwatchForTesting struct {
1203 : startTime crtime.Mono
1204 : }
1205 :
1206 1 : func makeStopwatch() deterministicStopwatchForTesting {
1207 1 : return deterministicStopwatchForTesting{startTime: crtime.NowMono()}
1208 1 : }
1209 :
1210 1 : func (w deterministicStopwatchForTesting) stop() time.Duration {
1211 1 : dur := w.startTime.Elapsed()
1212 1 : if deterministicReadBlockDurationForTesting {
1213 1 : dur = slowReadTracingThreshold
1214 1 : }
1215 1 : return dur
1216 : }
1217 :
1218 : // MakeTrivialReaderProvider creates a valblk.ReaderProvider which always
1219 : // returns the given reader. It should be used when the Reader will outlive the
1220 : // iterator tree.
1221 1 : func MakeTrivialReaderProvider(r *Reader) valblk.ReaderProvider {
1222 1 : return (*trivialReaderProvider)(r)
1223 1 : }
1224 :
1225 : // trivialReaderProvider implements valblk.ReaderProvider for a Reader that will
1226 : // outlive the top-level iterator in the iterator tree.
1227 : //
1228 : // Defining the type in this manner (as opposed to a struct) avoids allocation.
1229 : type trivialReaderProvider Reader
1230 :
1231 : var _ valblk.ReaderProvider = (*trivialReaderProvider)(nil)
1232 :
1233 : // GetReader implements ReaderProvider.
1234 : func (trp *trivialReaderProvider) GetReader(
1235 : ctx context.Context,
1236 1 : ) (valblk.ExternalBlockReader, error) {
1237 1 : return (*Reader)(trp), nil
1238 1 : }
1239 :
1240 : // Close implements ReaderProvider.
1241 1 : func (trp *trivialReaderProvider) Close() {}
|