Line data Source code
1 : // Copyright 2011 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package sstable
6 :
7 : import (
8 : "bytes"
9 : "cmp"
10 : "context"
11 : "encoding/binary"
12 : "io"
13 : "path/filepath"
14 : "runtime"
15 : "slices"
16 : "time"
17 :
18 : "github.com/cespare/xxhash/v2"
19 : "github.com/cockroachdb/errors"
20 : "github.com/cockroachdb/fifo"
21 : "github.com/cockroachdb/pebble/internal/base"
22 : "github.com/cockroachdb/pebble/internal/bytealloc"
23 : "github.com/cockroachdb/pebble/internal/cache"
24 : "github.com/cockroachdb/pebble/internal/crc"
25 : "github.com/cockroachdb/pebble/internal/invariants"
26 : "github.com/cockroachdb/pebble/internal/keyspan"
27 : "github.com/cockroachdb/pebble/internal/sstableinternal"
28 : "github.com/cockroachdb/pebble/objstorage"
29 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider"
30 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
31 : "github.com/cockroachdb/pebble/sstable/block"
32 : "github.com/cockroachdb/pebble/sstable/rowblk"
33 : "github.com/cockroachdb/pebble/vfs"
34 : )
35 :
// errReaderClosed is the sticky error installed by Close so that any further
// use of the Reader fails.
var errReaderClosed = errors.New("pebble/table: reader is closed")

// loadBlockResult describes the outcome of an attempt to load a block.
type loadBlockResult int8

const (
	// loadBlockOK: the block was loaded successfully.
	loadBlockOK loadBlockResult = iota
	// Could be due to error or because no block left to load.
	loadBlockFailed
	// loadBlockIrrelevant: the block was determined not to be needed
	// (presumably excluded by block-property filtering) — NOTE(review):
	// confirm at the call sites, which are outside this chunk.
	loadBlockIrrelevant
)

// blockTransform optionally rewrites a block's decompressed contents. See
// readBlock, which copies the transformed bytes into a freshly allocated
// buffer.
type blockTransform func([]byte) ([]byte, error)
48 :
// Reader is a table reader.
type Reader struct {
	// readable is the underlying storage object; Close sets it to nil after
	// closing it.
	readable objstorage.Readable

	// The following fields are copied from the ReadOptions.
	cacheOpts            sstableinternal.CacheOptions
	loadBlockSema        *fifo.Semaphore
	deniedUserProperties map[string]struct{}
	filterMetricsTracker *FilterMetricsTracker
	logger               base.LoggerAndTracer

	// Key comparison functions used throughout the reader.
	Comparer  *base.Comparer
	Compare   Compare
	SuffixCmp CompareSuffixes
	Equal     Equal
	Split     Split

	// tableFilter, when non-nil, reads the table-level filter block (see
	// readMetaindex, which matches filter policies against "fullfilter."
	// metaindex entries).
	tableFilter *tableFilterReader

	// err is sticky: once set (e.g. to errReaderClosed by Close), it is
	// returned by methods such as Layout and EstimateDiskUsage.
	err error

	// Handles of the table's metadata blocks, populated when the table is
	// opened (see readMetaindex).
	indexBH      block.Handle
	filterBH     block.Handle
	rangeDelBH   block.Handle
	rangeKeyBH   block.Handle
	valueBIH     valueBlocksIndexHandle
	propertiesBH block.Handle
	metaIndexBH  block.Handle
	footerBH     block.Handle

	Properties   Properties
	tableFormat  TableFormat
	checksumType block.ChecksumType

	// metaBufferPool is a buffer pool used exclusively when opening a table and
	// loading its meta blocks. metaBufferPoolAlloc is used to batch-allocate
	// the BufferPool.pool slice as a part of the Reader allocation. It's
	// capacity 3 to accommodate the meta block (1), and both the compressed
	// properties block (1) and decompressed properties block (1)
	// simultaneously.
	metaBufferPool      block.BufferPool
	metaBufferPoolAlloc [3]block.AllocedBuffer
}

var _ CommonReader = (*Reader)(nil)
94 :
95 : // Close the reader and the underlying objstorage.Readable.
96 1 : func (r *Reader) Close() error {
97 1 : r.cacheOpts.Cache.Unref()
98 1 :
99 1 : if r.readable != nil {
100 1 : r.err = firstError(r.err, r.readable.Close())
101 1 : r.readable = nil
102 1 : }
103 :
104 1 : if r.err != nil {
105 0 : return r.err
106 0 : }
107 : // Make any future calls to Get, NewIter or Close return an error.
108 1 : r.err = errReaderClosed
109 1 : return nil
110 : }
111 :
112 : // NewPointIter returns an iterator for the point keys in the table.
113 : //
114 : // If transform.HideObsoletePoints is set, the callee assumes that filterer
115 : // already includes obsoleteKeyBlockPropertyFilter. The caller can satisfy this
116 : // contract by first calling TryAddBlockPropertyFilterForHideObsoletePoints.
117 : func (r *Reader) NewPointIter(
118 : ctx context.Context,
119 : transforms IterTransforms,
120 : lower, upper []byte,
121 : filterer *BlockPropertiesFilterer,
122 : filterBlockSizeLimit FilterBlockSizeLimit,
123 : stats *base.InternalIteratorStats,
124 : categoryAndQoS CategoryAndQoS,
125 : statsCollector *CategoryStatsCollector,
126 : rp ReaderProvider,
127 1 : ) (Iterator, error) {
128 1 : return r.newPointIter(
129 1 : ctx, transforms, lower, upper, filterer, filterBlockSizeLimit,
130 1 : stats, categoryAndQoS, statsCollector, rp, nil)
131 1 : }
132 :
133 : // TryAddBlockPropertyFilterForHideObsoletePoints is expected to be called
134 : // before the call to NewPointIter, to get the value of hideObsoletePoints and
135 : // potentially add a block property filter.
136 : func (r *Reader) TryAddBlockPropertyFilterForHideObsoletePoints(
137 : snapshotForHideObsoletePoints base.SeqNum,
138 : fileLargestSeqNum base.SeqNum,
139 : pointKeyFilters []BlockPropertyFilter,
140 1 : ) (hideObsoletePoints bool, filters []BlockPropertyFilter) {
141 1 : hideObsoletePoints = r.tableFormat >= TableFormatPebblev4 &&
142 1 : snapshotForHideObsoletePoints > fileLargestSeqNum
143 1 : if hideObsoletePoints {
144 1 : pointKeyFilters = append(pointKeyFilters, obsoleteKeyBlockPropertyFilter{})
145 1 : }
146 1 : return hideObsoletePoints, pointKeyFilters
147 : }
148 :
149 : func (r *Reader) newPointIter(
150 : ctx context.Context,
151 : transforms IterTransforms,
152 : lower, upper []byte,
153 : filterer *BlockPropertiesFilterer,
154 : filterBlockSizeLimit FilterBlockSizeLimit,
155 : stats *base.InternalIteratorStats,
156 : categoryAndQoS CategoryAndQoS,
157 : statsCollector *CategoryStatsCollector,
158 : rp ReaderProvider,
159 : vState *virtualState,
160 1 : ) (Iterator, error) {
161 1 : // NB: pebble.tableCache wraps the returned iterator with one which performs
162 1 : // reference counting on the Reader, preventing the Reader from being closed
163 1 : // until the final iterator closes.
164 1 : var res Iterator
165 1 : var err error
166 1 : if r.Properties.IndexType == twoLevelIndex {
167 1 : res, err = newRowBlockTwoLevelIterator(
168 1 : ctx, r, vState, transforms, lower, upper, filterer, filterBlockSizeLimit,
169 1 : stats, categoryAndQoS, statsCollector, rp, nil /* bufferPool */)
170 1 : } else {
171 1 : res, err = newRowBlockSingleLevelIterator(
172 1 : ctx, r, vState, transforms, lower, upper, filterer, filterBlockSizeLimit,
173 1 : stats, categoryAndQoS, statsCollector, rp, nil /* bufferPool */)
174 1 : }
175 1 : if err != nil {
176 0 : // Note: we don't want to return res here - it will be a nil
177 0 : // single/twoLevelIterator, not a nil Iterator.
178 0 : return nil, err
179 0 : }
180 1 : return res, nil
181 : }
182 :
183 : // NewIter returns an iterator for the point keys in the table. It is a
184 : // simplified version of NewPointIter and should only be used for tests and
185 : // tooling.
186 : //
187 : // NewIter must only be used when the Reader is guaranteed to outlive any
188 : // LazyValues returned from the iter.
189 1 : func (r *Reader) NewIter(transforms IterTransforms, lower, upper []byte) (Iterator, error) {
190 1 : // TODO(radu): we should probably not use bloom filters in this case, as there
191 1 : // likely isn't a cache set up.
192 1 : return r.NewPointIter(
193 1 : context.TODO(), transforms, lower, upper, nil, AlwaysUseFilterBlock,
194 1 : nil /* stats */, CategoryAndQoS{}, nil /* statsCollector */, MakeTrivialReaderProvider(r))
195 1 : }
196 :
197 : // NewCompactionIter returns an iterator similar to NewIter but it also increments
198 : // the number of bytes iterated. If an error occurs, NewCompactionIter cleans up
199 : // after itself and returns a nil iterator.
200 : func (r *Reader) NewCompactionIter(
201 : transforms IterTransforms,
202 : categoryAndQoS CategoryAndQoS,
203 : statsCollector *CategoryStatsCollector,
204 : rp ReaderProvider,
205 : bufferPool *block.BufferPool,
206 1 : ) (Iterator, error) {
207 1 : return r.newCompactionIter(transforms, categoryAndQoS, statsCollector, rp, nil, bufferPool)
208 1 : }
209 :
210 : func (r *Reader) newCompactionIter(
211 : transforms IterTransforms,
212 : categoryAndQoS CategoryAndQoS,
213 : statsCollector *CategoryStatsCollector,
214 : rp ReaderProvider,
215 : vState *virtualState,
216 : bufferPool *block.BufferPool,
217 1 : ) (Iterator, error) {
218 1 : if vState != nil && vState.isSharedIngested {
219 1 : transforms.HideObsoletePoints = true
220 1 : }
221 1 : if r.Properties.IndexType == twoLevelIndex {
222 1 : i, err := newRowBlockTwoLevelIterator(
223 1 : context.Background(),
224 1 : r, vState, transforms, nil /* lower */, nil /* upper */, nil,
225 1 : NeverUseFilterBlock, nil /* stats */, categoryAndQoS, statsCollector, rp, bufferPool,
226 1 : )
227 1 : if err != nil {
228 0 : return nil, err
229 0 : }
230 1 : i.SetupForCompaction()
231 1 : return i, nil
232 : }
233 1 : i, err := newRowBlockSingleLevelIterator(
234 1 : context.Background(), r, vState, transforms, nil /* lower */, nil, /* upper */
235 1 : nil, NeverUseFilterBlock, nil /* stats */, categoryAndQoS, statsCollector, rp, bufferPool,
236 1 : )
237 1 : if err != nil {
238 0 : return nil, err
239 0 : }
240 1 : i.SetupForCompaction()
241 1 : return i, nil
242 : }
243 :
244 : // NewRawRangeDelIter returns an internal iterator for the contents of the
245 : // range-del block for the table. Returns nil if the table does not contain
246 : // any range deletions.
247 : func (r *Reader) NewRawRangeDelIter(
248 : ctx context.Context, transforms FragmentIterTransforms,
249 1 : ) (keyspan.FragmentIterator, error) {
250 1 : if r.rangeDelBH.Length == 0 {
251 1 : return nil, nil
252 1 : }
253 1 : h, err := r.readRangeDel(ctx, nil /* stats */, nil /* iterStats */)
254 1 : if err != nil {
255 0 : return nil, err
256 0 : }
257 1 : transforms.ElideSameSeqNum = true
258 1 : i, err := rowblk.NewFragmentIter(r.cacheOpts.FileNum, r.Compare, r.Comparer.CompareSuffixes, r.Split, h, transforms)
259 1 : if err != nil {
260 0 : return nil, err
261 0 : }
262 1 : return keyspan.MaybeAssert(i, r.Compare), nil
263 : }
264 :
265 : // NewRawRangeKeyIter returns an internal iterator for the contents of the
266 : // range-key block for the table. Returns nil if the table does not contain any
267 : // range keys.
268 : func (r *Reader) NewRawRangeKeyIter(
269 : ctx context.Context, transforms FragmentIterTransforms,
270 1 : ) (keyspan.FragmentIterator, error) {
271 1 : if r.rangeKeyBH.Length == 0 {
272 1 : return nil, nil
273 1 : }
274 1 : h, err := r.readRangeKey(ctx, nil /* stats */, nil /* iterStats */)
275 1 : if err != nil {
276 0 : return nil, err
277 0 : }
278 1 : i, err := rowblk.NewFragmentIter(r.cacheOpts.FileNum, r.Compare, r.Comparer.CompareSuffixes, r.Split, h, transforms)
279 1 : if err != nil {
280 0 : return nil, err
281 0 : }
282 1 : return keyspan.MaybeAssert(i, r.Compare), nil
283 : }
284 :
285 : func (r *Reader) readIndex(
286 : ctx context.Context,
287 : readHandle objstorage.ReadHandle,
288 : stats *base.InternalIteratorStats,
289 : iterStats *iterStatsAccumulator,
290 1 : ) (block.BufferHandle, error) {
291 1 : ctx = objiotracing.WithBlockType(ctx, objiotracing.MetadataBlock)
292 1 : return r.readBlock(ctx, r.indexBH, nil, readHandle, stats, iterStats, nil /* buffer pool */)
293 1 : }
294 :
295 : func (r *Reader) readFilter(
296 : ctx context.Context,
297 : readHandle objstorage.ReadHandle,
298 : stats *base.InternalIteratorStats,
299 : iterStats *iterStatsAccumulator,
300 1 : ) (block.BufferHandle, error) {
301 1 : ctx = objiotracing.WithBlockType(ctx, objiotracing.FilterBlock)
302 1 : return r.readBlock(ctx, r.filterBH, nil /* transform */, readHandle, stats, iterStats, nil /* buffer pool */)
303 1 : }
304 :
305 : func (r *Reader) readRangeDel(
306 : ctx context.Context, stats *base.InternalIteratorStats, iterStats *iterStatsAccumulator,
307 1 : ) (block.BufferHandle, error) {
308 1 : ctx = objiotracing.WithBlockType(ctx, objiotracing.MetadataBlock)
309 1 : return r.readBlock(ctx, r.rangeDelBH, nil /* transform */, nil /* readHandle */, stats, iterStats, nil /* buffer pool */)
310 1 : }
311 :
312 : func (r *Reader) readRangeKey(
313 : ctx context.Context, stats *base.InternalIteratorStats, iterStats *iterStatsAccumulator,
314 1 : ) (block.BufferHandle, error) {
315 1 : ctx = objiotracing.WithBlockType(ctx, objiotracing.MetadataBlock)
316 1 : return r.readBlock(ctx, r.rangeKeyBH, nil /* transform */, nil /* readHandle */, stats, iterStats, nil /* buffer pool */)
317 1 : }
318 :
319 : func checkChecksum(
320 : checksumType block.ChecksumType, b []byte, bh block.Handle, fileNum base.DiskFileNum,
321 1 : ) error {
322 1 : expectedChecksum := binary.LittleEndian.Uint32(b[bh.Length+1:])
323 1 : var computedChecksum uint32
324 1 : switch checksumType {
325 1 : case block.ChecksumTypeCRC32c:
326 1 : computedChecksum = crc.New(b[:bh.Length+1]).Value()
327 0 : case block.ChecksumTypeXXHash64:
328 0 : computedChecksum = uint32(xxhash.Sum64(b[:bh.Length+1]))
329 0 : default:
330 0 : return errors.Errorf("unsupported checksum type: %d", checksumType)
331 : }
332 :
333 1 : if expectedChecksum != computedChecksum {
334 0 : return base.CorruptionErrorf(
335 0 : "pebble/table: invalid table %s (checksum mismatch at %d/%d)",
336 0 : fileNum, errors.Safe(bh.Offset), errors.Safe(bh.Length))
337 0 : }
338 1 : return nil
339 : }
340 :
// DeterministicReadBlockDurationForTesting is for tests that want a
// deterministic value of the time to read a block (that is not in the cache).
// The return value is a function that must be called before the test exits.
func DeterministicReadBlockDurationForTesting() func() {
	saved := deterministicReadBlockDurationForTesting
	deterministicReadBlockDurationForTesting = true
	// The returned closure restores the previous setting.
	return func() {
		deterministicReadBlockDurationForTesting = saved
	}
}

var deterministicReadBlockDurationForTesting = false
353 :
354 : func (r *Reader) readBlock(
355 : ctx context.Context,
356 : bh block.Handle,
357 : transform blockTransform,
358 : readHandle objstorage.ReadHandle,
359 : stats *base.InternalIteratorStats,
360 : iterStats *iterStatsAccumulator,
361 : bufferPool *block.BufferPool,
362 1 : ) (handle block.BufferHandle, _ error) {
363 1 : if h := r.cacheOpts.Cache.Get(r.cacheOpts.CacheID, r.cacheOpts.FileNum, bh.Offset); h.Get() != nil {
364 1 : // Cache hit.
365 1 : if readHandle != nil {
366 1 : readHandle.RecordCacheHit(ctx, int64(bh.Offset), int64(bh.Length+block.TrailerLen))
367 1 : }
368 1 : if stats != nil {
369 1 : stats.BlockBytes += bh.Length
370 1 : stats.BlockBytesInCache += bh.Length
371 1 : }
372 1 : if iterStats != nil {
373 1 : iterStats.reportStats(bh.Length, bh.Length, 0)
374 1 : }
375 : // This block is already in the cache; return a handle to existing vlaue
376 : // in the cache.
377 1 : return block.CacheBufferHandle(h), nil
378 : }
379 :
380 : // Cache miss.
381 :
382 1 : if sema := r.loadBlockSema; sema != nil {
383 0 : if err := sema.Acquire(ctx, 1); err != nil {
384 0 : // An error here can only come from the context.
385 0 : return block.BufferHandle{}, err
386 0 : }
387 0 : defer sema.Release(1)
388 : }
389 :
390 1 : compressed := block.Alloc(int(bh.Length+block.TrailerLen), bufferPool)
391 1 : readStopwatch := makeStopwatch()
392 1 : var err error
393 1 : if readHandle != nil {
394 1 : err = readHandle.ReadAt(ctx, compressed.Get(), int64(bh.Offset))
395 1 : } else {
396 1 : err = r.readable.ReadAt(ctx, compressed.Get(), int64(bh.Offset))
397 1 : }
398 1 : readDuration := readStopwatch.stop()
399 1 : // Call IsTracingEnabled to avoid the allocations of boxing integers into an
400 1 : // interface{}, unless necessary.
401 1 : if readDuration >= slowReadTracingThreshold && r.logger.IsTracingEnabled(ctx) {
402 0 : _, file1, line1, _ := runtime.Caller(1)
403 0 : _, file2, line2, _ := runtime.Caller(2)
404 0 : r.logger.Eventf(ctx, "reading block of %d bytes took %s (fileNum=%s; %s/%s:%d -> %s/%s:%d)",
405 0 : int(bh.Length+block.TrailerLen), readDuration.String(),
406 0 : r.cacheOpts.FileNum,
407 0 : filepath.Base(filepath.Dir(file2)), filepath.Base(file2), line2,
408 0 : filepath.Base(filepath.Dir(file1)), filepath.Base(file1), line1)
409 0 : }
410 1 : if stats != nil {
411 1 : stats.BlockBytes += bh.Length
412 1 : stats.BlockReadDuration += readDuration
413 1 : }
414 1 : if err != nil {
415 0 : compressed.Release()
416 0 : return block.BufferHandle{}, err
417 0 : }
418 1 : if err := checkChecksum(r.checksumType, compressed.Get(), bh, r.cacheOpts.FileNum); err != nil {
419 0 : compressed.Release()
420 0 : return block.BufferHandle{}, err
421 0 : }
422 :
423 1 : typ := block.CompressionIndicator(compressed.Get()[bh.Length])
424 1 : compressed.Truncate(int(bh.Length))
425 1 :
426 1 : var decompressed block.Value
427 1 : if typ == block.NoCompressionIndicator {
428 1 : decompressed = compressed
429 1 : } else {
430 1 : // Decode the length of the decompressed value.
431 1 : decodedLen, prefixLen, err := block.DecompressedLen(typ, compressed.Get())
432 1 : if err != nil {
433 0 : compressed.Release()
434 0 : return block.BufferHandle{}, err
435 0 : }
436 :
437 1 : decompressed = block.Alloc(decodedLen, bufferPool)
438 1 : if err := block.DecompressInto(typ, compressed.Get()[prefixLen:], decompressed.Get()); err != nil {
439 0 : compressed.Release()
440 0 : return block.BufferHandle{}, err
441 0 : }
442 1 : compressed.Release()
443 : }
444 :
445 1 : if transform != nil {
446 0 : // Transforming blocks is very rare, so the extra copy of the
447 0 : // transformed data is not problematic.
448 0 : tmpTransformed, err := transform(decompressed.Get())
449 0 : if err != nil {
450 0 : decompressed.Release()
451 0 : return block.BufferHandle{}, err
452 0 : }
453 :
454 0 : transformed := block.Alloc(len(tmpTransformed), bufferPool)
455 0 : copy(transformed.Get(), tmpTransformed)
456 0 : decompressed.Release()
457 0 : decompressed = transformed
458 : }
459 :
460 1 : if iterStats != nil {
461 1 : iterStats.reportStats(bh.Length, 0, readDuration)
462 1 : }
463 1 : h := decompressed.MakeHandle(r.cacheOpts.Cache, r.cacheOpts.CacheID, r.cacheOpts.FileNum, bh.Offset)
464 1 : return h, nil
465 : }
466 :
// readMetaindex reads the metaindex block and records what it points at:
// the value-blocks index handle (r.valueBIH), the properties block (loaded
// into r.Properties), the range-del and range-key block handles, and the
// filter block handle matched against the supplied filter policies.
func (r *Reader) readMetaindex(
	ctx context.Context,
	metaindexBH block.Handle,
	readHandle objstorage.ReadHandle,
	filters map[string]FilterPolicy,
) error {
	// We use a BufferPool when reading metaindex blocks in order to avoid
	// populating the block cache with these blocks. In heavy-write workloads,
	// especially with high compaction concurrency, new tables may be created
	// frequently. Populating the block cache with these metaindex blocks adds
	// additional contention on the block cache mutexes (see #1997).
	// Additionally, these blocks are exceedingly unlikely to be read again
	// while they're still in the block cache except in misconfigurations with
	// excessive sstables counts or a table cache that's far too small.
	r.metaBufferPool.InitPreallocated(r.metaBufferPoolAlloc[:0])
	// When we're finished, release the buffers we've allocated back to memory
	// allocator. We don't expect to use metaBufferPool again.
	defer r.metaBufferPool.Release()

	b, err := r.readBlock(
		ctx, metaindexBH, nil /* transform */, readHandle, nil, /* stats */
		nil /* iterStats */, &r.metaBufferPool)
	if err != nil {
		return err
	}
	data := b.Get()
	// NB: the deferred Release captures this first handle value; the later
	// reassignment of b (for the properties block) is released explicitly.
	defer b.Release()

	if uint64(len(data)) != metaindexBH.Length {
		return base.CorruptionErrorf("pebble/table: unexpected metaindex block size: %d vs %d",
			errors.Safe(len(data)), errors.Safe(metaindexBH.Length))
	}

	i, err := rowblk.NewRawIter(bytes.Compare, data)
	if err != nil {
		return err
	}

	// Decode every metaindex entry into a name -> handle map. The value
	// blocks index handle uses its own encoding and is stored directly on r.
	meta := map[string]block.Handle{}
	for valid := i.First(); valid; valid = i.Next() {
		value := i.Value()
		if bytes.Equal(i.Key().UserKey, []byte(metaValueIndexName)) {
			vbih, n, err := decodeValueBlocksIndexHandle(i.Value())
			if err != nil {
				return err
			}
			if n == 0 || n != len(value) {
				return base.CorruptionErrorf("pebble/table: invalid table (bad value blocks index handle)")
			}
			r.valueBIH = vbih
		} else {
			bh, n := block.DecodeHandle(value)
			if n == 0 || n != len(value) {
				return base.CorruptionErrorf("pebble/table: invalid table (bad block handle)")
			}
			meta[string(i.Key().UserKey)] = bh
		}
	}
	if err := i.Close(); err != nil {
		return err
	}

	// Load the properties block, if present. This read bypasses the meta
	// buffer pool (nil buffer pool) so it goes through the block cache path.
	if bh, ok := meta[metaPropertiesName]; ok {
		b, err = r.readBlock(
			ctx, bh, nil /* transform */, readHandle, nil, /* stats */
			nil /* iterStats */, nil /* buffer pool */)
		if err != nil {
			return err
		}
		r.propertiesBH = bh
		err := r.Properties.load(b.Get(), r.deniedUserProperties)
		b.Release()
		if err != nil {
			return err
		}
	}

	if bh, ok := meta[metaRangeDelV2Name]; ok {
		r.rangeDelBH = bh
	} else if _, ok := meta[metaRangeDelV1Name]; ok {
		// This version of Pebble requires a format major version at least as
		// high as FormatFlushableIngest (see pebble.FormatMinSupported). In
		// this format major version, we have a guarantee that we've compacted
		// away all RocksDB sstables. It should not be possible to encounter an
		// sstable with a v1 range deletion block but not a v2 range deletion
		// block.
		err := errors.Newf("pebble/table: unexpected range-del block type: %s", metaRangeDelV1Name)
		return errors.Mark(err, base.ErrCorruption)
	}

	if bh, ok := meta[metaRangeKeyName]; ok {
		r.rangeKeyBH = bh
	}

	// Match configured filter policies against the filter blocks present in
	// the table; only full ("fullfilter.") table filters are recognized.
	for name, fp := range filters {
		types := []struct {
			ftype  FilterType
			prefix string
		}{
			{TableFilter, "fullfilter."},
		}
		var done bool
		for _, t := range types {
			if bh, ok := meta[t.prefix+name]; ok {
				r.filterBH = bh

				switch t.ftype {
				case TableFilter:
					r.tableFilter = newTableFilterReader(fp, r.filterMetricsTracker)
				default:
					return base.CorruptionErrorf("unknown filter type: %v", errors.Safe(t.ftype))
				}

				done = true
				break
			}
		}
		if done {
			break
		}
	}
	return nil
}
590 :
591 : // Layout returns the layout (block organization) for an sstable.
592 1 : func (r *Reader) Layout() (*Layout, error) {
593 1 : if r.err != nil {
594 0 : return nil, r.err
595 0 : }
596 :
597 1 : l := &Layout{
598 1 : Data: make([]block.HandleWithProperties, 0, r.Properties.NumDataBlocks),
599 1 : Filter: r.filterBH,
600 1 : RangeDel: r.rangeDelBH,
601 1 : RangeKey: r.rangeKeyBH,
602 1 : ValueIndex: r.valueBIH.h,
603 1 : Properties: r.propertiesBH,
604 1 : MetaIndex: r.metaIndexBH,
605 1 : Footer: r.footerBH,
606 1 : Format: r.tableFormat,
607 1 : }
608 1 :
609 1 : indexH, err := r.readIndex(context.Background(), nil, nil, nil)
610 1 : if err != nil {
611 0 : return nil, err
612 0 : }
613 1 : defer indexH.Release()
614 1 :
615 1 : var alloc bytealloc.A
616 1 :
617 1 : if r.Properties.IndexPartitions == 0 {
618 1 : l.Index = append(l.Index, r.indexBH)
619 1 : iter, _ := rowblk.NewIter(r.Compare, r.Split, indexH.Get(), NoTransforms)
620 1 : for kv := iter.First(); kv != nil; kv = iter.Next() {
621 1 : dataBH, err := block.DecodeHandleWithProperties(kv.InPlaceValue())
622 1 : if err != nil {
623 0 : return nil, errCorruptIndexEntry(err)
624 0 : }
625 1 : if len(dataBH.Props) > 0 {
626 1 : alloc, dataBH.Props = alloc.Copy(dataBH.Props)
627 1 : }
628 1 : l.Data = append(l.Data, dataBH)
629 : }
630 1 : } else {
631 1 : l.TopIndex = r.indexBH
632 1 : topIter, _ := rowblk.NewIter(r.Compare, r.Split, indexH.Get(), NoTransforms)
633 1 : iter := &rowblk.Iter{}
634 1 : for kv := topIter.First(); kv != nil; kv = topIter.Next() {
635 1 : indexBH, err := block.DecodeHandleWithProperties(kv.InPlaceValue())
636 1 : if err != nil {
637 0 : return nil, errCorruptIndexEntry(err)
638 0 : }
639 1 : l.Index = append(l.Index, indexBH.Handle)
640 1 :
641 1 : subIndex, err := r.readBlock(context.Background(), indexBH.Handle,
642 1 : nil /* transform */, nil /* readHandle */, nil /* stats */, nil /* iterStats */, nil /* buffer pool */)
643 1 : if err != nil {
644 0 : return nil, err
645 0 : }
646 : // TODO(msbutler): figure out how to pass virtualState to layout call.
647 1 : if err := iter.Init(r.Compare, r.Split, subIndex.Get(), NoTransforms); err != nil {
648 0 : return nil, err
649 0 : }
650 1 : for kv := iter.First(); kv != nil; kv = iter.Next() {
651 1 : dataBH, err := block.DecodeHandleWithProperties(kv.InPlaceValue())
652 1 : if len(dataBH.Props) > 0 {
653 1 : alloc, dataBH.Props = alloc.Copy(dataBH.Props)
654 1 : }
655 1 : if err != nil {
656 0 : return nil, errCorruptIndexEntry(err)
657 0 : }
658 1 : l.Data = append(l.Data, dataBH)
659 : }
660 1 : subIndex.Release()
661 1 : *iter = iter.ResetForReuse()
662 : }
663 : }
664 1 : if r.valueBIH.h.Length != 0 {
665 1 : vbiH, err := r.readBlock(context.Background(), r.valueBIH.h, nil, nil, nil, nil, nil /* buffer pool */)
666 1 : if err != nil {
667 0 : return nil, err
668 0 : }
669 1 : defer vbiH.Release()
670 1 : vbiBlock := vbiH.Get()
671 1 : indexEntryLen := int(r.valueBIH.blockNumByteLength + r.valueBIH.blockOffsetByteLength +
672 1 : r.valueBIH.blockLengthByteLength)
673 1 : i := 0
674 1 : for len(vbiBlock) != 0 {
675 1 : if len(vbiBlock) < indexEntryLen {
676 0 : return nil, errors.Errorf(
677 0 : "remaining value index block %d does not contain a full entry of length %d",
678 0 : len(vbiBlock), indexEntryLen)
679 0 : }
680 1 : n := int(r.valueBIH.blockNumByteLength)
681 1 : bn := int(littleEndianGet(vbiBlock, n))
682 1 : if bn != i {
683 0 : return nil, errors.Errorf("unexpected block num %d, expected %d",
684 0 : bn, i)
685 0 : }
686 1 : i++
687 1 : vbiBlock = vbiBlock[n:]
688 1 : n = int(r.valueBIH.blockOffsetByteLength)
689 1 : blockOffset := littleEndianGet(vbiBlock, n)
690 1 : vbiBlock = vbiBlock[n:]
691 1 : n = int(r.valueBIH.blockLengthByteLength)
692 1 : blockLen := littleEndianGet(vbiBlock, n)
693 1 : vbiBlock = vbiBlock[n:]
694 1 : l.ValueBlock = append(l.ValueBlock, block.Handle{Offset: blockOffset, Length: blockLen})
695 : }
696 : }
697 :
698 1 : return l, nil
699 : }
700 :
701 : // ValidateBlockChecksums validates the checksums for each block in the SSTable.
702 1 : func (r *Reader) ValidateBlockChecksums() error {
703 1 : // Pre-compute the BlockHandles for the underlying file.
704 1 : l, err := r.Layout()
705 1 : if err != nil {
706 0 : return err
707 0 : }
708 :
709 : // Construct the set of blocks to check. Note that the footer is not checked
710 : // as it is not a block with a checksum.
711 1 : blocks := make([]block.Handle, len(l.Data))
712 1 : for i := range l.Data {
713 1 : blocks[i] = l.Data[i].Handle
714 1 : }
715 1 : blocks = append(blocks, l.Index...)
716 1 : blocks = append(blocks, l.TopIndex, l.Filter, l.RangeDel, l.RangeKey, l.Properties, l.MetaIndex)
717 1 :
718 1 : // Sorting by offset ensures we are performing a sequential scan of the
719 1 : // file.
720 1 : slices.SortFunc(blocks, func(a, b block.Handle) int {
721 1 : return cmp.Compare(a.Offset, b.Offset)
722 1 : })
723 :
724 : // Check all blocks sequentially. Make use of read-ahead, given we are
725 : // scanning the entire file from start to end.
726 1 : rh := r.readable.NewReadHandle(objstorage.NoReadBefore)
727 1 : defer rh.Close()
728 1 :
729 1 : for _, bh := range blocks {
730 1 : // Certain blocks may not be present, in which case we skip them.
731 1 : if bh.Length == 0 {
732 1 : continue
733 : }
734 :
735 : // Read the block, which validates the checksum.
736 1 : h, err := r.readBlock(context.Background(), bh, nil, rh, nil, nil /* iterStats */, nil /* buffer pool */)
737 1 : if err != nil {
738 0 : return err
739 0 : }
740 1 : h.Release()
741 : }
742 :
743 1 : return nil
744 : }
745 :
746 : // CommonProperties implemented the CommonReader interface.
747 1 : func (r *Reader) CommonProperties() *CommonProperties {
748 1 : return &r.Properties.CommonProperties
749 1 : }
750 :
751 : // EstimateDiskUsage returns the total size of data blocks overlapping the range
752 : // `[start, end]`. Even if a data block partially overlaps, or we cannot
753 : // determine overlap due to abbreviated index keys, the full data block size is
754 : // included in the estimation.
755 : //
756 : // This function does not account for any metablock space usage. Assumes there
757 : // is at least partial overlap, i.e., `[start, end]` falls neither completely
758 : // before nor completely after the file's range.
759 : //
760 : // Only blocks containing point keys are considered. Range deletion and range
761 : // key blocks are not considered.
762 : //
763 : // TODO(ajkr): account for metablock space usage. Perhaps look at the fraction of
764 : // data blocks overlapped and add that same fraction of the metadata blocks to the
765 : // estimate.
766 1 : func (r *Reader) EstimateDiskUsage(start, end []byte) (uint64, error) {
767 1 : if r.err != nil {
768 0 : return 0, r.err
769 0 : }
770 :
771 1 : indexH, err := r.readIndex(context.Background(), nil, nil, nil)
772 1 : if err != nil {
773 0 : return 0, err
774 0 : }
775 1 : defer indexH.Release()
776 1 :
777 1 : // Iterators over the bottom-level index blocks containing start and end.
778 1 : // These may be different in case of partitioned index but will both point
779 1 : // to the same blockIter over the single index in the unpartitioned case.
780 1 : var startIdxIter, endIdxIter *rowblk.Iter
781 1 : if r.Properties.IndexPartitions == 0 {
782 1 : iter, err := rowblk.NewIter(r.Compare, r.Split, indexH.Get(), NoTransforms)
783 1 : if err != nil {
784 0 : return 0, err
785 0 : }
786 1 : startIdxIter = iter
787 1 : endIdxIter = iter
788 1 : } else {
789 1 : topIter, err := rowblk.NewIter(r.Compare, r.Split, indexH.Get(), NoTransforms)
790 1 : if err != nil {
791 0 : return 0, err
792 0 : }
793 :
794 1 : kv := topIter.SeekGE(start, base.SeekGEFlagsNone)
795 1 : if kv == nil {
796 1 : // The range falls completely after this file, or an error occurred.
797 1 : return 0, topIter.Error()
798 1 : }
799 1 : startIdxBH, err := block.DecodeHandleWithProperties(kv.InPlaceValue())
800 1 : if err != nil {
801 0 : return 0, errCorruptIndexEntry(err)
802 0 : }
803 1 : startIdxBlock, err := r.readBlock(context.Background(), startIdxBH.Handle,
804 1 : nil /* transform */, nil /* readHandle */, nil /* stats */, nil /* iterStats */, nil /* buffer pool */)
805 1 : if err != nil {
806 0 : return 0, err
807 0 : }
808 1 : defer startIdxBlock.Release()
809 1 : startIdxIter, err = rowblk.NewIter(r.Compare, r.Split, startIdxBlock.Get(), NoTransforms)
810 1 : if err != nil {
811 0 : return 0, err
812 0 : }
813 :
814 1 : kv = topIter.SeekGE(end, base.SeekGEFlagsNone)
815 1 : if kv == nil {
816 1 : if err := topIter.Error(); err != nil {
817 0 : return 0, err
818 0 : }
819 1 : } else {
820 1 : endIdxBH, err := block.DecodeHandleWithProperties(kv.InPlaceValue())
821 1 : if err != nil {
822 0 : return 0, errCorruptIndexEntry(err)
823 0 : }
824 1 : endIdxBlock, err := r.readBlock(context.Background(),
825 1 : endIdxBH.Handle, nil /* transform */, nil /* readHandle */, nil /* stats */, nil /* iterStats */, nil /* buffer pool */)
826 1 : if err != nil {
827 0 : return 0, err
828 0 : }
829 1 : defer endIdxBlock.Release()
830 1 : endIdxIter, err = rowblk.NewIter(r.Compare, r.Split, endIdxBlock.Get(), NoTransforms)
831 1 : if err != nil {
832 0 : return 0, err
833 0 : }
834 : }
835 : }
836 : // startIdxIter should not be nil at this point, while endIdxIter can be if the
837 : // range spans past the end of the file.
838 :
839 1 : kv := startIdxIter.SeekGE(start, base.SeekGEFlagsNone)
840 1 : if kv == nil {
841 1 : // The range falls completely after this file, or an error occurred.
842 1 : return 0, startIdxIter.Error()
843 1 : }
844 1 : startBH, err := block.DecodeHandleWithProperties(kv.InPlaceValue())
845 1 : if err != nil {
846 0 : return 0, errCorruptIndexEntry(err)
847 0 : }
848 :
849 1 : includeInterpolatedValueBlocksSize := func(dataBlockSize uint64) uint64 {
850 1 : // INVARIANT: r.Properties.DataSize > 0 since startIdxIter is not nil.
851 1 : // Linearly interpolate what is stored in value blocks.
852 1 : //
853 1 : // TODO(sumeer): if we need more accuracy, without loading any data blocks
854 1 : // (which contain the value handles, and which may also be insufficient if
855 1 : // the values are in separate files), we will need to accumulate the
856 1 : // logical size of the key-value pairs and store the cumulative value for
857 1 : // each data block in the index block entry. This increases the size of
858 1 : // the BlockHandle, so wait until this becomes necessary.
859 1 : return dataBlockSize +
860 1 : uint64((float64(dataBlockSize)/float64(r.Properties.DataSize))*
861 1 : float64(r.Properties.ValueBlocksSize))
862 1 : }
863 1 : if endIdxIter == nil {
864 1 : // The range spans beyond this file. Include data blocks through the last.
865 1 : return includeInterpolatedValueBlocksSize(r.Properties.DataSize - startBH.Offset), nil
866 1 : }
867 1 : kv = endIdxIter.SeekGE(end, base.SeekGEFlagsNone)
868 1 : if kv == nil {
869 1 : if err := endIdxIter.Error(); err != nil {
870 0 : return 0, err
871 0 : }
872 : // The range spans beyond this file. Include data blocks through the last.
873 1 : return includeInterpolatedValueBlocksSize(r.Properties.DataSize - startBH.Offset), nil
874 : }
875 1 : endBH, err := block.DecodeHandleWithProperties(kv.InPlaceValue())
876 1 : if err != nil {
877 0 : return 0, errCorruptIndexEntry(err)
878 0 : }
879 1 : return includeInterpolatedValueBlocksSize(
880 1 : endBH.Offset + endBH.Length + block.TrailerLen - startBH.Offset), nil
881 : }
882 :
883 : // TableFormat returns the format version for the table.
884 1 : func (r *Reader) TableFormat() (TableFormat, error) {
885 1 : if r.err != nil {
886 0 : return TableFormatUnspecified, r.err
887 0 : }
888 1 : return r.tableFormat, nil
889 : }
890 :
891 : // NewReader returns a new table reader for the file. Closing the reader will
892 : // close the file.
893 : //
894 : // The context is used for tracing any operations performed by NewReader; it is
895 : // NOT stored for future use.
896 1 : func NewReader(ctx context.Context, f objstorage.Readable, o ReaderOptions) (*Reader, error) {
897 1 : if f == nil {
898 0 : return nil, errors.New("pebble/table: nil file")
899 0 : }
900 1 : o = o.ensureDefaults()
901 1 : r := &Reader{
902 1 : readable: f,
903 1 : cacheOpts: o.internal.CacheOpts,
904 1 : loadBlockSema: o.LoadBlockSema,
905 1 : deniedUserProperties: o.DeniedUserProperties,
906 1 : filterMetricsTracker: o.FilterMetricsTracker,
907 1 : logger: o.LoggerAndTracer,
908 1 : }
909 1 : if r.cacheOpts.Cache == nil {
910 1 : r.cacheOpts.Cache = cache.New(0)
911 1 : } else {
912 1 : r.cacheOpts.Cache.Ref()
913 1 : }
914 1 : if r.cacheOpts.CacheID == 0 {
915 1 : r.cacheOpts.CacheID = r.cacheOpts.Cache.NewID()
916 1 : }
917 :
918 1 : var preallocRH objstorageprovider.PreallocatedReadHandle
919 1 : rh := objstorageprovider.UsePreallocatedReadHandle(
920 1 : r.readable, objstorage.ReadBeforeForNewReader, &preallocRH)
921 1 : defer rh.Close()
922 1 :
923 1 : footer, err := readFooter(ctx, f, rh, r.logger, r.cacheOpts.FileNum)
924 1 : if err != nil {
925 0 : r.err = err
926 0 : return nil, r.Close()
927 0 : }
928 1 : r.checksumType = footer.checksum
929 1 : r.tableFormat = footer.format
930 1 : // Read the metaindex and properties blocks.
931 1 : if err := r.readMetaindex(ctx, footer.metaindexBH, rh, o.Filters); err != nil {
932 0 : r.err = err
933 0 : return nil, r.Close()
934 0 : }
935 1 : r.indexBH = footer.indexBH
936 1 : r.metaIndexBH = footer.metaindexBH
937 1 : r.footerBH = footer.footerBH
938 1 :
939 1 : if r.Properties.ComparerName == "" || o.Comparer.Name == r.Properties.ComparerName {
940 1 : r.Comparer = o.Comparer
941 1 : r.Compare = o.Comparer.Compare
942 1 : r.SuffixCmp = o.Comparer.CompareSuffixes
943 1 : r.Equal = o.Comparer.Equal
944 1 : r.Split = o.Comparer.Split
945 1 : } else if comparer, ok := o.Comparers[r.Properties.ComparerName]; ok {
946 0 : r.Comparer = o.Comparer
947 0 : r.Compare = comparer.Compare
948 0 : r.SuffixCmp = comparer.CompareSuffixes
949 0 : r.Equal = comparer.Equal
950 0 : r.Split = comparer.Split
951 0 : } else {
952 0 : r.err = errors.Errorf("pebble/table: %d: unknown comparer %s",
953 0 : errors.Safe(r.cacheOpts.FileNum), errors.Safe(r.Properties.ComparerName))
954 0 : }
955 :
956 1 : if mergerName := r.Properties.MergerName; mergerName != "" && mergerName != "nullptr" {
957 1 : if o.Merger != nil && o.Merger.Name == mergerName {
958 1 : // opts.Merger matches.
959 1 : } else if _, ok := o.Mergers[mergerName]; ok {
960 0 : // Known merger.
961 0 : } else {
962 0 : r.err = errors.Errorf("pebble/table: %d: unknown merger %s",
963 0 : errors.Safe(r.cacheOpts.FileNum), errors.Safe(r.Properties.MergerName))
964 0 : }
965 : }
966 :
967 1 : if r.err != nil {
968 0 : return nil, r.Close()
969 0 : }
970 :
971 1 : return r, nil
972 : }
973 :
// ReadableFile describes the smallest subset of vfs.File that is required for
// reading SSTs.
type ReadableFile interface {
	// ReaderAt provides positional reads into the file.
	io.ReaderAt
	// Closer releases the underlying file resources.
	io.Closer
	// Stat reports file metadata; only the size is needed by readers here.
	Stat() (vfs.FileInfo, error)
}
981 :
982 : // NewSimpleReadable wraps a ReadableFile in a objstorage.Readable
983 : // implementation (which does not support read-ahead)
984 1 : func NewSimpleReadable(r ReadableFile) (objstorage.Readable, error) {
985 1 : info, err := r.Stat()
986 1 : if err != nil {
987 0 : return nil, err
988 0 : }
989 1 : res := &simpleReadable{
990 1 : f: r,
991 1 : size: info.Size(),
992 1 : }
993 1 : res.rh = objstorage.MakeNoopReadHandle(res)
994 1 : return res, nil
995 : }
996 :
// simpleReadable wraps a ReadableFile to implement objstorage.Readable.
type simpleReadable struct {
	// f is the underlying file; all reads delegate to it.
	f ReadableFile
	// size is captured once at construction (from Stat) so Size() is cheap.
	size int64
	// rh is a shared no-op read handle (no read-ahead) returned by
	// NewReadHandle.
	rh objstorage.NoopReadHandle
}

// Compile-time check that *simpleReadable satisfies objstorage.Readable.
var _ objstorage.Readable = (*simpleReadable)(nil)
1005 :
1006 : // ReadAt is part of the objstorage.Readable interface.
1007 1 : func (s *simpleReadable) ReadAt(_ context.Context, p []byte, off int64) error {
1008 1 : n, err := s.f.ReadAt(p, off)
1009 1 : if invariants.Enabled && err == nil && n != len(p) {
1010 0 : panic("short read")
1011 : }
1012 1 : return err
1013 : }
1014 :
1015 : // Close is part of the objstorage.Readable interface.
1016 1 : func (s *simpleReadable) Close() error {
1017 1 : return s.f.Close()
1018 1 : }
1019 :
1020 : // Size is part of the objstorage.Readable interface.
1021 1 : func (s *simpleReadable) Size() int64 {
1022 1 : return s.size
1023 1 : }
1024 :
// NewReadHandle is part of the objstorage.Readable interface. The returned
// no-op handle does not use readBeforeSize (no read-ahead is performed).
func (s *simpleReadable) NewReadHandle(
	readBeforeSize objstorage.ReadBeforeSize,
) objstorage.ReadHandle {
	return &s.rh
}
1031 :
1032 0 : func errCorruptIndexEntry(err error) error {
1033 0 : err = base.CorruptionErrorf("pebble/table: corrupt index entry: %v", err)
1034 0 : if invariants.Enabled {
1035 0 : panic(err)
1036 : }
1037 0 : return err
1038 : }
1039 :
// deterministicStopwatchForTesting measures elapsed time, but can be forced
// (via deterministicReadBlockDurationForTesting) to report a fixed duration
// so tests are deterministic.
type deterministicStopwatchForTesting struct {
	// startTime is the instant the stopwatch was created.
	startTime time.Time
}
1043 :
1044 1 : func makeStopwatch() deterministicStopwatchForTesting {
1045 1 : return deterministicStopwatchForTesting{startTime: time.Now()}
1046 1 : }
1047 :
1048 1 : func (w deterministicStopwatchForTesting) stop() time.Duration {
1049 1 : dur := time.Since(w.startTime)
1050 1 : if deterministicReadBlockDurationForTesting {
1051 0 : dur = slowReadTracingThreshold
1052 0 : }
1053 1 : return dur
1054 : }
|