Line data Source code
1 : // Copyright 2011 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package sstable
6 :
7 : import (
8 : "bytes"
9 : "cmp"
10 : "context"
11 : "encoding/binary"
12 : "io"
13 : "os"
14 : "path/filepath"
15 : "runtime"
16 : "slices"
17 : "time"
18 :
19 : "github.com/cespare/xxhash/v2"
20 : "github.com/cockroachdb/errors"
21 : "github.com/cockroachdb/fifo"
22 : "github.com/cockroachdb/pebble/internal/base"
23 : "github.com/cockroachdb/pebble/internal/bytealloc"
24 : "github.com/cockroachdb/pebble/internal/cache"
25 : "github.com/cockroachdb/pebble/internal/crc"
26 : "github.com/cockroachdb/pebble/internal/invariants"
27 : "github.com/cockroachdb/pebble/internal/keyspan"
28 : "github.com/cockroachdb/pebble/internal/sstableinternal"
29 : "github.com/cockroachdb/pebble/objstorage"
30 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider"
31 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
32 : "github.com/cockroachdb/pebble/sstable/block"
33 : "github.com/cockroachdb/pebble/sstable/rowblk"
34 : )
35 :
// errReaderClosed is the sticky error returned by operations performed on a
// Reader after it has been closed.
var errReaderClosed = errors.New("pebble/table: reader is closed")
37 :
38 : // decodeBlockHandle returns the block handle encoded at the start of src, as
39 : // well as the number of bytes it occupies. It returns zero if given invalid
40 : // input. A block handle for a data block or a first/lower level index block
41 : // should not be decoded using decodeBlockHandle since the caller may validate
42 : // that the number of bytes decoded is equal to the length of src, which will
43 : // be false if the properties are not decoded. In those cases the caller
44 : // should use decodeBlockHandleWithProperties.
45 1 : func decodeBlockHandle(src []byte) (block.Handle, int) {
46 1 : offset, n := binary.Uvarint(src)
47 1 : length, m := binary.Uvarint(src[n:])
48 1 : if n == 0 || m == 0 {
49 0 : return block.Handle{}, 0
50 0 : }
51 1 : return block.Handle{Offset: offset, Length: length}, n + m
52 : }
53 :
54 : // decodeBlockHandleWithProperties returns the block handle and properties
55 : // encoded in src. src needs to be exactly the length that was encoded. This
56 : // method must be used for data block and first/lower level index blocks. The
57 : // properties in the block handle point to the bytes in src.
58 1 : func decodeBlockHandleWithProperties(src []byte) (BlockHandleWithProperties, error) {
59 1 : bh, n := decodeBlockHandle(src)
60 1 : if n == 0 {
61 0 : return BlockHandleWithProperties{}, errors.Errorf("invalid BlockHandle")
62 0 : }
63 1 : return BlockHandleWithProperties{
64 1 : Handle: bh,
65 1 : Props: src[n:],
66 1 : }, nil
67 : }
68 :
69 1 : func encodeBlockHandle(dst []byte, b block.Handle) int {
70 1 : n := binary.PutUvarint(dst, b.Offset)
71 1 : m := binary.PutUvarint(dst[n:], b.Length)
72 1 : return n + m
73 1 : }
74 :
75 1 : func encodeBlockHandleWithProperties(dst []byte, b BlockHandleWithProperties) []byte {
76 1 : n := encodeBlockHandle(dst, b.Handle)
77 1 : dst = append(dst[:n], b.Props...)
78 1 : return dst
79 1 : }
80 :
// loadBlockResult reports the outcome of attempting to load a block during
// iteration.
type loadBlockResult int8

const (
	// loadBlockOK indicates the block was loaded successfully.
	loadBlockOK loadBlockResult = iota
	// Could be due to error or because no block left to load.
	loadBlockFailed
	// loadBlockIrrelevant indicates the block does not need to be loaded
	// (e.g. it was excluded before being read).
	loadBlockIrrelevant
)

// blockTransform, when non-nil, is applied to a block's decompressed contents
// as it is read (see Reader.readBlock).
type blockTransform func([]byte) ([]byte, error)
91 :
// Reader is a table reader.
type Reader struct {
	// readable provides access to the underlying sstable file.
	readable objstorage.Readable

	// The following fields are copied from the ReadOptions.
	cacheOpts            sstableinternal.CacheOptions
	loadBlockSema        *fifo.Semaphore
	deniedUserProperties map[string]struct{}
	filterMetricsTracker *FilterMetricsTracker
	logger               base.LoggerAndTracer

	// Key comparison/formatting callbacks, exported for use by iterators.
	Compare   Compare
	Equal     Equal
	FormatKey base.FormatKey
	Split     Split

	// tableFilter is non-nil when the table has a full filter block.
	tableFilter *tableFilterReader

	// err is sticky: once set, operations on the Reader return it.
	err error

	// Handles locating the table's various blocks within the file; populated
	// when the footer and metaindex are read.
	indexBH      block.Handle
	filterBH     block.Handle
	rangeDelBH   block.Handle
	rangeKeyBH   block.Handle
	valueBIH     valueBlocksIndexHandle
	propertiesBH block.Handle
	metaIndexBH  block.Handle
	footerBH     block.Handle

	Properties   Properties
	tableFormat  TableFormat
	checksumType block.ChecksumType

	// metaBufferPool is a buffer pool used exclusively when opening a table and
	// loading its meta blocks. metaBufferPoolAlloc is used to batch-allocate
	// the BufferPool.pool slice as a part of the Reader allocation. It's
	// capacity 3 to accommodate the meta block (1), and both the compressed
	// properties block (1) and decompressed properties block (1)
	// simultaneously.
	metaBufferPool      block.BufferPool
	metaBufferPoolAlloc [3]block.AllocedBuffer
}
134 :
// Assert that Reader implements the CommonReader interface.
var _ CommonReader = (*Reader)(nil)

// Close the reader and the underlying objstorage.Readable.
func (r *Reader) Close() error {
	// NOTE(review): Unref runs unconditionally, so a second Close would unref
	// the cache again — callers appear expected to Close at most once; confirm.
	r.cacheOpts.Cache.Unref()

	if r.readable != nil {
		// Preserve any earlier sticky error; otherwise record the Close error.
		r.err = firstError(r.err, r.readable.Close())
		r.readable = nil
	}

	if r.err != nil {
		return r.err
	}
	// Make any future calls to Get, NewIter or Close return an error.
	r.err = errReaderClosed
	return nil
}
153 :
154 : // NewIterWithBlockPropertyFilters returns an iterator for the contents of the
155 : // table. If an error occurs, NewIterWithBlockPropertyFilters cleans up after
156 : // itself and returns a nil iterator.
157 : func (r *Reader) NewIterWithBlockPropertyFilters(
158 : transforms IterTransforms,
159 : lower, upper []byte,
160 : filterer *BlockPropertiesFilterer,
161 : useFilterBlock bool,
162 : stats *base.InternalIteratorStats,
163 : categoryAndQoS CategoryAndQoS,
164 : statsCollector *CategoryStatsCollector,
165 : rp ReaderProvider,
166 1 : ) (Iterator, error) {
167 1 : return r.newIterWithBlockPropertyFiltersAndContext(
168 1 : context.Background(), transforms, lower, upper, filterer, useFilterBlock,
169 1 : stats, categoryAndQoS, statsCollector, rp, nil)
170 1 : }
171 :
172 : // NewIterWithBlockPropertyFiltersAndContextEtc is similar to
173 : // NewIterWithBlockPropertyFilters and additionally accepts a context for
174 : // tracing.
175 : //
176 : // If transform.HideObsoletePoints is set, the callee assumes that filterer
177 : // already includes obsoleteKeyBlockPropertyFilter. The caller can satisfy this
178 : // contract by first calling TryAddBlockPropertyFilterForHideObsoletePoints.
179 : func (r *Reader) NewIterWithBlockPropertyFiltersAndContextEtc(
180 : ctx context.Context,
181 : transforms IterTransforms,
182 : lower, upper []byte,
183 : filterer *BlockPropertiesFilterer,
184 : useFilterBlock bool,
185 : stats *base.InternalIteratorStats,
186 : categoryAndQoS CategoryAndQoS,
187 : statsCollector *CategoryStatsCollector,
188 : rp ReaderProvider,
189 1 : ) (Iterator, error) {
190 1 : return r.newIterWithBlockPropertyFiltersAndContext(
191 1 : ctx, transforms, lower, upper, filterer, useFilterBlock,
192 1 : stats, categoryAndQoS, statsCollector, rp, nil)
193 1 : }
194 :
195 : // TryAddBlockPropertyFilterForHideObsoletePoints is expected to be called
196 : // before the call to NewIterWithBlockPropertyFiltersAndContextEtc, to get the
197 : // value of hideObsoletePoints and potentially add a block property filter.
198 : func (r *Reader) TryAddBlockPropertyFilterForHideObsoletePoints(
199 : snapshotForHideObsoletePoints base.SeqNum,
200 : fileLargestSeqNum base.SeqNum,
201 : pointKeyFilters []BlockPropertyFilter,
202 1 : ) (hideObsoletePoints bool, filters []BlockPropertyFilter) {
203 1 : hideObsoletePoints = r.tableFormat >= TableFormatPebblev4 &&
204 1 : snapshotForHideObsoletePoints > fileLargestSeqNum
205 1 : if hideObsoletePoints {
206 1 : pointKeyFilters = append(pointKeyFilters, obsoleteKeyBlockPropertyFilter{})
207 1 : }
208 1 : return hideObsoletePoints, pointKeyFilters
209 : }
210 :
211 : func (r *Reader) newIterWithBlockPropertyFiltersAndContext(
212 : ctx context.Context,
213 : transforms IterTransforms,
214 : lower, upper []byte,
215 : filterer *BlockPropertiesFilterer,
216 : useFilterBlock bool,
217 : stats *base.InternalIteratorStats,
218 : categoryAndQoS CategoryAndQoS,
219 : statsCollector *CategoryStatsCollector,
220 : rp ReaderProvider,
221 : vState *virtualState,
222 1 : ) (Iterator, error) {
223 1 : // NB: pebble.tableCache wraps the returned iterator with one which performs
224 1 : // reference counting on the Reader, preventing the Reader from being closed
225 1 : // until the final iterator closes.
226 1 : var res Iterator
227 1 : var err error
228 1 : if r.Properties.IndexType == twoLevelIndex {
229 1 : res, err = newTwoLevelIterator(ctx, r, vState, transforms, lower, upper, filterer, useFilterBlock,
230 1 : stats, categoryAndQoS, statsCollector, rp, nil /* bufferPool */)
231 1 : } else {
232 1 : res, err = newSingleLevelIterator(
233 1 : ctx, r, vState, transforms, lower, upper, filterer, useFilterBlock,
234 1 : stats, categoryAndQoS, statsCollector, rp, nil /* bufferPool */)
235 1 : }
236 1 : if err != nil {
237 0 : // Note: we don't want to return res here - it will be a nil
238 0 : // single/twoLevelIterator, not a nil Iterator.
239 0 : return nil, err
240 0 : }
241 1 : return res, nil
242 : }
243 :
244 : // NewIter returns an iterator for the contents of the table. If an error
245 : // occurs, NewIter cleans up after itself and returns a nil iterator. NewIter
246 : // must only be used when the Reader is guaranteed to outlive any LazyValues
247 : // returned from the iter.
248 1 : func (r *Reader) NewIter(transforms IterTransforms, lower, upper []byte) (Iterator, error) {
249 1 : return r.NewIterWithBlockPropertyFilters(
250 1 : transforms, lower, upper, nil, true, /* useFilterBlock */
251 1 : nil /* stats */, CategoryAndQoS{}, nil /* statsCollector */, TrivialReaderProvider{Reader: r})
252 1 : }
253 :
254 : // NewCompactionIter returns an iterator similar to NewIter but it also increments
255 : // the number of bytes iterated. If an error occurs, NewCompactionIter cleans up
256 : // after itself and returns a nil iterator.
257 : func (r *Reader) NewCompactionIter(
258 : transforms IterTransforms,
259 : categoryAndQoS CategoryAndQoS,
260 : statsCollector *CategoryStatsCollector,
261 : rp ReaderProvider,
262 : bufferPool *block.BufferPool,
263 1 : ) (Iterator, error) {
264 1 : return r.newCompactionIter(transforms, categoryAndQoS, statsCollector, rp, nil, bufferPool)
265 1 : }
266 :
267 : func (r *Reader) newCompactionIter(
268 : transforms IterTransforms,
269 : categoryAndQoS CategoryAndQoS,
270 : statsCollector *CategoryStatsCollector,
271 : rp ReaderProvider,
272 : vState *virtualState,
273 : bufferPool *block.BufferPool,
274 1 : ) (Iterator, error) {
275 1 : if vState != nil && vState.isSharedIngested {
276 1 : transforms.HideObsoletePoints = true
277 1 : }
278 1 : if r.Properties.IndexType == twoLevelIndex {
279 1 : i, err := newTwoLevelIterator(
280 1 : context.Background(),
281 1 : r, vState, transforms, nil /* lower */, nil /* upper */, nil,
282 1 : false /* useFilter */, nil /* stats */, categoryAndQoS, statsCollector, rp, bufferPool,
283 1 : )
284 1 : if err != nil {
285 0 : return nil, err
286 0 : }
287 1 : i.setupForCompaction()
288 1 : return &twoLevelCompactionIterator{twoLevelIterator: i}, nil
289 : }
290 1 : i, err := newSingleLevelIterator(
291 1 : context.Background(), r, vState, transforms, nil /* lower */, nil, /* upper */
292 1 : nil, false /* useFilter */, nil /* stats */, categoryAndQoS, statsCollector, rp, bufferPool,
293 1 : )
294 1 : if err != nil {
295 0 : return nil, err
296 0 : }
297 1 : i.setupForCompaction()
298 1 : return &compactionIterator{singleLevelIterator: i}, nil
299 : }
300 :
301 : // NewRawRangeDelIter returns an internal iterator for the contents of the
302 : // range-del block for the table. Returns nil if the table does not contain
303 : // any range deletions.
304 : func (r *Reader) NewRawRangeDelIter(
305 : ctx context.Context, transforms FragmentIterTransforms,
306 1 : ) (keyspan.FragmentIterator, error) {
307 1 : if r.rangeDelBH.Length == 0 {
308 1 : return nil, nil
309 1 : }
310 1 : h, err := r.readRangeDel(ctx, nil /* stats */, nil /* iterStats */)
311 1 : if err != nil {
312 0 : return nil, err
313 0 : }
314 1 : transforms.ElideSameSeqNum = true
315 1 : i, err := rowblk.NewFragmentIter(r.cacheOpts.FileNum, r.Compare, r.Split, h, transforms)
316 1 : if err != nil {
317 0 : return nil, err
318 0 : }
319 1 : return keyspan.MaybeAssert(i, r.Compare), nil
320 : }
321 :
322 : // NewRawRangeKeyIter returns an internal iterator for the contents of the
323 : // range-key block for the table. Returns nil if the table does not contain any
324 : // range keys.
325 : func (r *Reader) NewRawRangeKeyIter(
326 : ctx context.Context, transforms FragmentIterTransforms,
327 1 : ) (keyspan.FragmentIterator, error) {
328 1 : if r.rangeKeyBH.Length == 0 {
329 1 : return nil, nil
330 1 : }
331 1 : h, err := r.readRangeKey(ctx, nil /* stats */, nil /* iterStats */)
332 1 : if err != nil {
333 0 : return nil, err
334 0 : }
335 1 : i, err := rowblk.NewFragmentIter(r.cacheOpts.FileNum, r.Compare, r.Split, h, transforms)
336 1 : if err != nil {
337 0 : return nil, err
338 0 : }
339 1 : return keyspan.MaybeAssert(i, r.Compare), nil
340 : }
341 :
342 : func (r *Reader) readIndex(
343 : ctx context.Context,
344 : readHandle objstorage.ReadHandle,
345 : stats *base.InternalIteratorStats,
346 : iterStats *iterStatsAccumulator,
347 1 : ) (block.BufferHandle, error) {
348 1 : ctx = objiotracing.WithBlockType(ctx, objiotracing.MetadataBlock)
349 1 : return r.readBlock(ctx, r.indexBH, nil, readHandle, stats, iterStats, nil /* buffer pool */)
350 1 : }
351 :
352 : func (r *Reader) readFilter(
353 : ctx context.Context,
354 : readHandle objstorage.ReadHandle,
355 : stats *base.InternalIteratorStats,
356 : iterStats *iterStatsAccumulator,
357 1 : ) (block.BufferHandle, error) {
358 1 : ctx = objiotracing.WithBlockType(ctx, objiotracing.FilterBlock)
359 1 : return r.readBlock(ctx, r.filterBH, nil /* transform */, readHandle, stats, iterStats, nil /* buffer pool */)
360 1 : }
361 :
362 : func (r *Reader) readRangeDel(
363 : ctx context.Context, stats *base.InternalIteratorStats, iterStats *iterStatsAccumulator,
364 1 : ) (block.BufferHandle, error) {
365 1 : ctx = objiotracing.WithBlockType(ctx, objiotracing.MetadataBlock)
366 1 : return r.readBlock(ctx, r.rangeDelBH, nil /* transform */, nil /* readHandle */, stats, iterStats, nil /* buffer pool */)
367 1 : }
368 :
369 : func (r *Reader) readRangeKey(
370 : ctx context.Context, stats *base.InternalIteratorStats, iterStats *iterStatsAccumulator,
371 1 : ) (block.BufferHandle, error) {
372 1 : ctx = objiotracing.WithBlockType(ctx, objiotracing.MetadataBlock)
373 1 : return r.readBlock(ctx, r.rangeKeyBH, nil /* transform */, nil /* readHandle */, stats, iterStats, nil /* buffer pool */)
374 1 : }
375 :
376 : func checkChecksum(
377 : checksumType block.ChecksumType, b []byte, bh block.Handle, fileNum base.DiskFileNum,
378 1 : ) error {
379 1 : expectedChecksum := binary.LittleEndian.Uint32(b[bh.Length+1:])
380 1 : var computedChecksum uint32
381 1 : switch checksumType {
382 1 : case block.ChecksumTypeCRC32c:
383 1 : computedChecksum = crc.New(b[:bh.Length+1]).Value()
384 0 : case block.ChecksumTypeXXHash64:
385 0 : computedChecksum = uint32(xxhash.Sum64(b[:bh.Length+1]))
386 0 : default:
387 0 : return errors.Errorf("unsupported checksum type: %d", checksumType)
388 : }
389 :
390 1 : if expectedChecksum != computedChecksum {
391 0 : return base.CorruptionErrorf(
392 0 : "pebble/table: invalid table %s (checksum mismatch at %d/%d)",
393 0 : fileNum, errors.Safe(bh.Offset), errors.Safe(bh.Length))
394 0 : }
395 1 : return nil
396 : }
397 :
// DeterministicReadBlockDurationForTesting is for tests that want a
// deterministic value of the time to read a block (that is not in the cache).
// The return value is a function that must be called before the test exits.
func DeterministicReadBlockDurationForTesting() func() {
	saved := deterministicReadBlockDurationForTesting
	deterministicReadBlockDurationForTesting = true
	return func() {
		deterministicReadBlockDurationForTesting = saved
	}
}

// deterministicReadBlockDurationForTesting, when true, makes block-read
// timing deterministic for tests.
var deterministicReadBlockDurationForTesting = false
410 :
411 : func (r *Reader) readBlock(
412 : ctx context.Context,
413 : bh block.Handle,
414 : transform blockTransform,
415 : readHandle objstorage.ReadHandle,
416 : stats *base.InternalIteratorStats,
417 : iterStats *iterStatsAccumulator,
418 : bufferPool *block.BufferPool,
419 1 : ) (handle block.BufferHandle, _ error) {
420 1 : if h := r.cacheOpts.Cache.Get(r.cacheOpts.CacheID, r.cacheOpts.FileNum, bh.Offset); h.Get() != nil {
421 1 : // Cache hit.
422 1 : if readHandle != nil {
423 1 : readHandle.RecordCacheHit(ctx, int64(bh.Offset), int64(bh.Length+block.TrailerLen))
424 1 : }
425 1 : if stats != nil {
426 1 : stats.BlockBytes += bh.Length
427 1 : stats.BlockBytesInCache += bh.Length
428 1 : }
429 1 : if iterStats != nil {
430 1 : iterStats.reportStats(bh.Length, bh.Length, 0)
431 1 : }
432 : // This block is already in the cache; return a handle to existing vlaue
433 : // in the cache.
434 1 : return block.CacheBufferHandle(h), nil
435 : }
436 :
437 : // Cache miss.
438 :
439 1 : if sema := r.loadBlockSema; sema != nil {
440 0 : if err := sema.Acquire(ctx, 1); err != nil {
441 0 : // An error here can only come from the context.
442 0 : return block.BufferHandle{}, err
443 0 : }
444 0 : defer sema.Release(1)
445 : }
446 :
447 1 : compressed := block.Alloc(int(bh.Length+block.TrailerLen), bufferPool)
448 1 : readStopwatch := makeStopwatch()
449 1 : var err error
450 1 : if readHandle != nil {
451 1 : err = readHandle.ReadAt(ctx, compressed.Get(), int64(bh.Offset))
452 1 : } else {
453 1 : err = r.readable.ReadAt(ctx, compressed.Get(), int64(bh.Offset))
454 1 : }
455 1 : readDuration := readStopwatch.stop()
456 1 : // Call IsTracingEnabled to avoid the allocations of boxing integers into an
457 1 : // interface{}, unless necessary.
458 1 : if readDuration >= slowReadTracingThreshold && r.logger.IsTracingEnabled(ctx) {
459 0 : _, file1, line1, _ := runtime.Caller(1)
460 0 : _, file2, line2, _ := runtime.Caller(2)
461 0 : r.logger.Eventf(ctx, "reading block of %d bytes took %s (fileNum=%s; %s/%s:%d -> %s/%s:%d)",
462 0 : int(bh.Length+block.TrailerLen), readDuration.String(),
463 0 : r.cacheOpts.FileNum,
464 0 : filepath.Base(filepath.Dir(file2)), filepath.Base(file2), line2,
465 0 : filepath.Base(filepath.Dir(file1)), filepath.Base(file1), line1)
466 0 : }
467 1 : if stats != nil {
468 1 : stats.BlockBytes += bh.Length
469 1 : stats.BlockReadDuration += readDuration
470 1 : }
471 1 : if err != nil {
472 0 : compressed.Release()
473 0 : return block.BufferHandle{}, err
474 0 : }
475 1 : if err := checkChecksum(r.checksumType, compressed.Get(), bh, r.cacheOpts.FileNum); err != nil {
476 0 : compressed.Release()
477 0 : return block.BufferHandle{}, err
478 0 : }
479 :
480 1 : typ := blockType(compressed.Get()[bh.Length])
481 1 : compressed.Truncate(int(bh.Length))
482 1 :
483 1 : var decompressed block.Value
484 1 : if typ == noCompressionBlockType {
485 1 : decompressed = compressed
486 1 : } else {
487 1 : // Decode the length of the decompressed value.
488 1 : decodedLen, prefixLen, err := decompressedLen(typ, compressed.Get())
489 1 : if err != nil {
490 0 : compressed.Release()
491 0 : return block.BufferHandle{}, err
492 0 : }
493 :
494 1 : decompressed = block.Alloc(decodedLen, bufferPool)
495 1 : if err := decompressInto(typ, compressed.Get()[prefixLen:], decompressed.Get()); err != nil {
496 0 : compressed.Release()
497 0 : return block.BufferHandle{}, err
498 0 : }
499 1 : compressed.Release()
500 : }
501 :
502 1 : if transform != nil {
503 0 : // Transforming blocks is very rare, so the extra copy of the
504 0 : // transformed data is not problematic.
505 0 : tmpTransformed, err := transform(decompressed.Get())
506 0 : if err != nil {
507 0 : decompressed.Release()
508 0 : return block.BufferHandle{}, err
509 0 : }
510 :
511 0 : transformed := block.Alloc(len(tmpTransformed), bufferPool)
512 0 : copy(transformed.Get(), tmpTransformed)
513 0 : decompressed.Release()
514 0 : decompressed = transformed
515 : }
516 :
517 1 : if iterStats != nil {
518 1 : iterStats.reportStats(bh.Length, 0, readDuration)
519 1 : }
520 1 : h := decompressed.MakeHandle(r.cacheOpts.Cache, r.cacheOpts.CacheID, r.cacheOpts.FileNum, bh.Offset)
521 1 : return h, nil
522 : }
523 :
// readMetaindex reads the metaindex block and, from the handles it contains,
// populates the Reader's properties, range-del, range-key, value-blocks-index
// and filter block handles.
func (r *Reader) readMetaindex(
	ctx context.Context,
	metaindexBH block.Handle,
	readHandle objstorage.ReadHandle,
	filters map[string]FilterPolicy,
) error {
	// We use a BufferPool when reading metaindex blocks in order to avoid
	// populating the block cache with these blocks. In heavy-write workloads,
	// especially with high compaction concurrency, new tables may be created
	// frequently. Populating the block cache with these metaindex blocks adds
	// additional contention on the block cache mutexes (see #1997).
	// Additionally, these blocks are exceedingly unlikely to be read again
	// while they're still in the block cache except in misconfigurations with
	// excessive sstables counts or a table cache that's far too small.
	r.metaBufferPool.InitPreallocated(r.metaBufferPoolAlloc[:0])
	// When we're finished, release the buffers we've allocated back to memory
	// allocator. We don't expect to use metaBufferPool again.
	defer r.metaBufferPool.Release()

	b, err := r.readBlock(
		ctx, metaindexBH, nil /* transform */, readHandle, nil, /* stats */
		nil /* iterStats */, &r.metaBufferPool)
	if err != nil {
		return err
	}
	data := b.Get()
	// NOTE(review): b is reassigned below for the properties block; this defer
	// appears to rely on Release binding the metaindex handle at defer time
	// (value receiver) rather than releasing the reassigned b — confirm.
	defer b.Release()

	if uint64(len(data)) != metaindexBH.Length {
		return base.CorruptionErrorf("pebble/table: unexpected metaindex block size: %d vs %d",
			errors.Safe(len(data)), errors.Safe(metaindexBH.Length))
	}

	i, err := rowblk.NewRawIter(bytes.Compare, data)
	if err != nil {
		return err
	}

	// Collect every named block handle; the value blocks index handle uses a
	// richer encoding and is decoded separately.
	meta := map[string]block.Handle{}
	for valid := i.First(); valid; valid = i.Next() {
		value := i.Value()
		if bytes.Equal(i.Key().UserKey, []byte(metaValueIndexName)) {
			vbih, n, err := decodeValueBlocksIndexHandle(i.Value())
			if err != nil {
				return err
			}
			// The handle must occupy the entire value.
			if n == 0 || n != len(value) {
				return base.CorruptionErrorf("pebble/table: invalid table (bad value blocks index handle)")
			}
			r.valueBIH = vbih
		} else {
			bh, n := decodeBlockHandle(value)
			if n == 0 || n != len(value) {
				return base.CorruptionErrorf("pebble/table: invalid table (bad block handle)")
			}
			meta[string(i.Key().UserKey)] = bh
		}
	}
	if err := i.Close(); err != nil {
		return err
	}

	if bh, ok := meta[metaPropertiesName]; ok {
		// Properties are read without the buffer pool so they may be cached.
		b, err = r.readBlock(
			ctx, bh, nil /* transform */, readHandle, nil, /* stats */
			nil /* iterStats */, nil /* buffer pool */)
		if err != nil {
			return err
		}
		r.propertiesBH = bh
		err := r.Properties.load(b.Get(), r.deniedUserProperties)
		b.Release()
		if err != nil {
			return err
		}
	}

	if bh, ok := meta[metaRangeDelV2Name]; ok {
		r.rangeDelBH = bh
	} else if _, ok := meta[metaRangeDelV1Name]; ok {
		// This version of Pebble requires a format major version at least as
		// high as FormatFlushableIngest (see pebble.FormatMinSupported). In
		// this format major version, we have a guarantee that we've compacted
		// away all RocksDB sstables. It should not be possible to encounter an
		// sstable with a v1 range deletion block but not a v2 range deletion
		// block.
		err := errors.Newf("pebble/table: unexpected range-del block type: %s", metaRangeDelV1Name)
		return errors.Mark(err, base.ErrCorruption)
	}

	if bh, ok := meta[metaRangeKeyName]; ok {
		r.rangeKeyBH = bh
	}

	// Locate the filter block matching one of the configured filter policies;
	// only full ("table") filters are supported.
	for name, fp := range filters {
		types := []struct {
			ftype  FilterType
			prefix string
		}{
			{TableFilter, "fullfilter."},
		}
		var done bool
		for _, t := range types {
			if bh, ok := meta[t.prefix+name]; ok {
				r.filterBH = bh

				switch t.ftype {
				case TableFilter:
					r.tableFilter = newTableFilterReader(fp, r.filterMetricsTracker)
				default:
					return base.CorruptionErrorf("unknown filter type: %v", errors.Safe(t.ftype))
				}

				done = true
				break
			}
		}
		if done {
			break
		}
	}
	return nil
}
647 :
648 : // Layout returns the layout (block organization) for an sstable.
649 1 : func (r *Reader) Layout() (*Layout, error) {
650 1 : if r.err != nil {
651 0 : return nil, r.err
652 0 : }
653 :
654 1 : l := &Layout{
655 1 : Data: make([]BlockHandleWithProperties, 0, r.Properties.NumDataBlocks),
656 1 : Filter: r.filterBH,
657 1 : RangeDel: r.rangeDelBH,
658 1 : RangeKey: r.rangeKeyBH,
659 1 : ValueIndex: r.valueBIH.h,
660 1 : Properties: r.propertiesBH,
661 1 : MetaIndex: r.metaIndexBH,
662 1 : Footer: r.footerBH,
663 1 : Format: r.tableFormat,
664 1 : }
665 1 :
666 1 : indexH, err := r.readIndex(context.Background(), nil, nil, nil)
667 1 : if err != nil {
668 0 : return nil, err
669 0 : }
670 1 : defer indexH.Release()
671 1 :
672 1 : var alloc bytealloc.A
673 1 :
674 1 : if r.Properties.IndexPartitions == 0 {
675 1 : l.Index = append(l.Index, r.indexBH)
676 1 : iter, _ := rowblk.NewIter(r.Compare, r.Split, indexH.Get(), NoTransforms)
677 1 : for kv := iter.First(); kv != nil; kv = iter.Next() {
678 1 : dataBH, err := decodeBlockHandleWithProperties(kv.InPlaceValue())
679 1 : if err != nil {
680 0 : return nil, errCorruptIndexEntry(err)
681 0 : }
682 1 : if len(dataBH.Props) > 0 {
683 1 : alloc, dataBH.Props = alloc.Copy(dataBH.Props)
684 1 : }
685 1 : l.Data = append(l.Data, dataBH)
686 : }
687 1 : } else {
688 1 : l.TopIndex = r.indexBH
689 1 : topIter, _ := rowblk.NewIter(r.Compare, r.Split, indexH.Get(), NoTransforms)
690 1 : iter := &rowblk.Iter{}
691 1 : for kv := topIter.First(); kv != nil; kv = topIter.Next() {
692 1 : indexBH, err := decodeBlockHandleWithProperties(kv.InPlaceValue())
693 1 : if err != nil {
694 0 : return nil, errCorruptIndexEntry(err)
695 0 : }
696 1 : l.Index = append(l.Index, indexBH.Handle)
697 1 :
698 1 : subIndex, err := r.readBlock(context.Background(), indexBH.Handle,
699 1 : nil /* transform */, nil /* readHandle */, nil /* stats */, nil /* iterStats */, nil /* buffer pool */)
700 1 : if err != nil {
701 0 : return nil, err
702 0 : }
703 : // TODO(msbutler): figure out how to pass virtualState to layout call.
704 1 : if err := iter.Init(r.Compare, r.Split, subIndex.Get(), NoTransforms); err != nil {
705 0 : return nil, err
706 0 : }
707 1 : for kv := iter.First(); kv != nil; kv = iter.Next() {
708 1 : dataBH, err := decodeBlockHandleWithProperties(kv.InPlaceValue())
709 1 : if len(dataBH.Props) > 0 {
710 1 : alloc, dataBH.Props = alloc.Copy(dataBH.Props)
711 1 : }
712 1 : if err != nil {
713 0 : return nil, errCorruptIndexEntry(err)
714 0 : }
715 1 : l.Data = append(l.Data, dataBH)
716 : }
717 1 : subIndex.Release()
718 1 : *iter = iter.ResetForReuse()
719 : }
720 : }
721 1 : if r.valueBIH.h.Length != 0 {
722 1 : vbiH, err := r.readBlock(context.Background(), r.valueBIH.h, nil, nil, nil, nil, nil /* buffer pool */)
723 1 : if err != nil {
724 0 : return nil, err
725 0 : }
726 1 : defer vbiH.Release()
727 1 : vbiBlock := vbiH.Get()
728 1 : indexEntryLen := int(r.valueBIH.blockNumByteLength + r.valueBIH.blockOffsetByteLength +
729 1 : r.valueBIH.blockLengthByteLength)
730 1 : i := 0
731 1 : for len(vbiBlock) != 0 {
732 1 : if len(vbiBlock) < indexEntryLen {
733 0 : return nil, errors.Errorf(
734 0 : "remaining value index block %d does not contain a full entry of length %d",
735 0 : len(vbiBlock), indexEntryLen)
736 0 : }
737 1 : n := int(r.valueBIH.blockNumByteLength)
738 1 : bn := int(littleEndianGet(vbiBlock, n))
739 1 : if bn != i {
740 0 : return nil, errors.Errorf("unexpected block num %d, expected %d",
741 0 : bn, i)
742 0 : }
743 1 : i++
744 1 : vbiBlock = vbiBlock[n:]
745 1 : n = int(r.valueBIH.blockOffsetByteLength)
746 1 : blockOffset := littleEndianGet(vbiBlock, n)
747 1 : vbiBlock = vbiBlock[n:]
748 1 : n = int(r.valueBIH.blockLengthByteLength)
749 1 : blockLen := littleEndianGet(vbiBlock, n)
750 1 : vbiBlock = vbiBlock[n:]
751 1 : l.ValueBlock = append(l.ValueBlock, block.Handle{Offset: blockOffset, Length: blockLen})
752 : }
753 : }
754 :
755 1 : return l, nil
756 : }
757 :
758 : // ValidateBlockChecksums validates the checksums for each block in the SSTable.
759 1 : func (r *Reader) ValidateBlockChecksums() error {
760 1 : // Pre-compute the BlockHandles for the underlying file.
761 1 : l, err := r.Layout()
762 1 : if err != nil {
763 0 : return err
764 0 : }
765 :
766 : // Construct the set of blocks to check. Note that the footer is not checked
767 : // as it is not a block with a checksum.
768 1 : blocks := make([]block.Handle, len(l.Data))
769 1 : for i := range l.Data {
770 1 : blocks[i] = l.Data[i].Handle
771 1 : }
772 1 : blocks = append(blocks, l.Index...)
773 1 : blocks = append(blocks, l.TopIndex, l.Filter, l.RangeDel, l.RangeKey, l.Properties, l.MetaIndex)
774 1 :
775 1 : // Sorting by offset ensures we are performing a sequential scan of the
776 1 : // file.
777 1 : slices.SortFunc(blocks, func(a, b block.Handle) int {
778 1 : return cmp.Compare(a.Offset, b.Offset)
779 1 : })
780 :
781 : // Check all blocks sequentially. Make use of read-ahead, given we are
782 : // scanning the entire file from start to end.
783 1 : rh := r.readable.NewReadHandle(objstorage.NoReadBefore)
784 1 : defer rh.Close()
785 1 :
786 1 : for _, bh := range blocks {
787 1 : // Certain blocks may not be present, in which case we skip them.
788 1 : if bh.Length == 0 {
789 1 : continue
790 : }
791 :
792 : // Read the block, which validates the checksum.
793 1 : h, err := r.readBlock(context.Background(), bh, nil, rh, nil, nil /* iterStats */, nil /* buffer pool */)
794 1 : if err != nil {
795 0 : return err
796 0 : }
797 1 : h.Release()
798 : }
799 :
800 1 : return nil
801 : }
802 :
// CommonProperties implements the CommonReader interface.
func (r *Reader) CommonProperties() *CommonProperties {
	return &r.Properties.CommonProperties
}
807 :
// EstimateDiskUsage returns the total size of data blocks overlapping the range
// `[start, end]`. Even if a data block partially overlaps, or we cannot
// determine overlap due to abbreviated index keys, the full data block size is
// included in the estimation.
//
// This function does not account for any metablock space usage. Assumes there
// is at least partial overlap, i.e., `[start, end]` falls neither completely
// before nor completely after the file's range.
//
// Only blocks containing point keys are considered. Range deletion and range
// key blocks are not considered.
//
// TODO(ajkr): account for metablock space usage. Perhaps look at the fraction of
// data blocks overlapped and add that same fraction of the metadata blocks to the
// estimate.
func (r *Reader) EstimateDiskUsage(start, end []byte) (uint64, error) {
	if r.err != nil {
		return 0, r.err
	}

	// Load the (top-level, in the partitioned case) index block; it is
	// released when we return.
	indexH, err := r.readIndex(context.Background(), nil, nil, nil)
	if err != nil {
		return 0, err
	}
	defer indexH.Release()

	// Iterators over the bottom-level index blocks containing start and end.
	// These may be different in case of partitioned index but will both point
	// to the same blockIter over the single index in the unpartitioned case.
	var startIdxIter, endIdxIter *rowblk.Iter
	if r.Properties.IndexPartitions == 0 {
		// Single-level index: one iterator serves both endpoints.
		iter, err := rowblk.NewIter(r.Compare, r.Split, indexH.Get(), NoTransforms)
		if err != nil {
			return 0, err
		}
		startIdxIter = iter
		endIdxIter = iter
	} else {
		// Two-level index: seek the top-level index to locate the lower-level
		// index block for each endpoint, then open an iterator on each.
		topIter, err := rowblk.NewIter(r.Compare, r.Split, indexH.Get(), NoTransforms)
		if err != nil {
			return 0, err
		}

		kv := topIter.SeekGE(start, base.SeekGEFlagsNone)
		if kv == nil {
			// The range falls completely after this file, or an error occurred.
			return 0, topIter.Error()
		}
		startIdxBH, err := decodeBlockHandleWithProperties(kv.InPlaceValue())
		if err != nil {
			return 0, errCorruptIndexEntry(err)
		}
		startIdxBlock, err := r.readBlock(context.Background(), startIdxBH.Handle,
			nil /* transform */, nil /* readHandle */, nil /* stats */, nil /* iterStats */, nil /* buffer pool */)
		if err != nil {
			return 0, err
		}
		defer startIdxBlock.Release()
		startIdxIter, err = rowblk.NewIter(r.Compare, r.Split, startIdxBlock.Get(), NoTransforms)
		if err != nil {
			return 0, err
		}

		kv = topIter.SeekGE(end, base.SeekGEFlagsNone)
		if kv == nil {
			// end is past the last index entry; endIdxIter stays nil, which is
			// handled below by extending the estimate through the end of the
			// data blocks.
			if err := topIter.Error(); err != nil {
				return 0, err
			}
		} else {
			endIdxBH, err := decodeBlockHandleWithProperties(kv.InPlaceValue())
			if err != nil {
				return 0, errCorruptIndexEntry(err)
			}
			endIdxBlock, err := r.readBlock(context.Background(),
				endIdxBH.Handle, nil /* transform */, nil /* readHandle */, nil /* stats */, nil /* iterStats */, nil /* buffer pool */)
			if err != nil {
				return 0, err
			}
			defer endIdxBlock.Release()
			endIdxIter, err = rowblk.NewIter(r.Compare, r.Split, endIdxBlock.Get(), NoTransforms)
			if err != nil {
				return 0, err
			}
		}
	}
	// startIdxIter should not be nil at this point, while endIdxIter can be if the
	// range spans past the end of the file.

	// Locate the first data block that may contain start.
	kv := startIdxIter.SeekGE(start, base.SeekGEFlagsNone)
	if kv == nil {
		// The range falls completely after this file, or an error occurred.
		return 0, startIdxIter.Error()
	}
	startBH, err := decodeBlockHandleWithProperties(kv.InPlaceValue())
	if err != nil {
		return 0, errCorruptIndexEntry(err)
	}

	includeInterpolatedValueBlocksSize := func(dataBlockSize uint64) uint64 {
		// INVARIANT: r.Properties.DataSize > 0 since startIdxIter is not nil.
		// Linearly interpolate what is stored in value blocks.
		//
		// TODO(sumeer): if we need more accuracy, without loading any data blocks
		// (which contain the value handles, and which may also be insufficient if
		// the values are in separate files), we will need to accumulate the
		// logical size of the key-value pairs and store the cumulative value for
		// each data block in the index block entry. This increases the size of
		// the BlockHandle, so wait until this becomes necessary.
		return dataBlockSize +
			uint64((float64(dataBlockSize)/float64(r.Properties.DataSize))*
				float64(r.Properties.ValueBlocksSize))
	}
	if endIdxIter == nil {
		// The range spans beyond this file. Include data blocks through the last.
		return includeInterpolatedValueBlocksSize(r.Properties.DataSize - startBH.Offset), nil
	}
	// Locate the data block that bounds end.
	kv = endIdxIter.SeekGE(end, base.SeekGEFlagsNone)
	if kv == nil {
		if err := endIdxIter.Error(); err != nil {
			return 0, err
		}
		// The range spans beyond this file. Include data blocks through the last.
		return includeInterpolatedValueBlocksSize(r.Properties.DataSize - startBH.Offset), nil
	}
	endBH, err := decodeBlockHandleWithProperties(kv.InPlaceValue())
	if err != nil {
		return 0, errCorruptIndexEntry(err)
	}
	// Charge every byte from the start of the first overlapping block through
	// the end of the last overlapping block (including its trailer).
	return includeInterpolatedValueBlocksSize(
		endBH.Offset + endBH.Length + block.TrailerLen - startBH.Offset), nil
}
939 :
940 : // TableFormat returns the format version for the table.
941 1 : func (r *Reader) TableFormat() (TableFormat, error) {
942 1 : if r.err != nil {
943 0 : return TableFormatUnspecified, r.err
944 0 : }
945 1 : return r.tableFormat, nil
946 : }
947 :
// NewReader returns a new table reader for the file. Closing the reader will
// close the file.
//
// The context is used for tracing any operations performed by NewReader; it is
// NOT stored for future use.
func NewReader(ctx context.Context, f objstorage.Readable, o ReaderOptions) (*Reader, error) {
	if f == nil {
		return nil, errors.New("pebble/table: nil file")
	}
	o = o.ensureDefaults()
	r := &Reader{
		readable:             f,
		cacheOpts:            o.internal.CacheOpts,
		loadBlockSema:        o.LoadBlockSema,
		deniedUserProperties: o.DeniedUserProperties,
		filterMetricsTracker: o.FilterMetricsTracker,
		logger:               o.LoggerAndTracer,
	}
	// Ensure a block cache exists; either create a private one or take a ref
	// on the provided cache (released when the reader is closed).
	if r.cacheOpts.Cache == nil {
		r.cacheOpts.Cache = cache.New(0)
	} else {
		r.cacheOpts.Cache.Ref()
	}
	if r.cacheOpts.CacheID == 0 {
		r.cacheOpts.CacheID = r.cacheOpts.Cache.NewID()
	}

	// Use a stack-preallocated read handle for the footer/metaindex reads to
	// avoid a heap allocation for this short-lived handle.
	var preallocRH objstorageprovider.PreallocatedReadHandle
	rh := objstorageprovider.UsePreallocatedReadHandle(
		r.readable, objstorage.ReadBeforeForNewReader, &preallocRH)
	defer rh.Close()

	// On any failure below we record the error on the reader and return
	// r.Close()'s result, which releases the cache ref and closes the file.
	footer, err := readFooter(ctx, f, rh, r.logger)
	if err != nil {
		r.err = err
		return nil, r.Close()
	}
	r.checksumType = footer.checksum
	r.tableFormat = footer.format
	// Read the metaindex and properties blocks.
	if err := r.readMetaindex(ctx, footer.metaindexBH, rh, o.Filters); err != nil {
		r.err = err
		return nil, r.Close()
	}
	r.indexBH = footer.indexBH
	r.metaIndexBH = footer.metaindexBH
	r.footerBH = footer.footerBH

	// Resolve the comparer: prefer the one in the options when it matches the
	// name recorded in the table properties (or when no name was recorded),
	// otherwise look it up in the options' comparer registry.
	if r.Properties.ComparerName == "" || o.Comparer.Name == r.Properties.ComparerName {
		r.Compare = o.Comparer.Compare
		r.Equal = o.Comparer.Equal
		r.FormatKey = o.Comparer.FormatKey
		r.Split = o.Comparer.Split
	} else if comparer, ok := o.Comparers[r.Properties.ComparerName]; ok {
		r.Compare = comparer.Compare
		r.Equal = comparer.Equal
		r.FormatKey = comparer.FormatKey
		r.Split = comparer.Split
	} else {
		r.err = errors.Errorf("pebble/table: %d: unknown comparer %s",
			errors.Safe(r.cacheOpts.FileNum), errors.Safe(r.Properties.ComparerName))
	}

	// Validate the merger name recorded in the table, if any. "nullptr" is
	// treated like the empty name (no merger required).
	if mergerName := r.Properties.MergerName; mergerName != "" && mergerName != "nullptr" {
		if o.Merger != nil && o.Merger.Name == mergerName {
			// opts.Merger matches.
		} else if _, ok := o.Mergers[mergerName]; ok {
			// Known merger.
		} else {
			r.err = errors.Errorf("pebble/table: %d: unknown merger %s",
				errors.Safe(r.cacheOpts.FileNum), errors.Safe(r.Properties.MergerName))
		}
	}

	if r.err != nil {
		return nil, r.Close()
	}

	return r, nil
}
1028 :
// ReadableFile describes the smallest subset of vfs.File that is required for
// reading SSTs.
type ReadableFile interface {
	io.ReaderAt
	io.Closer
	// Stat returns file metadata; NewSimpleReadable uses it to determine the
	// file size.
	Stat() (os.FileInfo, error)
}
1036 :
1037 : // NewSimpleReadable wraps a ReadableFile in a objstorage.Readable
1038 : // implementation (which does not support read-ahead)
1039 1 : func NewSimpleReadable(r ReadableFile) (objstorage.Readable, error) {
1040 1 : info, err := r.Stat()
1041 1 : if err != nil {
1042 0 : return nil, err
1043 0 : }
1044 1 : res := &simpleReadable{
1045 1 : f: r,
1046 1 : size: info.Size(),
1047 1 : }
1048 1 : res.rh = objstorage.MakeNoopReadHandle(res)
1049 1 : return res, nil
1050 : }
1051 :
// simpleReadable wraps a ReadableFile to implement objstorage.Readable.
type simpleReadable struct {
	// f is the underlying file.
	f ReadableFile
	// size is the file size, captured once at construction.
	size int64
	// rh is a reusable no-op read handle returned by NewReadHandle.
	rh objstorage.NoopReadHandle
}
1058 :
// Compile-time assertion that *simpleReadable implements objstorage.Readable.
var _ objstorage.Readable = (*simpleReadable)(nil)
1060 :
1061 : // ReadAt is part of the objstorage.Readable interface.
1062 1 : func (s *simpleReadable) ReadAt(_ context.Context, p []byte, off int64) error {
1063 1 : n, err := s.f.ReadAt(p, off)
1064 1 : if invariants.Enabled && err == nil && n != len(p) {
1065 0 : panic("short read")
1066 : }
1067 1 : return err
1068 : }
1069 :
1070 : // Close is part of the objstorage.Readable interface.
1071 1 : func (s *simpleReadable) Close() error {
1072 1 : return s.f.Close()
1073 1 : }
1074 :
1075 : // Size is part of the objstorage.Readable interface.
1076 1 : func (s *simpleReadable) Size() int64 {
1077 1 : return s.size
1078 1 : }
1079 :
// NewReadHandle is part of the objstorage.Readable interface. It returns the
// shared no-op handle; readBeforeSize is ignored since read-ahead is not
// supported.
func (s *simpleReadable) NewReadHandle(
	readBeforeSize objstorage.ReadBeforeSize,
) objstorage.ReadHandle {
	return &s.rh
}
1086 :
1087 0 : func errCorruptIndexEntry(err error) error {
1088 0 : err = base.CorruptionErrorf("pebble/table: corrupt index entry: %v", err)
1089 0 : if invariants.Enabled {
1090 0 : panic(err)
1091 : }
1092 0 : return err
1093 : }
1094 :
// deterministicStopwatchForTesting measures elapsed time, with a testing hook
// (see stop) that substitutes a fixed duration for determinism.
type deterministicStopwatchForTesting struct {
	// startTime is the instant the stopwatch was created.
	startTime time.Time
}
1098 :
1099 1 : func makeStopwatch() deterministicStopwatchForTesting {
1100 1 : return deterministicStopwatchForTesting{startTime: time.Now()}
1101 1 : }
1102 :
1103 1 : func (w deterministicStopwatchForTesting) stop() time.Duration {
1104 1 : dur := time.Since(w.startTime)
1105 1 : if deterministicReadBlockDurationForTesting {
1106 0 : dur = slowReadTracingThreshold
1107 0 : }
1108 1 : return dur
1109 : }
|