Line data Source code
1 : // Copyright 2011 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package sstable
6 :
7 : import (
8 : "bytes"
9 : "cmp"
10 : "context"
11 : "encoding/binary"
12 : "io"
13 : "os"
14 : "path/filepath"
15 : "runtime"
16 : "slices"
17 : "time"
18 :
19 : "github.com/cespare/xxhash/v2"
20 : "github.com/cockroachdb/errors"
21 : "github.com/cockroachdb/fifo"
22 : "github.com/cockroachdb/pebble/internal/base"
23 : "github.com/cockroachdb/pebble/internal/bytealloc"
24 : "github.com/cockroachdb/pebble/internal/cache"
25 : "github.com/cockroachdb/pebble/internal/crc"
26 : "github.com/cockroachdb/pebble/internal/invariants"
27 : "github.com/cockroachdb/pebble/internal/keyspan"
28 : "github.com/cockroachdb/pebble/internal/sstableinternal"
29 : "github.com/cockroachdb/pebble/objstorage"
30 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider"
31 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
32 : "github.com/cockroachdb/pebble/sstable/block"
33 : "github.com/cockroachdb/pebble/sstable/rowblk"
34 : )
35 :
36 : var errReaderClosed = errors.New("pebble/table: reader is closed")
37 :
38 : // decodeBlockHandle returns the block handle encoded at the start of src, as
39 : // well as the number of bytes it occupies. It returns a zero handle and count
40 : // if given invalid input. A handle for a data block or a first/lower level
41 : // index block should not be decoded using decodeBlockHandle: such handles are
42 : // followed by encoded properties, so a caller that validates that the number
43 : // of bytes decoded equals the length of src would fail. In those cases the
44 : // caller should use decodeBlockHandleWithProperties.
45 1 : func decodeBlockHandle(src []byte) (block.Handle, int) {
46 1 : offset, n := binary.Uvarint(src)
47 1 : length, m := binary.Uvarint(src[n:])
48 1 : if n == 0 || m == 0 {
49 0 : return block.Handle{}, 0
50 0 : }
51 1 : return block.Handle{Offset: offset, Length: length}, n + m
52 : }
53 :
54 : // decodeBlockHandleWithProperties returns the block handle and properties
55 : // encoded in src. src needs to be exactly the length that was encoded. This
56 : // method must be used for data block and first/lower level index blocks. The
57 : // properties in the block handle point to the bytes in src.
58 1 : func decodeBlockHandleWithProperties(src []byte) (BlockHandleWithProperties, error) {
59 1 : bh, n := decodeBlockHandle(src)
60 1 : if n == 0 {
61 0 : return BlockHandleWithProperties{}, errors.Errorf("invalid BlockHandle")
62 0 : }
63 1 : return BlockHandleWithProperties{
64 1 : Handle: bh,
65 1 : Props: src[n:],
66 1 : }, nil
67 : }
68 :
69 1 : func encodeBlockHandle(dst []byte, b block.Handle) int {
70 1 : n := binary.PutUvarint(dst, b.Offset)
71 1 : m := binary.PutUvarint(dst[n:], b.Length)
72 1 : return n + m
73 1 : }
74 :
75 1 : func encodeBlockHandleWithProperties(dst []byte, b BlockHandleWithProperties) []byte {
76 1 : n := encodeBlockHandle(dst, b.Handle)
77 1 : dst = append(dst[:n], b.Props...)
78 1 : return dst
79 1 : }
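
// Hypothetical example (not part of the original file): a block handle is
// simply two uvarints, offset then length, optionally followed by raw
// property bytes. Round-tripping through the helpers above, assuming
// block.Handle as declared in the block package:
//
//	var buf [2 * binary.MaxVarintLen64]byte
//	n := encodeBlockHandle(buf[:], block.Handle{Offset: 512, Length: 4096})
//	bh, m := decodeBlockHandle(buf[:n])
//	// bh.Offset == 512, bh.Length == 4096, and m == n, since buf[:n]
//	// contains nothing but the handle.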
80 :
81 : type loadBlockResult int8
82 :
83 : const (
84 : loadBlockOK loadBlockResult = iota
85 : // Could be due to an error or because there is no block left to load.
86 : loadBlockFailed
87 : loadBlockIrrelevant
88 : )
89 :
90 : type blockTransform func([]byte) ([]byte, error)
91 :
92 : // Reader is a table reader.
93 : type Reader struct {
94 : readable objstorage.Readable
95 :
96 : // The following fields are copied from the ReadOptions.
97 : cacheOpts sstableinternal.CacheOptions
98 : loadBlockSema *fifo.Semaphore
99 : deniedUserProperties map[string]struct{}
100 : filterMetricsTracker *FilterMetricsTracker
101 : logger base.LoggerAndTracer
102 :
103 : Comparer *base.Comparer
104 : Compare Compare
105 : SuffixCmp CompareSuffixes
106 : Equal Equal
107 : Split Split
108 :
109 : tableFilter *tableFilterReader
110 :
111 : err error
112 :
113 : indexBH block.Handle
114 : filterBH block.Handle
115 : rangeDelBH block.Handle
116 : rangeKeyBH block.Handle
117 : valueBIH valueBlocksIndexHandle
118 : propertiesBH block.Handle
119 : metaIndexBH block.Handle
120 : footerBH block.Handle
121 :
122 : Properties Properties
123 : tableFormat TableFormat
124 : checksumType block.ChecksumType
125 :
126 : // metaBufferPool is a buffer pool used exclusively when opening a table and
127 : // loading its meta blocks. metaBufferPoolAlloc is used to batch-allocate
128 : // the BufferPool.pool slice as a part of the Reader allocation. Its capacity
129 : // is 3 to accommodate the meta block (1), and both the compressed
130 : // properties block (1) and decompressed properties block (1)
131 : // simultaneously.
132 : metaBufferPool block.BufferPool
133 : metaBufferPoolAlloc [3]block.AllocedBuffer
134 : }
135 :
136 : var _ CommonReader = (*Reader)(nil)
137 :
138 : // Close the reader and the underlying objstorage.Readable.
139 1 : func (r *Reader) Close() error {
140 1 : r.cacheOpts.Cache.Unref()
141 1 :
142 1 : if r.readable != nil {
143 1 : r.err = firstError(r.err, r.readable.Close())
144 1 : r.readable = nil
145 1 : }
146 :
147 1 : if r.err != nil {
148 0 : return r.err
149 0 : }
150 : // Make any future calls to Get, NewIter or Close return an error.
151 1 : r.err = errReaderClosed
152 1 : return nil
153 : }
154 :
155 : // NewPointIter returns an iterator for the point keys in the table.
156 : //
157 : // If transforms.HideObsoletePoints is set, the callee assumes that the filterer
158 : // already includes obsoleteKeyBlockPropertyFilter. The caller can satisfy this
159 : // contract by first calling TryAddBlockPropertyFilterForHideObsoletePoints.
160 : func (r *Reader) NewPointIter(
161 : ctx context.Context,
162 : transforms IterTransforms,
163 : lower, upper []byte,
164 : filterer *BlockPropertiesFilterer,
165 : filterBlockSizeLimit FilterBlockSizeLimit,
166 : stats *base.InternalIteratorStats,
167 : categoryAndQoS CategoryAndQoS,
168 : statsCollector *CategoryStatsCollector,
169 : rp ReaderProvider,
170 1 : ) (Iterator, error) {
171 1 : return r.newPointIter(
172 1 : ctx, transforms, lower, upper, filterer, filterBlockSizeLimit,
173 1 : stats, categoryAndQoS, statsCollector, rp, nil)
174 1 : }
175 :
176 : // TryAddBlockPropertyFilterForHideObsoletePoints is expected to be called
177 : // before the call to NewPointIter, to get the value of hideObsoletePoints and
178 : // potentially add a block property filter.
179 : func (r *Reader) TryAddBlockPropertyFilterForHideObsoletePoints(
180 : snapshotForHideObsoletePoints base.SeqNum,
181 : fileLargestSeqNum base.SeqNum,
182 : pointKeyFilters []BlockPropertyFilter,
183 1 : ) (hideObsoletePoints bool, filters []BlockPropertyFilter) {
184 1 : hideObsoletePoints = r.tableFormat >= TableFormatPebblev4 &&
185 1 : snapshotForHideObsoletePoints > fileLargestSeqNum
186 1 : if hideObsoletePoints {
187 1 : pointKeyFilters = append(pointKeyFilters, obsoleteKeyBlockPropertyFilter{})
188 1 : }
189 1 : return hideObsoletePoints, pointKeyFilters
190 : }
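
// Hypothetical call pattern (a sketch; construction of the filterer is
// elided): the caller consults this method first, then threads the results
// into NewPointIter via the transforms and filterer arguments.
//
//	hide, filters := r.TryAddBlockPropertyFilterForHideObsoletePoints(
//		snapshot, fileLargestSeqNum, pointKeyFilters)
//	transforms.HideObsoletePoints = hide
//	// Build a *BlockPropertiesFilterer from filters, then call r.NewPointIter.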
191 :
192 : func (r *Reader) newPointIter(
193 : ctx context.Context,
194 : transforms IterTransforms,
195 : lower, upper []byte,
196 : filterer *BlockPropertiesFilterer,
197 : filterBlockSizeLimit FilterBlockSizeLimit,
198 : stats *base.InternalIteratorStats,
199 : categoryAndQoS CategoryAndQoS,
200 : statsCollector *CategoryStatsCollector,
201 : rp ReaderProvider,
202 : vState *virtualState,
203 1 : ) (Iterator, error) {
204 1 : // NB: pebble.tableCache wraps the returned iterator with one which performs
205 1 : // reference counting on the Reader, preventing the Reader from being closed
206 1 : // until the final iterator closes.
207 1 : var res Iterator
208 1 : var err error
209 1 : if r.Properties.IndexType == twoLevelIndex {
210 1 : res, err = newRowBlockTwoLevelIterator(
211 1 : ctx, r, vState, transforms, lower, upper, filterer, filterBlockSizeLimit,
212 1 : stats, categoryAndQoS, statsCollector, rp, nil /* bufferPool */)
213 1 : } else {
214 1 : res, err = newRowBlockSingleLevelIterator(
215 1 : ctx, r, vState, transforms, lower, upper, filterer, filterBlockSizeLimit,
216 1 : stats, categoryAndQoS, statsCollector, rp, nil /* bufferPool */)
217 1 : }
218 1 : if err != nil {
219 0 : // Note: we don't want to return res here - it will be a nil
220 0 : // single/twoLevelIterator, not a nil Iterator.
221 0 : return nil, err
222 0 : }
223 1 : return res, nil
224 : }
225 :
226 : // NewIter returns an iterator for the point keys in the table. It is a
227 : // simplified version of NewPointIter and should only be used for tests and
228 : // tooling.
229 : //
230 : // NewIter must only be used when the Reader is guaranteed to outlive any
231 : // LazyValues returned from the iter.
232 1 : func (r *Reader) NewIter(transforms IterTransforms, lower, upper []byte) (Iterator, error) {
233 1 : // TODO(radu): we should probably not use bloom filters in this case, as there
234 1 : // likely isn't a cache set up.
235 1 : return r.NewPointIter(
236 1 : context.TODO(), transforms, lower, upper, nil, AlwaysUseFilterBlock,
237 1 : nil /* stats */, CategoryAndQoS{}, nil /* statsCollector */, MakeTrivialReaderProvider(r))
238 1 : }
239 :
240 : // NewCompactionIter returns an iterator similar to NewIter, but it also tracks
241 : // the number of bytes iterated. If an error occurs, NewCompactionIter cleans up
242 : // after itself and returns a nil iterator.
243 : func (r *Reader) NewCompactionIter(
244 : transforms IterTransforms,
245 : categoryAndQoS CategoryAndQoS,
246 : statsCollector *CategoryStatsCollector,
247 : rp ReaderProvider,
248 : bufferPool *block.BufferPool,
249 1 : ) (Iterator, error) {
250 1 : return r.newCompactionIter(transforms, categoryAndQoS, statsCollector, rp, nil, bufferPool)
251 1 : }
252 :
253 : func (r *Reader) newCompactionIter(
254 : transforms IterTransforms,
255 : categoryAndQoS CategoryAndQoS,
256 : statsCollector *CategoryStatsCollector,
257 : rp ReaderProvider,
258 : vState *virtualState,
259 : bufferPool *block.BufferPool,
260 1 : ) (Iterator, error) {
261 1 : if vState != nil && vState.isSharedIngested {
262 1 : transforms.HideObsoletePoints = true
263 1 : }
264 1 : if r.Properties.IndexType == twoLevelIndex {
265 1 : i, err := newRowBlockTwoLevelIterator(
266 1 : context.Background(),
267 1 : r, vState, transforms, nil /* lower */, nil /* upper */, nil,
268 1 : NeverUseFilterBlock, nil /* stats */, categoryAndQoS, statsCollector, rp, bufferPool,
269 1 : )
270 1 : if err != nil {
271 0 : return nil, err
272 0 : }
273 1 : i.SetupForCompaction()
274 1 : return i, nil
275 : }
276 1 : i, err := newRowBlockSingleLevelIterator(
277 1 : context.Background(), r, vState, transforms, nil /* lower */, nil, /* upper */
278 1 : nil, NeverUseFilterBlock, nil /* stats */, categoryAndQoS, statsCollector, rp, bufferPool,
279 1 : )
280 1 : if err != nil {
281 0 : return nil, err
282 0 : }
283 1 : i.SetupForCompaction()
284 1 : return i, nil
285 : }
286 :
287 : // NewRawRangeDelIter returns an internal iterator for the contents of the
288 : // range-del block for the table. Returns nil if the table does not contain
289 : // any range deletions.
290 : func (r *Reader) NewRawRangeDelIter(
291 : ctx context.Context, transforms FragmentIterTransforms,
292 1 : ) (keyspan.FragmentIterator, error) {
293 1 : if r.rangeDelBH.Length == 0 {
294 1 : return nil, nil
295 1 : }
296 1 : h, err := r.readRangeDel(ctx, nil /* stats */, nil /* iterStats */)
297 1 : if err != nil {
298 0 : return nil, err
299 0 : }
300 1 : transforms.ElideSameSeqNum = true
301 1 : i, err := rowblk.NewFragmentIter(r.cacheOpts.FileNum, r.Compare, r.Comparer.CompareSuffixes, r.Split, h, transforms)
302 1 : if err != nil {
303 0 : return nil, err
304 0 : }
305 1 : return keyspan.MaybeAssert(i, r.Compare), nil
306 : }
307 :
308 : // NewRawRangeKeyIter returns an internal iterator for the contents of the
309 : // range-key block for the table. Returns nil if the table does not contain any
310 : // range keys.
311 : func (r *Reader) NewRawRangeKeyIter(
312 : ctx context.Context, transforms FragmentIterTransforms,
313 1 : ) (keyspan.FragmentIterator, error) {
314 1 : if r.rangeKeyBH.Length == 0 {
315 1 : return nil, nil
316 1 : }
317 1 : h, err := r.readRangeKey(ctx, nil /* stats */, nil /* iterStats */)
318 1 : if err != nil {
319 0 : return nil, err
320 0 : }
321 1 : i, err := rowblk.NewFragmentIter(r.cacheOpts.FileNum, r.Compare, r.Comparer.CompareSuffixes, r.Split, h, transforms)
322 1 : if err != nil {
323 0 : return nil, err
324 0 : }
325 1 : return keyspan.MaybeAssert(i, r.Compare), nil
326 : }
327 :
328 : func (r *Reader) readIndex(
329 : ctx context.Context,
330 : readHandle objstorage.ReadHandle,
331 : stats *base.InternalIteratorStats,
332 : iterStats *iterStatsAccumulator,
333 1 : ) (block.BufferHandle, error) {
334 1 : ctx = objiotracing.WithBlockType(ctx, objiotracing.MetadataBlock)
335 1 : return r.readBlock(ctx, r.indexBH, nil, readHandle, stats, iterStats, nil /* buffer pool */)
336 1 : }
337 :
338 : func (r *Reader) readFilter(
339 : ctx context.Context,
340 : readHandle objstorage.ReadHandle,
341 : stats *base.InternalIteratorStats,
342 : iterStats *iterStatsAccumulator,
343 1 : ) (block.BufferHandle, error) {
344 1 : ctx = objiotracing.WithBlockType(ctx, objiotracing.FilterBlock)
345 1 : return r.readBlock(ctx, r.filterBH, nil /* transform */, readHandle, stats, iterStats, nil /* buffer pool */)
346 1 : }
347 :
348 : func (r *Reader) readRangeDel(
349 : ctx context.Context, stats *base.InternalIteratorStats, iterStats *iterStatsAccumulator,
350 1 : ) (block.BufferHandle, error) {
351 1 : ctx = objiotracing.WithBlockType(ctx, objiotracing.MetadataBlock)
352 1 : return r.readBlock(ctx, r.rangeDelBH, nil /* transform */, nil /* readHandle */, stats, iterStats, nil /* buffer pool */)
353 1 : }
354 :
355 : func (r *Reader) readRangeKey(
356 : ctx context.Context, stats *base.InternalIteratorStats, iterStats *iterStatsAccumulator,
357 1 : ) (block.BufferHandle, error) {
358 1 : ctx = objiotracing.WithBlockType(ctx, objiotracing.MetadataBlock)
359 1 : return r.readBlock(ctx, r.rangeKeyBH, nil /* transform */, nil /* readHandle */, stats, iterStats, nil /* buffer pool */)
360 1 : }
361 :
362 : func checkChecksum(
363 : checksumType block.ChecksumType, b []byte, bh block.Handle, fileNum base.DiskFileNum,
364 1 : ) error {
365 1 : expectedChecksum := binary.LittleEndian.Uint32(b[bh.Length+1:])
366 1 : var computedChecksum uint32
367 1 : switch checksumType {
368 1 : case block.ChecksumTypeCRC32c:
369 1 : computedChecksum = crc.New(b[:bh.Length+1]).Value()
370 0 : case block.ChecksumTypeXXHash64:
371 0 : computedChecksum = uint32(xxhash.Sum64(b[:bh.Length+1]))
372 0 : default:
373 0 : return errors.Errorf("unsupported checksum type: %d", checksumType)
374 : }
375 :
376 1 : if expectedChecksum != computedChecksum {
377 0 : return base.CorruptionErrorf(
378 0 : "pebble/table: invalid table %s (checksum mismatch at %d/%d)",
379 0 : fileNum, errors.Safe(bh.Offset), errors.Safe(bh.Length))
380 0 : }
381 1 : return nil
382 : }
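
// Illustrative sketch (not part of the original file): the physical block
// layout that checkChecksum assumes. The 4-byte little-endian checksum covers
// both the payload and the 1-byte compression indicator that follows it:
//
//	+---------------------+---------------------------+------------------+
//	| payload (bh.Length) | compression indicator (1) | checksum (4, LE) |
//	+---------------------+---------------------------+------------------+
//
// A writer-side counterpart might look like the following, assuming the same
// masked CRC-32C from internal/crc that the reader verifies against:
//
//	b := append(payload, byte(typ))
//	b = binary.LittleEndian.AppendUint32(b, crc.New(b).Value())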
383 :
384 : // DeterministicReadBlockDurationForTesting is for tests that want a
385 : // deterministic value of the time to read a block (that is not in the cache).
386 : // The return value is a function that must be called before the test exits.
387 0 : func DeterministicReadBlockDurationForTesting() func() {
388 0 : drbdForTesting := deterministicReadBlockDurationForTesting
389 0 : deterministicReadBlockDurationForTesting = true
390 0 : return func() {
391 0 : deterministicReadBlockDurationForTesting = drbdForTesting
392 0 : }
393 : }
394 :
395 : var deterministicReadBlockDurationForTesting = false
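
// Hypothetical usage in a test: the returned closure restores the previous
// value, so it is typically deferred for the duration of the test.
//
//	defer DeterministicReadBlockDurationForTesting()()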
396 :
397 : func (r *Reader) readBlock(
398 : ctx context.Context,
399 : bh block.Handle,
400 : transform blockTransform,
401 : readHandle objstorage.ReadHandle,
402 : stats *base.InternalIteratorStats,
403 : iterStats *iterStatsAccumulator,
404 : bufferPool *block.BufferPool,
405 1 : ) (handle block.BufferHandle, _ error) {
406 1 : if h := r.cacheOpts.Cache.Get(r.cacheOpts.CacheID, r.cacheOpts.FileNum, bh.Offset); h.Get() != nil {
407 1 : // Cache hit.
408 1 : if readHandle != nil {
409 1 : readHandle.RecordCacheHit(ctx, int64(bh.Offset), int64(bh.Length+block.TrailerLen))
410 1 : }
411 1 : if stats != nil {
412 1 : stats.BlockBytes += bh.Length
413 1 : stats.BlockBytesInCache += bh.Length
414 1 : }
415 1 : if iterStats != nil {
416 1 : iterStats.reportStats(bh.Length, bh.Length, 0)
417 1 : }
// This block is already in the cache; return a handle to the existing
// value in the cache.
420 1 : return block.CacheBufferHandle(h), nil
421 : }
422 :
423 : // Cache miss.
424 :
425 1 : if sema := r.loadBlockSema; sema != nil {
426 0 : if err := sema.Acquire(ctx, 1); err != nil {
427 0 : // An error here can only come from the context.
428 0 : return block.BufferHandle{}, err
429 0 : }
430 0 : defer sema.Release(1)
431 : }
432 :
433 1 : compressed := block.Alloc(int(bh.Length+block.TrailerLen), bufferPool)
434 1 : readStopwatch := makeStopwatch()
435 1 : var err error
436 1 : if readHandle != nil {
437 1 : err = readHandle.ReadAt(ctx, compressed.Get(), int64(bh.Offset))
438 1 : } else {
439 1 : err = r.readable.ReadAt(ctx, compressed.Get(), int64(bh.Offset))
440 1 : }
441 1 : readDuration := readStopwatch.stop()
442 1 : // Call IsTracingEnabled to avoid the allocations of boxing integers into an
443 1 : // interface{}, unless necessary.
444 1 : if readDuration >= slowReadTracingThreshold && r.logger.IsTracingEnabled(ctx) {
445 0 : _, file1, line1, _ := runtime.Caller(1)
446 0 : _, file2, line2, _ := runtime.Caller(2)
447 0 : r.logger.Eventf(ctx, "reading block of %d bytes took %s (fileNum=%s; %s/%s:%d -> %s/%s:%d)",
448 0 : int(bh.Length+block.TrailerLen), readDuration.String(),
449 0 : r.cacheOpts.FileNum,
450 0 : filepath.Base(filepath.Dir(file2)), filepath.Base(file2), line2,
451 0 : filepath.Base(filepath.Dir(file1)), filepath.Base(file1), line1)
452 0 : }
453 1 : if stats != nil {
454 1 : stats.BlockBytes += bh.Length
455 1 : stats.BlockReadDuration += readDuration
456 1 : }
457 1 : if err != nil {
458 0 : compressed.Release()
459 0 : return block.BufferHandle{}, err
460 0 : }
461 1 : if err := checkChecksum(r.checksumType, compressed.Get(), bh, r.cacheOpts.FileNum); err != nil {
462 0 : compressed.Release()
463 0 : return block.BufferHandle{}, err
464 0 : }
465 :
466 1 : typ := block.CompressionIndicator(compressed.Get()[bh.Length])
467 1 : compressed.Truncate(int(bh.Length))
468 1 :
469 1 : var decompressed block.Value
470 1 : if typ == block.NoCompressionIndicator {
471 1 : decompressed = compressed
472 1 : } else {
473 1 : // Decode the length of the decompressed value.
474 1 : decodedLen, prefixLen, err := block.DecompressedLen(typ, compressed.Get())
475 1 : if err != nil {
476 0 : compressed.Release()
477 0 : return block.BufferHandle{}, err
478 0 : }
479 :
480 1 : decompressed = block.Alloc(decodedLen, bufferPool)
481 1 : if err := block.DecompressInto(typ, compressed.Get()[prefixLen:], decompressed.Get()); err != nil {
482 0 : compressed.Release()
483 0 : return block.BufferHandle{}, err
484 0 : }
485 1 : compressed.Release()
486 : }
487 :
488 1 : if transform != nil {
489 0 : // Transforming blocks is very rare, so the extra copy of the
490 0 : // transformed data is not problematic.
491 0 : tmpTransformed, err := transform(decompressed.Get())
492 0 : if err != nil {
493 0 : decompressed.Release()
494 0 : return block.BufferHandle{}, err
495 0 : }
496 :
497 0 : transformed := block.Alloc(len(tmpTransformed), bufferPool)
498 0 : copy(transformed.Get(), tmpTransformed)
499 0 : decompressed.Release()
500 0 : decompressed = transformed
501 : }
502 :
503 1 : if iterStats != nil {
504 1 : iterStats.reportStats(bh.Length, 0, readDuration)
505 1 : }
506 1 : h := decompressed.MakeHandle(r.cacheOpts.Cache, r.cacheOpts.CacheID, r.cacheOpts.FileNum, bh.Offset)
507 1 : return h, nil
508 : }
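
// Minimal sketch of the cache-miss path above (hypothetical; error handling,
// tracing, and cache insertion elided), showing the order of operations: read
// payload plus trailer, verify the checksum, then decompress if needed.
//
//	buf := make([]byte, bh.Length+block.TrailerLen)
//	_ = r.readable.ReadAt(ctx, buf, int64(bh.Offset))
//	_ = checkChecksum(r.checksumType, buf, bh, r.cacheOpts.FileNum)
//	typ := block.CompressionIndicator(buf[bh.Length])
//	payload := buf[:bh.Length]
//	if typ != block.NoCompressionIndicator {
//		n, prefix, _ := block.DecompressedLen(typ, payload)
//		dst := make([]byte, n)
//		_ = block.DecompressInto(typ, payload[prefix:], dst)
//		payload = dst
//	}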
509 :
510 : func (r *Reader) readMetaindex(
511 : ctx context.Context,
512 : metaindexBH block.Handle,
513 : readHandle objstorage.ReadHandle,
514 : filters map[string]FilterPolicy,
515 1 : ) error {
516 1 : // We use a BufferPool when reading metaindex blocks in order to avoid
517 1 : // populating the block cache with these blocks. In heavy-write workloads,
518 1 : // especially with high compaction concurrency, new tables may be created
519 1 : // frequently. Populating the block cache with these metaindex blocks adds
520 1 : // additional contention on the block cache mutexes (see #1997).
521 1 : // Additionally, these blocks are exceedingly unlikely to be read again
522 1 : // while they're still in the block cache except in misconfigurations with
523 1 : // excessive sstable counts or a table cache that's far too small.
524 1 : r.metaBufferPool.InitPreallocated(r.metaBufferPoolAlloc[:0])
525 1 : // When we're finished, release the buffers we've allocated back to memory
526 1 : // allocator. We don't expect to use metaBufferPool again.
527 1 : defer r.metaBufferPool.Release()
528 1 :
529 1 : b, err := r.readBlock(
530 1 : ctx, metaindexBH, nil /* transform */, readHandle, nil, /* stats */
531 1 : nil /* iterStats */, &r.metaBufferPool)
532 1 : if err != nil {
533 0 : return err
534 0 : }
535 1 : data := b.Get()
536 1 : defer b.Release()
537 1 :
538 1 : if uint64(len(data)) != metaindexBH.Length {
539 0 : return base.CorruptionErrorf("pebble/table: unexpected metaindex block size: %d vs %d",
540 0 : errors.Safe(len(data)), errors.Safe(metaindexBH.Length))
541 0 : }
542 :
543 1 : i, err := rowblk.NewRawIter(bytes.Compare, data)
544 1 : if err != nil {
545 0 : return err
546 0 : }
547 :
548 1 : meta := map[string]block.Handle{}
549 1 : for valid := i.First(); valid; valid = i.Next() {
550 1 : value := i.Value()
551 1 : if bytes.Equal(i.Key().UserKey, []byte(metaValueIndexName)) {
552 1 : vbih, n, err := decodeValueBlocksIndexHandle(i.Value())
553 1 : if err != nil {
554 0 : return err
555 0 : }
556 1 : if n == 0 || n != len(value) {
557 0 : return base.CorruptionErrorf("pebble/table: invalid table (bad value blocks index handle)")
558 0 : }
559 1 : r.valueBIH = vbih
560 1 : } else {
561 1 : bh, n := decodeBlockHandle(value)
562 1 : if n == 0 || n != len(value) {
563 0 : return base.CorruptionErrorf("pebble/table: invalid table (bad block handle)")
564 0 : }
565 1 : meta[string(i.Key().UserKey)] = bh
566 : }
567 : }
568 1 : if err := i.Close(); err != nil {
569 0 : return err
570 0 : }
571 :
572 1 : if bh, ok := meta[metaPropertiesName]; ok {
573 1 : b, err = r.readBlock(
574 1 : ctx, bh, nil /* transform */, readHandle, nil, /* stats */
575 1 : nil /* iterStats */, nil /* buffer pool */)
576 1 : if err != nil {
577 0 : return err
578 0 : }
579 1 : r.propertiesBH = bh
580 1 : err := r.Properties.load(b.Get(), r.deniedUserProperties)
581 1 : b.Release()
582 1 : if err != nil {
583 0 : return err
584 0 : }
585 : }
586 :
587 1 : if bh, ok := meta[metaRangeDelV2Name]; ok {
588 1 : r.rangeDelBH = bh
589 1 : } else if _, ok := meta[metaRangeDelV1Name]; ok {
590 0 : // This version of Pebble requires a format major version at least as
591 0 : // high as FormatFlushableIngest (see pebble.FormatMinSupported). In
592 0 : // this format major version, we have a guarantee that we've compacted
593 0 : // away all RocksDB sstables. It should not be possible to encounter an
594 0 : // sstable with a v1 range deletion block but not a v2 range deletion
595 0 : // block.
596 0 : err := errors.Newf("pebble/table: unexpected range-del block type: %s", metaRangeDelV1Name)
597 0 : return errors.Mark(err, base.ErrCorruption)
598 0 : }
599 :
600 1 : if bh, ok := meta[metaRangeKeyName]; ok {
601 1 : r.rangeKeyBH = bh
602 1 : }
603 :
604 1 : for name, fp := range filters {
605 1 : types := []struct {
606 1 : ftype FilterType
607 1 : prefix string
608 1 : }{
609 1 : {TableFilter, "fullfilter."},
610 1 : }
611 1 : var done bool
612 1 : for _, t := range types {
613 1 : if bh, ok := meta[t.prefix+name]; ok {
614 1 : r.filterBH = bh
615 1 :
616 1 : switch t.ftype {
617 1 : case TableFilter:
618 1 : r.tableFilter = newTableFilterReader(fp, r.filterMetricsTracker)
619 0 : default:
620 0 : return base.CorruptionErrorf("unknown filter type: %v", errors.Safe(t.ftype))
621 : }
622 :
623 1 : done = true
624 1 : break
625 : }
626 : }
627 1 : if done {
628 1 : break
629 : }
630 : }
631 1 : return nil
632 : }
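
// Illustrative sketch (not part of the original file): the metaindex is a row
// block mapping meta block names (e.g. the properties and range-key blocks)
// to encoded handles. Given the raw metaindex bytes in data, dumping it
// mirrors the loop above (fmt used purely for illustration):
//
//	i, _ := rowblk.NewRawIter(bytes.Compare, data)
//	for valid := i.First(); valid; valid = i.Next() {
//		bh, _ := decodeBlockHandle(i.Value())
//		fmt.Printf("%s -> offset=%d length=%d\n", i.Key().UserKey, bh.Offset, bh.Length)
//	}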
633 :
634 : // Layout returns the layout (block organization) for an sstable.
635 1 : func (r *Reader) Layout() (*Layout, error) {
636 1 : if r.err != nil {
637 0 : return nil, r.err
638 0 : }
639 :
640 1 : l := &Layout{
641 1 : Data: make([]BlockHandleWithProperties, 0, r.Properties.NumDataBlocks),
642 1 : Filter: r.filterBH,
643 1 : RangeDel: r.rangeDelBH,
644 1 : RangeKey: r.rangeKeyBH,
645 1 : ValueIndex: r.valueBIH.h,
646 1 : Properties: r.propertiesBH,
647 1 : MetaIndex: r.metaIndexBH,
648 1 : Footer: r.footerBH,
649 1 : Format: r.tableFormat,
650 1 : }
651 1 :
652 1 : indexH, err := r.readIndex(context.Background(), nil, nil, nil)
653 1 : if err != nil {
654 0 : return nil, err
655 0 : }
656 1 : defer indexH.Release()
657 1 :
658 1 : var alloc bytealloc.A
659 1 :
660 1 : if r.Properties.IndexPartitions == 0 {
661 1 : l.Index = append(l.Index, r.indexBH)
662 1 : iter, _ := rowblk.NewIter(r.Compare, r.Split, indexH.Get(), NoTransforms)
663 1 : for kv := iter.First(); kv != nil; kv = iter.Next() {
664 1 : dataBH, err := decodeBlockHandleWithProperties(kv.InPlaceValue())
665 1 : if err != nil {
666 0 : return nil, errCorruptIndexEntry(err)
667 0 : }
668 1 : if len(dataBH.Props) > 0 {
669 1 : alloc, dataBH.Props = alloc.Copy(dataBH.Props)
670 1 : }
671 1 : l.Data = append(l.Data, dataBH)
672 : }
673 1 : } else {
674 1 : l.TopIndex = r.indexBH
675 1 : topIter, _ := rowblk.NewIter(r.Compare, r.Split, indexH.Get(), NoTransforms)
676 1 : iter := &rowblk.Iter{}
677 1 : for kv := topIter.First(); kv != nil; kv = topIter.Next() {
678 1 : indexBH, err := decodeBlockHandleWithProperties(kv.InPlaceValue())
679 1 : if err != nil {
680 0 : return nil, errCorruptIndexEntry(err)
681 0 : }
682 1 : l.Index = append(l.Index, indexBH.Handle)
683 1 :
684 1 : subIndex, err := r.readBlock(context.Background(), indexBH.Handle,
685 1 : nil /* transform */, nil /* readHandle */, nil /* stats */, nil /* iterStats */, nil /* buffer pool */)
686 1 : if err != nil {
687 0 : return nil, err
688 0 : }
689 : // TODO(msbutler): figure out how to pass virtualState to layout call.
690 1 : if err := iter.Init(r.Compare, r.Split, subIndex.Get(), NoTransforms); err != nil {
691 0 : return nil, err
692 0 : }
693 1 : for kv := iter.First(); kv != nil; kv = iter.Next() {
694 1 : dataBH, err := decodeBlockHandleWithProperties(kv.InPlaceValue())
695 1 : if len(dataBH.Props) > 0 {
696 1 : alloc, dataBH.Props = alloc.Copy(dataBH.Props)
697 1 : }
698 1 : if err != nil {
699 0 : return nil, errCorruptIndexEntry(err)
700 0 : }
701 1 : l.Data = append(l.Data, dataBH)
702 : }
703 1 : subIndex.Release()
704 1 : *iter = iter.ResetForReuse()
705 : }
706 : }
707 1 : if r.valueBIH.h.Length != 0 {
708 1 : vbiH, err := r.readBlock(context.Background(), r.valueBIH.h, nil, nil, nil, nil, nil /* buffer pool */)
709 1 : if err != nil {
710 0 : return nil, err
711 0 : }
712 1 : defer vbiH.Release()
713 1 : vbiBlock := vbiH.Get()
714 1 : indexEntryLen := int(r.valueBIH.blockNumByteLength + r.valueBIH.blockOffsetByteLength +
715 1 : r.valueBIH.blockLengthByteLength)
716 1 : i := 0
717 1 : for len(vbiBlock) != 0 {
718 1 : if len(vbiBlock) < indexEntryLen {
719 0 : return nil, errors.Errorf(
720 0 : "remaining value index block %d does not contain a full entry of length %d",
721 0 : len(vbiBlock), indexEntryLen)
722 0 : }
723 1 : n := int(r.valueBIH.blockNumByteLength)
724 1 : bn := int(littleEndianGet(vbiBlock, n))
725 1 : if bn != i {
726 0 : return nil, errors.Errorf("unexpected block num %d, expected %d",
727 0 : bn, i)
728 0 : }
729 1 : i++
730 1 : vbiBlock = vbiBlock[n:]
731 1 : n = int(r.valueBIH.blockOffsetByteLength)
732 1 : blockOffset := littleEndianGet(vbiBlock, n)
733 1 : vbiBlock = vbiBlock[n:]
734 1 : n = int(r.valueBIH.blockLengthByteLength)
735 1 : blockLen := littleEndianGet(vbiBlock, n)
736 1 : vbiBlock = vbiBlock[n:]
737 1 : l.ValueBlock = append(l.ValueBlock, block.Handle{Offset: blockOffset, Length: blockLen})
738 : }
739 : }
740 :
741 1 : return l, nil
742 : }
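
// Hypothetical usage sketch: enumerating the data blocks of an open Reader in
// file order (fmt used purely for illustration).
//
//	l, err := r.Layout()
//	if err != nil {
//		return err
//	}
//	for _, bh := range l.Data {
//		fmt.Printf("data block: offset=%d length=%d props=%d bytes\n",
//			bh.Offset, bh.Length, len(bh.Props))
//	}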
743 :
744 : // ValidateBlockChecksums validates the checksums for each block in the SSTable.
745 1 : func (r *Reader) ValidateBlockChecksums() error {
746 1 : // Pre-compute the BlockHandles for the underlying file.
747 1 : l, err := r.Layout()
748 1 : if err != nil {
749 0 : return err
750 0 : }
751 :
752 : // Construct the set of blocks to check. Note that the footer is not checked
753 : // as it is not a block with a checksum.
754 1 : blocks := make([]block.Handle, len(l.Data))
755 1 : for i := range l.Data {
756 1 : blocks[i] = l.Data[i].Handle
757 1 : }
758 1 : blocks = append(blocks, l.Index...)
759 1 : blocks = append(blocks, l.TopIndex, l.Filter, l.RangeDel, l.RangeKey, l.Properties, l.MetaIndex)
760 1 :
761 1 : // Sorting by offset ensures we are performing a sequential scan of the
762 1 : // file.
763 1 : slices.SortFunc(blocks, func(a, b block.Handle) int {
764 1 : return cmp.Compare(a.Offset, b.Offset)
765 1 : })
766 :
767 : // Check all blocks sequentially. Make use of read-ahead, given we are
768 : // scanning the entire file from start to end.
769 1 : rh := r.readable.NewReadHandle(objstorage.NoReadBefore)
770 1 : defer rh.Close()
771 1 :
772 1 : for _, bh := range blocks {
773 1 : // Certain blocks may not be present, in which case we skip them.
774 1 : if bh.Length == 0 {
775 1 : continue
776 : }
777 :
778 : // Read the block, which validates the checksum.
779 1 : h, err := r.readBlock(context.Background(), bh, nil, rh, nil, nil /* iterStats */, nil /* buffer pool */)
780 1 : if err != nil {
781 0 : return err
782 0 : }
783 1 : h.Release()
784 : }
785 :
786 1 : return nil
787 : }
788 :
789 : // CommonProperties implements the CommonReader interface.
790 1 : func (r *Reader) CommonProperties() *CommonProperties {
791 1 : return &r.Properties.CommonProperties
792 1 : }
793 :
794 : // EstimateDiskUsage returns the total size of data blocks overlapping the range
795 : // `[start, end]`. Even if a data block partially overlaps, or we cannot
796 : // determine overlap due to abbreviated index keys, the full data block size is
797 : // included in the estimation.
798 : //
799 : // This function does not account for any metablock space usage. Assumes there
800 : // is at least partial overlap, i.e., `[start, end]` falls neither completely
801 : // before nor completely after the file's range.
802 : //
803 : // Only blocks containing point keys are considered. Range deletion and range
804 : // key blocks are not considered.
805 : //
806 : // TODO(ajkr): account for metablock space usage. Perhaps look at the fraction of
807 : // data blocks overlapped and add that same fraction of the metadata blocks to the
808 : // estimate.
809 1 : func (r *Reader) EstimateDiskUsage(start, end []byte) (uint64, error) {
810 1 : if r.err != nil {
811 0 : return 0, r.err
812 0 : }
813 :
814 1 : indexH, err := r.readIndex(context.Background(), nil, nil, nil)
815 1 : if err != nil {
816 0 : return 0, err
817 0 : }
818 1 : defer indexH.Release()
819 1 :
820 1 : // Iterators over the bottom-level index blocks containing start and end.
821 1 : // These may be different in case of partitioned index but will both point
822 1 : // to the same blockIter over the single index in the unpartitioned case.
823 1 : var startIdxIter, endIdxIter *rowblk.Iter
824 1 : if r.Properties.IndexPartitions == 0 {
825 1 : iter, err := rowblk.NewIter(r.Compare, r.Split, indexH.Get(), NoTransforms)
826 1 : if err != nil {
827 0 : return 0, err
828 0 : }
829 1 : startIdxIter = iter
830 1 : endIdxIter = iter
831 1 : } else {
832 1 : topIter, err := rowblk.NewIter(r.Compare, r.Split, indexH.Get(), NoTransforms)
833 1 : if err != nil {
834 0 : return 0, err
835 0 : }
836 :
837 1 : kv := topIter.SeekGE(start, base.SeekGEFlagsNone)
838 1 : if kv == nil {
839 1 : // The range falls completely after this file, or an error occurred.
840 1 : return 0, topIter.Error()
841 1 : }
842 1 : startIdxBH, err := decodeBlockHandleWithProperties(kv.InPlaceValue())
843 1 : if err != nil {
844 0 : return 0, errCorruptIndexEntry(err)
845 0 : }
846 1 : startIdxBlock, err := r.readBlock(context.Background(), startIdxBH.Handle,
847 1 : nil /* transform */, nil /* readHandle */, nil /* stats */, nil /* iterStats */, nil /* buffer pool */)
848 1 : if err != nil {
849 0 : return 0, err
850 0 : }
851 1 : defer startIdxBlock.Release()
852 1 : startIdxIter, err = rowblk.NewIter(r.Compare, r.Split, startIdxBlock.Get(), NoTransforms)
853 1 : if err != nil {
854 0 : return 0, err
855 0 : }
856 :
857 1 : kv = topIter.SeekGE(end, base.SeekGEFlagsNone)
858 1 : if kv == nil {
859 1 : if err := topIter.Error(); err != nil {
860 0 : return 0, err
861 0 : }
862 1 : } else {
863 1 : endIdxBH, err := decodeBlockHandleWithProperties(kv.InPlaceValue())
864 1 : if err != nil {
865 0 : return 0, errCorruptIndexEntry(err)
866 0 : }
867 1 : endIdxBlock, err := r.readBlock(context.Background(),
868 1 : endIdxBH.Handle, nil /* transform */, nil /* readHandle */, nil /* stats */, nil /* iterStats */, nil /* buffer pool */)
869 1 : if err != nil {
870 0 : return 0, err
871 0 : }
872 1 : defer endIdxBlock.Release()
873 1 : endIdxIter, err = rowblk.NewIter(r.Compare, r.Split, endIdxBlock.Get(), NoTransforms)
874 1 : if err != nil {
875 0 : return 0, err
876 0 : }
877 : }
878 : }
// startIdxIter should not be nil at this point, while endIdxIter can be nil
// if the range spans past the end of the file.
881 :
882 1 : kv := startIdxIter.SeekGE(start, base.SeekGEFlagsNone)
883 1 : if kv == nil {
884 1 : // The range falls completely after this file, or an error occurred.
885 1 : return 0, startIdxIter.Error()
886 1 : }
887 1 : startBH, err := decodeBlockHandleWithProperties(kv.InPlaceValue())
888 1 : if err != nil {
889 0 : return 0, errCorruptIndexEntry(err)
890 0 : }
891 :
892 1 : includeInterpolatedValueBlocksSize := func(dataBlockSize uint64) uint64 {
893 1 : // INVARIANT: r.Properties.DataSize > 0 since startIdxIter is not nil.
894 1 : // Linearly interpolate what is stored in value blocks.
895 1 : //
896 1 : // TODO(sumeer): if we need more accuracy, without loading any data blocks
897 1 : // (which contain the value handles, and which may also be insufficient if
898 1 : // the values are in separate files), we will need to accumulate the
899 1 : // logical size of the key-value pairs and store the cumulative value for
900 1 : // each data block in the index block entry. This increases the size of
901 1 : // the BlockHandle, so wait until this becomes necessary.
902 1 : return dataBlockSize +
903 1 : uint64((float64(dataBlockSize)/float64(r.Properties.DataSize))*
904 1 : float64(r.Properties.ValueBlocksSize))
905 1 : }
906 1 : if endIdxIter == nil {
907 1 : // The range spans beyond this file. Include data blocks through the last.
908 1 : return includeInterpolatedValueBlocksSize(r.Properties.DataSize - startBH.Offset), nil
909 1 : }
910 1 : kv = endIdxIter.SeekGE(end, base.SeekGEFlagsNone)
911 1 : if kv == nil {
912 1 : if err := endIdxIter.Error(); err != nil {
913 0 : return 0, err
914 0 : }
915 : // The range spans beyond this file. Include data blocks through the last.
916 1 : return includeInterpolatedValueBlocksSize(r.Properties.DataSize - startBH.Offset), nil
917 : }
918 1 : endBH, err := decodeBlockHandleWithProperties(kv.InPlaceValue())
919 1 : if err != nil {
920 0 : return 0, errCorruptIndexEntry(err)
921 0 : }
922 1 : return includeInterpolatedValueBlocksSize(
923 1 : endBH.Offset + endBH.Length + block.TrailerLen - startBH.Offset), nil
924 : }
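
// Worked example with hypothetical numbers: if Properties.DataSize is 10 MiB,
// Properties.ValueBlocksSize is 2 MiB, and the requested range covers 1 MiB of
// data blocks, the estimate is 1 MiB + (1/10)*2 MiB = 1.2 MiB; that is, value
// block usage is assumed proportional to the data blocks that reference it.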
925 :
926 : // TableFormat returns the format version for the table.
927 1 : func (r *Reader) TableFormat() (TableFormat, error) {
928 1 : if r.err != nil {
929 0 : return TableFormatUnspecified, r.err
930 0 : }
931 1 : return r.tableFormat, nil
932 : }
933 :
934 : // NewReader returns a new table reader for the file. Closing the reader will
935 : // close the file.
936 : //
937 : // The context is used for tracing any operations performed by NewReader; it is
938 : // NOT stored for future use.
939 1 : func NewReader(ctx context.Context, f objstorage.Readable, o ReaderOptions) (*Reader, error) {
940 1 : if f == nil {
941 0 : return nil, errors.New("pebble/table: nil file")
942 0 : }
943 1 : o = o.ensureDefaults()
944 1 : r := &Reader{
945 1 : readable: f,
946 1 : cacheOpts: o.internal.CacheOpts,
947 1 : loadBlockSema: o.LoadBlockSema,
948 1 : deniedUserProperties: o.DeniedUserProperties,
949 1 : filterMetricsTracker: o.FilterMetricsTracker,
950 1 : logger: o.LoggerAndTracer,
951 1 : }
952 1 : if r.cacheOpts.Cache == nil {
953 1 : r.cacheOpts.Cache = cache.New(0)
954 1 : } else {
955 1 : r.cacheOpts.Cache.Ref()
956 1 : }
957 1 : if r.cacheOpts.CacheID == 0 {
958 1 : r.cacheOpts.CacheID = r.cacheOpts.Cache.NewID()
959 1 : }
960 :
961 1 : var preallocRH objstorageprovider.PreallocatedReadHandle
962 1 : rh := objstorageprovider.UsePreallocatedReadHandle(
963 1 : r.readable, objstorage.ReadBeforeForNewReader, &preallocRH)
964 1 : defer rh.Close()
965 1 :
966 1 : footer, err := readFooter(ctx, f, rh, r.logger, r.cacheOpts.FileNum)
967 1 : if err != nil {
968 0 : r.err = err
969 0 : return nil, r.Close()
970 0 : }
971 1 : r.checksumType = footer.checksum
972 1 : r.tableFormat = footer.format
973 1 : // Read the metaindex and properties blocks.
974 1 : if err := r.readMetaindex(ctx, footer.metaindexBH, rh, o.Filters); err != nil {
975 0 : r.err = err
976 0 : return nil, r.Close()
977 0 : }
978 1 : r.indexBH = footer.indexBH
979 1 : r.metaIndexBH = footer.metaindexBH
980 1 : r.footerBH = footer.footerBH
981 1 :
982 1 : if r.Properties.ComparerName == "" || o.Comparer.Name == r.Properties.ComparerName {
983 1 : r.Comparer = o.Comparer
984 1 : r.Compare = o.Comparer.Compare
985 1 : r.SuffixCmp = o.Comparer.CompareSuffixes
986 1 : r.Equal = o.Comparer.Equal
987 1 : r.Split = o.Comparer.Split
988 1 : } else if comparer, ok := o.Comparers[r.Properties.ComparerName]; ok {
989 0 : r.Comparer = o.Comparer
990 0 : r.Compare = comparer.Compare
991 0 : r.SuffixCmp = comparer.CompareSuffixes
992 0 : r.Equal = comparer.Equal
993 0 : r.Split = comparer.Split
994 0 : } else {
995 0 : r.err = errors.Errorf("pebble/table: %d: unknown comparer %s",
996 0 : errors.Safe(r.cacheOpts.FileNum), errors.Safe(r.Properties.ComparerName))
997 0 : }
998 :
999 1 : if mergerName := r.Properties.MergerName; mergerName != "" && mergerName != "nullptr" {
1000 1 : if o.Merger != nil && o.Merger.Name == mergerName {
1001 1 : // opts.Merger matches.
1002 1 : } else if _, ok := o.Mergers[mergerName]; ok {
1003 0 : // Known merger.
1004 0 : } else {
1005 0 : r.err = errors.Errorf("pebble/table: %d: unknown merger %s",
1006 0 : errors.Safe(r.cacheOpts.FileNum), errors.Safe(r.Properties.MergerName))
1007 0 : }
1008 : }
1009 :
1010 1 : if r.err != nil {
1011 0 : return nil, r.Close()
1012 0 : }
1013 :
1014 1 : return r, nil
1015 : }
1016 :
1017 : // ReadableFile describes the smallest subset of vfs.File that is required for
1018 : // reading SSTs.
1019 : type ReadableFile interface {
1020 : io.ReaderAt
1021 : io.Closer
1022 : Stat() (os.FileInfo, error)
1023 : }
1024 :
1025 : // NewSimpleReadable wraps a ReadableFile in an objstorage.Readable
1026 : // implementation (which does not support read-ahead).
1027 1 : func NewSimpleReadable(r ReadableFile) (objstorage.Readable, error) {
1028 1 : info, err := r.Stat()
1029 1 : if err != nil {
1030 0 : return nil, err
1031 0 : }
1032 1 : res := &simpleReadable{
1033 1 : f: r,
1034 1 : size: info.Size(),
1035 1 : }
1036 1 : res.rh = objstorage.MakeNoopReadHandle(res)
1037 1 : return res, nil
1038 : }
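
// Hypothetical usage sketch (the path and empty options are illustrative):
// reading an sstable directly from the filesystem without an objstorage
// provider. NewReader closes the readable via r.Close on failure.
//
//	f, err := os.Open("table.sst")
//	if err != nil {
//		return err
//	}
//	readable, err := NewSimpleReadable(f)
//	if err != nil {
//		return err
//	}
//	r, err := NewReader(context.Background(), readable, ReaderOptions{})
//	if err != nil {
//		return err
//	}
//	defer r.Close()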
1039 :
1040 : // simpleReadable wraps a ReadableFile to implement objstorage.Readable.
1041 : type simpleReadable struct {
1042 : f ReadableFile
1043 : size int64
1044 : rh objstorage.NoopReadHandle
1045 : }
1046 :
1047 : var _ objstorage.Readable = (*simpleReadable)(nil)
1048 :
1049 : // ReadAt is part of the objstorage.Readable interface.
1050 1 : func (s *simpleReadable) ReadAt(_ context.Context, p []byte, off int64) error {
1051 1 : n, err := s.f.ReadAt(p, off)
1052 1 : if invariants.Enabled && err == nil && n != len(p) {
1053 0 : panic("short read")
1054 : }
1055 1 : return err
1056 : }
1057 :
1058 : // Close is part of the objstorage.Readable interface.
1059 1 : func (s *simpleReadable) Close() error {
1060 1 : return s.f.Close()
1061 1 : }
1062 :
1063 : // Size is part of the objstorage.Readable interface.
1064 1 : func (s *simpleReadable) Size() int64 {
1065 1 : return s.size
1066 1 : }
1067 :
1068 : // NewReadHandle is part of the objstorage.Readable interface.
1069 : func (s *simpleReadable) NewReadHandle(
1070 : readBeforeSize objstorage.ReadBeforeSize,
1071 1 : ) objstorage.ReadHandle {
1072 1 : return &s.rh
1073 1 : }
1074 :
1075 0 : func errCorruptIndexEntry(err error) error {
1076 0 : err = base.CorruptionErrorf("pebble/table: corrupt index entry: %v", err)
1077 0 : if invariants.Enabled {
1078 0 : panic(err)
1079 : }
1080 0 : return err
1081 : }
1082 :
1083 : type deterministicStopwatchForTesting struct {
1084 : startTime time.Time
1085 : }
1086 :
1087 1 : func makeStopwatch() deterministicStopwatchForTesting {
1088 1 : return deterministicStopwatchForTesting{startTime: time.Now()}
1089 1 : }
1090 :
1091 1 : func (w deterministicStopwatchForTesting) stop() time.Duration {
1092 1 : dur := time.Since(w.startTime)
1093 1 : if deterministicReadBlockDurationForTesting {
1094 0 : dur = slowReadTracingThreshold
1095 0 : }
1096 1 : return dur
1097 : }
|