Line data Source code
1 : // Copyright 2011 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package sstable
6 :
7 : import (
8 : "bytes"
9 : "context"
10 : "encoding/binary"
11 : "io"
12 : "os"
13 : "sort"
14 : "time"
15 :
16 : "github.com/cespare/xxhash/v2"
17 : "github.com/cockroachdb/errors"
18 : "github.com/cockroachdb/pebble/internal/base"
19 : "github.com/cockroachdb/pebble/internal/bytealloc"
20 : "github.com/cockroachdb/pebble/internal/cache"
21 : "github.com/cockroachdb/pebble/internal/crc"
22 : "github.com/cockroachdb/pebble/internal/invariants"
23 : "github.com/cockroachdb/pebble/internal/keyspan"
24 : "github.com/cockroachdb/pebble/internal/private"
25 : "github.com/cockroachdb/pebble/objstorage"
26 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
27 : )
28 :
29 : var errCorruptIndexEntry = base.CorruptionErrorf("pebble/table: corrupt index entry")
30 : var errReaderClosed = errors.New("pebble/table: reader is closed")
31 :
32 : // decodeBlockHandle returns the block handle encoded at the start of src, as
33 : // well as the number of bytes it occupies. It returns zero values if given
34 : // invalid input. A block handle for a data block or a first/lower-level index
35 : // block should not be decoded using decodeBlockHandle, because such handles
36 : // are followed by encoded properties: a caller validating that the number of
37 : // bytes decoded equals len(src) would see a mismatch since the properties are
38 : // left undecoded. In those cases use decodeBlockHandleWithProperties.
39 2 : func decodeBlockHandle(src []byte) (BlockHandle, int) {
40 2 : offset, n := binary.Uvarint(src)
41 2 : length, m := binary.Uvarint(src[n:])
42 2 : if n == 0 || m == 0 {
43 0 : return BlockHandle{}, 0
44 0 : }
45 2 : return BlockHandle{offset, length}, n + m
46 : }
47 :
48 : // decodeBlockHandleWithProperties returns the block handle and properties
49 : // encoded in src. src needs to be exactly the length that was encoded. This
50 : // method must be used for data block and first/lower level index blocks. The
51 : // properties in the block handle point to the bytes in src.
52 2 : func decodeBlockHandleWithProperties(src []byte) (BlockHandleWithProperties, error) {
53 2 : bh, n := decodeBlockHandle(src)
54 2 : if n == 0 {
55 0 : return BlockHandleWithProperties{}, errors.Errorf("invalid BlockHandle")
56 0 : }
57 2 : return BlockHandleWithProperties{
58 2 : BlockHandle: bh,
59 2 : Props: src[n:],
60 2 : }, nil
61 : }
62 :
63 2 : func encodeBlockHandle(dst []byte, b BlockHandle) int {
64 2 : n := binary.PutUvarint(dst, b.Offset)
65 2 : m := binary.PutUvarint(dst[n:], b.Length)
66 2 : return n + m
67 2 : }
68 :
69 2 : func encodeBlockHandleWithProperties(dst []byte, b BlockHandleWithProperties) []byte {
70 2 : n := encodeBlockHandle(dst, b.BlockHandle)
71 2 : dst = append(dst[:n], b.Props...)
72 2 : return dst
73 2 : }
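// Example (editorial sketch, not part of the original file): a BlockHandle
// round-trips through the two uvarints written by encodeBlockHandle;
// 2*binary.MaxVarintLen64 bytes is the worst-case encoding size.
//
//	var buf [2 * binary.MaxVarintLen64]byte
//	n := encodeBlockHandle(buf[:], BlockHandle{Offset: 7, Length: 42})
//	bh, m := decodeBlockHandle(buf[:n])
//	// bh == BlockHandle{Offset: 7, Length: 42}, and m == n because buf[:n]
//	// holds exactly the encoded handle with no trailing properties.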
74 :
75 : // block is a []byte that holds a sequence of key/value pairs plus an index
76 : // over those pairs.
77 : type block []byte
78 :
79 : type loadBlockResult int8
80 :
81 : const (
82 : loadBlockOK loadBlockResult = iota
83 : // Could be due to an error, or because there is no block left to load.
84 : loadBlockFailed
85 : loadBlockIrrelevant
86 : )
87 :
88 : type blockTransform func([]byte) ([]byte, error)
89 :
90 : // ReaderOption provides an interface to do work on a Reader while it is being
91 : // opened.
92 : type ReaderOption interface {
93 : // readerApply is called on the reader during opening in order to set internal
94 : // parameters.
95 : readerApply(*Reader)
96 : }
97 :
98 : // Comparers is a map from comparer name to comparer. It is used for debugging
99 : // tools which may be used on multiple databases configured with different
100 : // comparers. Comparers implements the ReaderOption interface and can be passed
101 : // as a parameter to NewReader.
102 : type Comparers map[string]*Comparer
103 :
104 1 : func (c Comparers) readerApply(r *Reader) {
105 1 : if r.Compare != nil || r.Properties.ComparerName == "" {
106 1 : return
107 1 : }
108 1 : if comparer, ok := c[r.Properties.ComparerName]; ok {
109 1 : r.Compare = comparer.Compare
110 1 : r.FormatKey = comparer.FormatKey
111 1 : r.Split = comparer.Split
112 1 : }
113 : }
114 :
115 : // Mergers is a map from merger name to merger. It is used for debugging tools
116 : // which may be used on multiple databases configured with different
117 : // mergers. Mergers implements the ReaderOption interface and can be passed as
118 : // a parameter to NewReader.
119 : type Mergers map[string]*Merger
120 :
121 1 : func (m Mergers) readerApply(r *Reader) {
122 1 : if r.mergerOK || r.Properties.MergerName == "" {
123 1 : return
124 1 : }
125 1 : _, r.mergerOK = m[r.Properties.MergerName]
126 : }
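// Example (editorial sketch): a debugging tool that does not know which
// comparer or merger a table was written with can pass candidate sets to
// NewReader; the entry matching the table's properties is applied once the
// properties block has been read. myComparer and myMerger are placeholders.
//
//	r, err := NewReader(readable, ReaderOptions{},
//		Comparers{myComparer.Name: myComparer},
//		Mergers{myMerger.Name: myMerger})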
127 :
128 : // cacheOpts is a Reader open option for specifying the cache ID and sstable file
129 : // number. If not specified, a unique cache ID will be used.
130 : type cacheOpts struct {
131 : cacheID uint64
132 : fileNum base.DiskFileNum
133 : }
134 :
135 : // Marker function to indicate the option should be applied before reading the
136 : // sstable properties and, in the write path, before writing the default
137 : // sstable properties.
138 0 : func (c *cacheOpts) preApply() {}
139 :
140 2 : func (c *cacheOpts) readerApply(r *Reader) {
141 2 : if r.cacheID == 0 {
142 2 : r.cacheID = c.cacheID
143 2 : }
144 2 : if r.fileNum.FileNum() == 0 {
145 2 : r.fileNum = c.fileNum
146 2 : }
147 : }
148 :
149 2 : func (c *cacheOpts) writerApply(w *Writer) {
150 2 : if w.cacheID == 0 {
151 2 : w.cacheID = c.cacheID
152 2 : }
153 2 : if w.fileNum.FileNum() == 0 {
154 2 : w.fileNum = c.fileNum
155 2 : }
156 : }
157 :
158 : // rawTombstonesOpt is a Reader open option for specifying that range
159 : // tombstones returned by Reader.NewRangeDelIter() should not be
160 : // fragmented. Used by debug tools to get a raw view of the tombstones
161 : // contained in an sstable.
162 : type rawTombstonesOpt struct{}
163 :
164 0 : func (rawTombstonesOpt) preApply() {}
165 :
166 1 : func (rawTombstonesOpt) readerApply(r *Reader) {
167 1 : r.rawTombstones = true
168 1 : }
169 :
170 2 : func init() {
171 2 : private.SSTableCacheOpts = func(cacheID uint64, fileNum base.DiskFileNum) interface{} {
172 2 : return &cacheOpts{cacheID, fileNum}
173 2 : }
174 2 : private.SSTableRawTombstonesOpt = rawTombstonesOpt{}
175 : }
176 :
177 : // Reader is a table reader.
178 : type Reader struct {
179 : readable objstorage.Readable
180 : cacheID uint64
181 : fileNum base.DiskFileNum
182 : err error
183 : indexBH BlockHandle
184 : filterBH BlockHandle
185 : rangeDelBH BlockHandle
186 : rangeKeyBH BlockHandle
187 : rangeDelTransform blockTransform
188 : valueBIH valueBlocksIndexHandle
189 : propertiesBH BlockHandle
190 : metaIndexBH BlockHandle
191 : footerBH BlockHandle
192 : opts ReaderOptions
193 : Compare Compare
194 : FormatKey base.FormatKey
195 : Split Split
196 : tableFilter *tableFilterReader
197 : // Keep types that are not multiples of 8 bytes at the end and with
198 : // decreasing size.
199 : Properties Properties
200 : tableFormat TableFormat
201 : rawTombstones bool
202 : mergerOK bool
203 : checksumType ChecksumType
204 : // metaBufferPool is a buffer pool used exclusively when opening a table and
205 : // loading its meta blocks. metaBufferPoolAlloc is used to batch-allocate
206 : // the BufferPool.pool slice as a part of the Reader allocation. It's
207 : // capacity 3 to accommodate the meta block (1), and both the compressed
208 : // properties block (1) and decompressed properties block (1)
209 : // simultaneously.
210 : metaBufferPool BufferPool
211 : metaBufferPoolAlloc [3]allocedBuffer
212 : }
213 :
214 : // Close implements DB.Close, as documented in the pebble package.
215 2 : func (r *Reader) Close() error {
216 2 : r.opts.Cache.Unref()
217 2 :
218 2 : if r.readable != nil {
219 2 : r.err = firstError(r.err, r.readable.Close())
220 2 : r.readable = nil
221 2 : }
222 :
223 2 : if r.err != nil {
224 1 : return r.err
225 1 : }
226 : // Make any future calls to Get, NewIter or Close return an error.
227 2 : r.err = errReaderClosed
228 2 : return nil
229 : }
230 :
231 : // NewIterWithBlockPropertyFilters returns an iterator for the contents of the
232 : // table. If an error occurs, NewIterWithBlockPropertyFilters cleans up after
233 : // itself and returns a nil iterator.
234 : func (r *Reader) NewIterWithBlockPropertyFilters(
235 : lower, upper []byte,
236 : filterer *BlockPropertiesFilterer,
237 : useFilterBlock bool,
238 : stats *base.InternalIteratorStats,
239 : rp ReaderProvider,
240 2 : ) (Iterator, error) {
241 2 : return r.newIterWithBlockPropertyFiltersAndContext(
242 2 : context.Background(),
243 2 : lower, upper, filterer, false, useFilterBlock, stats, rp, nil,
244 2 : )
245 2 : }
246 :
247 : // NewIterWithBlockPropertyFiltersAndContextEtc is similar to
248 : // NewIterWithBlockPropertyFilters and additionally accepts a context for
249 : // tracing.
250 : //
251 : // If hideObsoletePoints is true, the callee assumes that filterer already includes
252 : // obsoleteKeyBlockPropertyFilter. The caller can satisfy this contract by
253 : // first calling TryAddBlockPropertyFilterForHideObsoletePoints.
254 : func (r *Reader) NewIterWithBlockPropertyFiltersAndContextEtc(
255 : ctx context.Context,
256 : lower, upper []byte,
257 : filterer *BlockPropertiesFilterer,
258 : hideObsoletePoints, useFilterBlock bool,
259 : stats *base.InternalIteratorStats,
260 : rp ReaderProvider,
261 2 : ) (Iterator, error) {
262 2 : return r.newIterWithBlockPropertyFiltersAndContext(
263 2 : ctx, lower, upper, filterer, hideObsoletePoints, useFilterBlock, stats, rp, nil,
264 2 : )
265 2 : }
266 :
267 : // TryAddBlockPropertyFilterForHideObsoletePoints is expected to be called
268 : // before the call to NewIterWithBlockPropertyFiltersAndContextEtc, to get the
269 : // value of hideObsoletePoints and potentially add a block property filter.
270 : func (r *Reader) TryAddBlockPropertyFilterForHideObsoletePoints(
271 : snapshotForHideObsoletePoints uint64,
272 : fileLargestSeqNum uint64,
273 : pointKeyFilters []BlockPropertyFilter,
274 2 : ) (hideObsoletePoints bool, filters []BlockPropertyFilter) {
275 2 : hideObsoletePoints = r.tableFormat >= TableFormatPebblev4 &&
276 2 : snapshotForHideObsoletePoints > fileLargestSeqNum
277 2 : if hideObsoletePoints {
278 2 : pointKeyFilters = append(pointKeyFilters, obsoleteKeyBlockPropertyFilter{})
279 2 : }
280 2 : return hideObsoletePoints, pointKeyFilters
281 : }
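// Example (editorial sketch): the expected call sequence, with snapshot,
// largestSeqNum, pointKeyFilters, lower, upper, and rp standing in for values
// the caller already has; building the *BlockPropertiesFilterer from the
// returned filters is elided.
//
//	hide, filters := r.TryAddBlockPropertyFilterForHideObsoletePoints(
//		snapshot, largestSeqNum, pointKeyFilters)
//	filterer := makeFilterer(filters) // hypothetical helper
//	iter, err := r.NewIterWithBlockPropertyFiltersAndContextEtc(
//		ctx, lower, upper, filterer, hide, true /* useFilterBlock */,
//		nil /* stats */, rp)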
282 :
283 : func (r *Reader) newIterWithBlockPropertyFiltersAndContext(
284 : ctx context.Context,
285 : lower, upper []byte,
286 : filterer *BlockPropertiesFilterer,
287 : hideObsoletePoints bool,
288 : useFilterBlock bool,
289 : stats *base.InternalIteratorStats,
290 : rp ReaderProvider,
291 : v *virtualState,
292 2 : ) (Iterator, error) {
293 2 : // NB: pebble.tableCache wraps the returned iterator with one which performs
294 2 : // reference counting on the Reader, preventing the Reader from being closed
295 2 : // until the final iterator closes.
296 2 : if r.Properties.IndexType == twoLevelIndex {
297 2 : i := twoLevelIterPool.Get().(*twoLevelIterator)
298 2 : err := i.init(ctx, r, v, lower, upper, filterer, useFilterBlock, hideObsoletePoints, stats, rp, nil /* bufferPool */)
299 2 : if err != nil {
300 1 : return nil, err
301 1 : }
302 2 : return i, nil
303 : }
304 :
305 2 : i := singleLevelIterPool.Get().(*singleLevelIterator)
306 2 : err := i.init(ctx, r, v, lower, upper, filterer, useFilterBlock, hideObsoletePoints, stats, rp, nil /* bufferPool */)
307 2 : if err != nil {
308 1 : return nil, err
309 1 : }
310 2 : return i, nil
311 : }
312 :
313 : // NewIter returns an iterator for the contents of the table. If an error
314 : // occurs, NewIter cleans up after itself and returns a nil iterator. NewIter
315 : // must only be used when the Reader is guaranteed to outlive any LazyValues
316 : // returned from the iter.
317 2 : func (r *Reader) NewIter(lower, upper []byte) (Iterator, error) {
318 2 : return r.NewIterWithBlockPropertyFilters(
319 2 : lower, upper, nil, true /* useFilterBlock */, nil, /* stats */
320 2 : TrivialReaderProvider{Reader: r})
321 2 : }
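// Example (editorial sketch): a full scan of the table. The Reader must remain
// open for as long as the iterator, and any LazyValues it returned, are in use.
//
//	iter, err := r.NewIter(nil /* lower */, nil /* upper */)
//	if err != nil {
//		return err
//	}
//	for key, _ := iter.First(); key != nil; key, _ = iter.Next() {
//		// Process key.
//	}
//	return iter.Close()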
322 :
323 : // NewCompactionIter returns an iterator similar to NewIter, but one that also
324 : // increments the number of bytes iterated. If an error occurs, NewCompactionIter cleans up
325 : // after itself and returns a nil iterator.
326 : func (r *Reader) NewCompactionIter(
327 : bytesIterated *uint64, rp ReaderProvider, bufferPool *BufferPool,
328 2 : ) (Iterator, error) {
329 2 : return r.newCompactionIter(bytesIterated, rp, nil, bufferPool)
330 2 : }
331 :
332 : func (r *Reader) newCompactionIter(
333 : bytesIterated *uint64, rp ReaderProvider, v *virtualState, bufferPool *BufferPool,
334 2 : ) (Iterator, error) {
335 2 : if r.Properties.IndexType == twoLevelIndex {
336 2 : i := twoLevelIterPool.Get().(*twoLevelIterator)
337 2 : err := i.init(
338 2 : context.Background(),
339 2 : r, v, nil /* lower */, nil /* upper */, nil,
340 2 : false /* useFilter */, false, /* hideObsoletePoints */
341 2 : nil /* stats */, rp, bufferPool,
342 2 : )
343 2 : if err != nil {
344 0 : return nil, err
345 0 : }
346 2 : i.setupForCompaction()
347 2 : return &twoLevelCompactionIterator{
348 2 : twoLevelIterator: i,
349 2 : bytesIterated: bytesIterated,
350 2 : }, nil
351 : }
352 2 : i := singleLevelIterPool.Get().(*singleLevelIterator)
353 2 : err := i.init(
354 2 : context.Background(), r, v, nil /* lower */, nil, /* upper */
355 2 : nil, false /* useFilter */, false, /* hideObsoletePoints */
356 2 : nil /* stats */, rp, bufferPool,
357 2 : )
358 2 : if err != nil {
359 0 : return nil, err
360 0 : }
361 2 : i.setupForCompaction()
362 2 : return &compactionIterator{
363 2 : singleLevelIterator: i,
364 2 : bytesIterated: bytesIterated,
365 2 : }, nil
366 : }
367 :
368 : // NewRawRangeDelIter returns an internal iterator for the contents of the
369 : // range-del block for the table. Returns nil if the table does not contain
370 : // any range deletions.
371 : //
372 : // TODO(sumeer): plumb context.Context since this path is relevant in the user-facing
373 : // iterator. Add WithContext methods since the existing ones are public.
374 2 : func (r *Reader) NewRawRangeDelIter() (keyspan.FragmentIterator, error) {
375 2 : if r.rangeDelBH.Length == 0 {
376 2 : return nil, nil
377 2 : }
378 2 : h, err := r.readRangeDel(nil /* stats */)
379 2 : if err != nil {
380 1 : return nil, err
381 1 : }
382 2 : i := &fragmentBlockIter{elideSameSeqnum: true}
383 2 : if err := i.blockIter.initHandle(r.Compare, h, r.Properties.GlobalSeqNum, false); err != nil {
384 0 : return nil, err
385 0 : }
386 2 : return i, nil
387 : }
388 :
389 : // NewRawRangeKeyIter returns an internal iterator for the contents of the
390 : // range-key block for the table. Returns nil if the table does not contain any
391 : // range keys.
392 : //
393 : // TODO(sumeer): plumb context.Context since this path is relevant in the user-facing
394 : // iterator. Add WithContext methods since the existing ones are public.
395 2 : func (r *Reader) NewRawRangeKeyIter() (keyspan.FragmentIterator, error) {
396 2 : if r.rangeKeyBH.Length == 0 {
397 2 : return nil, nil
398 2 : }
399 2 : h, err := r.readRangeKey(nil /* stats */)
400 2 : if err != nil {
401 0 : return nil, err
402 0 : }
403 2 : i := rangeKeyFragmentBlockIterPool.Get().(*rangeKeyFragmentBlockIter)
404 2 : if err := i.blockIter.initHandle(r.Compare, h, r.Properties.GlobalSeqNum, false); err != nil {
405 0 : return nil, err
406 0 : }
407 2 : return i, nil
408 : }
409 :
410 : type rangeKeyFragmentBlockIter struct {
411 : fragmentBlockIter
412 : }
413 :
414 2 : func (i *rangeKeyFragmentBlockIter) Close() error {
415 2 : err := i.fragmentBlockIter.Close()
416 2 : i.fragmentBlockIter = i.fragmentBlockIter.resetForReuse()
417 2 : rangeKeyFragmentBlockIterPool.Put(i)
418 2 : return err
419 2 : }
420 :
421 : func (r *Reader) readIndex(
422 : ctx context.Context, stats *base.InternalIteratorStats,
423 2 : ) (bufferHandle, error) {
424 2 : ctx = objiotracing.WithBlockType(ctx, objiotracing.MetadataBlock)
425 2 : return r.readBlock(ctx, r.indexBH, nil, nil, stats, nil /* buffer pool */)
426 2 : }
427 :
428 : func (r *Reader) readFilter(
429 : ctx context.Context, stats *base.InternalIteratorStats,
430 2 : ) (bufferHandle, error) {
431 2 : ctx = objiotracing.WithBlockType(ctx, objiotracing.FilterBlock)
432 2 : return r.readBlock(ctx, r.filterBH, nil /* transform */, nil /* readHandle */, stats, nil /* buffer pool */)
433 2 : }
434 :
435 2 : func (r *Reader) readRangeDel(stats *base.InternalIteratorStats) (bufferHandle, error) {
436 2 : ctx := objiotracing.WithBlockType(context.Background(), objiotracing.MetadataBlock)
437 2 : return r.readBlock(ctx, r.rangeDelBH, r.rangeDelTransform, nil /* readHandle */, stats, nil /* buffer pool */)
438 2 : }
439 :
440 2 : func (r *Reader) readRangeKey(stats *base.InternalIteratorStats) (bufferHandle, error) {
441 2 : ctx := objiotracing.WithBlockType(context.Background(), objiotracing.MetadataBlock)
442 2 : return r.readBlock(ctx, r.rangeKeyBH, nil /* transform */, nil /* readHandle */, stats, nil /* buffer pool */)
443 2 : }
444 :
445 : func checkChecksum(
446 : checksumType ChecksumType, b []byte, bh BlockHandle, fileNum base.FileNum,
447 2 : ) error {
448 2 : expectedChecksum := binary.LittleEndian.Uint32(b[bh.Length+1:])
449 2 : var computedChecksum uint32
450 2 : switch checksumType {
451 2 : case ChecksumTypeCRC32c:
452 2 : computedChecksum = crc.New(b[:bh.Length+1]).Value()
453 1 : case ChecksumTypeXXHash64:
454 1 : computedChecksum = uint32(xxhash.Sum64(b[:bh.Length+1]))
455 0 : default:
456 0 : return errors.Errorf("unsupported checksum type: %d", checksumType)
457 : }
458 :
459 2 : if expectedChecksum != computedChecksum {
460 1 : return base.CorruptionErrorf(
461 1 : "pebble/table: invalid table %s (checksum mismatch at %d/%d)",
462 1 : errors.Safe(fileNum), errors.Safe(bh.Offset), errors.Safe(bh.Length))
463 1 : }
464 2 : return nil
465 : }
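// Editorial sketch of the physical block layout that checkChecksum assumes: a
// block of bh.Length data bytes is followed by a 1-byte block type and a
// 4-byte little-endian checksum, and the checksum covers both the data and
// the type byte.
//
//	| data (bh.Length bytes) | type (1 byte) | checksum (4 bytes) |
//
//	computed := crc.New(b[:bh.Length+1]).Value()            // ChecksumTypeCRC32c
//	expected := binary.LittleEndian.Uint32(b[bh.Length+1:])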
466 :
467 : type cacheValueOrBuf struct {
468 : // buf.Valid() returns true if backed by a BufferPool.
469 : buf Buf
470 : // v is non-nil if backed by the block cache.
471 : v *cache.Value
472 : }
473 :
474 2 : func (b cacheValueOrBuf) get() []byte {
475 2 : if b.buf.Valid() {
476 2 : return b.buf.p.pool[b.buf.i].b
477 2 : }
478 2 : return b.v.Buf()
479 : }
480 :
481 2 : func (b cacheValueOrBuf) release() {
482 2 : if b.buf.Valid() {
483 2 : b.buf.Release()
484 2 : } else {
485 2 : cache.Free(b.v)
486 2 : }
487 : }
488 :
489 2 : func (b cacheValueOrBuf) truncate(n int) {
490 2 : if b.buf.Valid() {
491 2 : b.buf.p.pool[b.buf.i].b = b.buf.p.pool[b.buf.i].b[:n]
492 2 : } else {
493 2 : b.v.Truncate(n)
494 2 : }
495 : }
496 :
497 : func (r *Reader) readBlock(
498 : ctx context.Context,
499 : bh BlockHandle,
500 : transform blockTransform,
501 : readHandle objstorage.ReadHandle,
502 : stats *base.InternalIteratorStats,
503 : bufferPool *BufferPool,
504 2 : ) (handle bufferHandle, _ error) {
505 2 : if h := r.opts.Cache.Get(r.cacheID, r.fileNum, bh.Offset); h.Get() != nil {
506 2 : // Cache hit.
507 2 : if readHandle != nil {
508 2 : readHandle.RecordCacheHit(ctx, int64(bh.Offset), int64(bh.Length+blockTrailerLen))
509 2 : }
510 2 : if stats != nil {
511 2 : stats.BlockBytes += bh.Length
512 2 : stats.BlockBytesInCache += bh.Length
513 2 : }
514 : // This block is already in the cache; return a handle to the existing
515 : // value in the cache.
516 2 : return bufferHandle{h: h}, nil
517 : }
518 :
519 : // Cache miss.
520 2 : var compressed cacheValueOrBuf
521 2 : if bufferPool != nil {
522 2 : compressed = cacheValueOrBuf{
523 2 : buf: bufferPool.Alloc(int(bh.Length + blockTrailerLen)),
524 2 : }
525 2 : } else {
526 2 : compressed = cacheValueOrBuf{
527 2 : v: cache.Alloc(int(bh.Length + blockTrailerLen)),
528 2 : }
529 2 : }
530 :
531 2 : readStartTime := time.Now()
532 2 : var err error
533 2 : if readHandle != nil {
534 2 : err = readHandle.ReadAt(ctx, compressed.get(), int64(bh.Offset))
535 2 : } else {
536 2 : err = r.readable.ReadAt(ctx, compressed.get(), int64(bh.Offset))
537 2 : }
538 2 : readDuration := time.Since(readStartTime)
539 2 : // TODO(sumeer): should the threshold be configurable?
540 2 : const slowReadTracingThreshold = 5 * time.Millisecond
541 2 : // The invariants.Enabled path is for deterministic testing.
542 2 : if invariants.Enabled {
543 2 : readDuration = slowReadTracingThreshold
544 2 : }
545 : // Call IsTracingEnabled to avoid the allocations of boxing integers into an
546 : // interface{}, unless necessary.
547 2 : if readDuration >= slowReadTracingThreshold && r.opts.LoggerAndTracer.IsTracingEnabled(ctx) {
548 1 : r.opts.LoggerAndTracer.Eventf(ctx, "reading %d bytes took %s",
549 1 : int(bh.Length+blockTrailerLen), readDuration.String())
550 1 : }
551 2 : if stats != nil {
552 2 : stats.BlockReadDuration += readDuration
553 2 : }
554 2 : if err != nil {
555 1 : compressed.release()
556 1 : return bufferHandle{}, err
557 1 : }
558 2 : if err := checkChecksum(r.checksumType, compressed.get(), bh, r.fileNum.FileNum()); err != nil {
559 1 : compressed.release()
560 1 : return bufferHandle{}, err
561 1 : }
562 :
563 2 : typ := blockType(compressed.get()[bh.Length])
564 2 : compressed.truncate(int(bh.Length))
565 2 :
566 2 : var decompressed cacheValueOrBuf
567 2 : if typ == noCompressionBlockType {
568 2 : decompressed = compressed
569 2 : } else {
570 2 : // Decode the length of the decompressed value.
571 2 : decodedLen, prefixLen, err := decompressedLen(typ, compressed.get())
572 2 : if err != nil {
573 0 : compressed.release()
574 0 : return bufferHandle{}, err
575 0 : }
576 :
577 2 : if bufferPool != nil {
578 2 : decompressed = cacheValueOrBuf{buf: bufferPool.Alloc(decodedLen)}
579 2 : } else {
580 2 : decompressed = cacheValueOrBuf{v: cache.Alloc(decodedLen)}
581 2 : }
582 2 : if _, err := decompressInto(typ, compressed.get()[prefixLen:], decompressed.get()); err != nil {
583 0 : compressed.release()
584 0 : return bufferHandle{}, err
585 0 : }
586 2 : compressed.release()
587 : }
588 :
589 2 : if transform != nil {
590 1 : // Transforming blocks is very rare, so the extra copy of the
591 1 : // transformed data is not problematic.
592 1 : tmpTransformed, err := transform(decompressed.get())
593 1 : if err != nil {
594 0 : decompressed.release()
595 0 : return bufferHandle{}, err
596 0 : }
597 :
598 1 : var transformed cacheValueOrBuf
599 1 : if bufferPool != nil {
600 0 : transformed = cacheValueOrBuf{buf: bufferPool.Alloc(len(tmpTransformed))}
601 1 : } else {
602 1 : transformed = cacheValueOrBuf{v: cache.Alloc(len(tmpTransformed))}
603 1 : }
604 1 : copy(transformed.get(), tmpTransformed)
605 1 : decompressed.release()
606 1 : decompressed = transformed
607 : }
608 :
609 2 : if stats != nil {
610 2 : stats.BlockBytes += bh.Length
611 2 : }
612 2 : if decompressed.buf.Valid() {
613 2 : return bufferHandle{b: decompressed.buf}, nil
614 2 : }
615 2 : h := r.opts.Cache.Set(r.cacheID, r.fileNum, bh.Offset, decompressed.v)
616 2 : return bufferHandle{h: h}, nil
617 : }
618 :
619 1 : func (r *Reader) transformRangeDelV1(b []byte) ([]byte, error) {
620 1 : // Convert v1 (RocksDB format) range-del blocks to v2 blocks on the fly. The
621 1 : // v1 format range-del blocks have unfragmented and unsorted range
622 1 : // tombstones. We need properly fragmented and sorted range tombstones in
623 1 : // order to serve from them directly.
624 1 : iter := &blockIter{}
625 1 : if err := iter.init(r.Compare, b, r.Properties.GlobalSeqNum, false); err != nil {
626 0 : return nil, err
627 0 : }
628 1 : var tombstones []keyspan.Span
629 1 : for key, value := iter.First(); key != nil; key, value = iter.Next() {
630 1 : t := keyspan.Span{
631 1 : Start: key.UserKey,
632 1 : End: value.InPlaceValue(),
633 1 : Keys: []keyspan.Key{{Trailer: key.Trailer}},
634 1 : }
635 1 : tombstones = append(tombstones, t)
636 1 : }
637 1 : keyspan.Sort(r.Compare, tombstones)
638 1 :
639 1 : // Fragment the tombstones, outputting them directly to a block writer.
640 1 : rangeDelBlock := blockWriter{
641 1 : restartInterval: 1,
642 1 : }
643 1 : frag := keyspan.Fragmenter{
644 1 : Cmp: r.Compare,
645 1 : Format: r.FormatKey,
646 1 : Emit: func(s keyspan.Span) {
647 1 : for _, k := range s.Keys {
648 1 : startIK := InternalKey{UserKey: s.Start, Trailer: k.Trailer}
649 1 : rangeDelBlock.add(startIK, s.End)
650 1 : }
651 : },
652 : }
653 1 : for i := range tombstones {
654 1 : frag.Add(tombstones[i])
655 1 : }
656 1 : frag.Finish()
657 1 :
658 1 : // Return the contents of the constructed v2 format range-del block.
659 1 : return rangeDelBlock.finish(), nil
660 : }
661 :
662 2 : func (r *Reader) readMetaindex(metaindexBH BlockHandle) error {
663 2 : // We use a BufferPool when reading metaindex blocks in order to avoid
664 2 : // populating the block cache with these blocks. In heavy-write workloads,
665 2 : // especially with high compaction concurrency, new tables may be created
666 2 : // frequently. Populating the block cache with these metaindex blocks adds
667 2 : // additional contention on the block cache mutexes (see #1997).
668 2 : // Additionally, these blocks are exceedingly unlikely to be read again
669 2 : // while they're still in the block cache except in misconfigurations with
670 2 : // excessive sstable counts or a table cache that's far too small.
671 2 : r.metaBufferPool.initPreallocated(r.metaBufferPoolAlloc[:0])
672 2 : // When we're finished, release the buffers we've allocated back to memory
673 2 : // allocator. We don't expect to use metaBufferPool again.
674 2 : defer r.metaBufferPool.Release()
675 2 :
676 2 : b, err := r.readBlock(
677 2 : context.Background(), metaindexBH, nil /* transform */, nil /* readHandle */, nil /* stats */, &r.metaBufferPool)
678 2 : if err != nil {
679 1 : return err
680 1 : }
681 2 : data := b.Get()
682 2 : defer b.Release()
683 2 :
684 2 : if uint64(len(data)) != metaindexBH.Length {
685 0 : return base.CorruptionErrorf("pebble/table: unexpected metaindex block size: %d vs %d",
686 0 : errors.Safe(len(data)), errors.Safe(metaindexBH.Length))
687 0 : }
688 :
689 2 : i, err := newRawBlockIter(bytes.Compare, data)
690 2 : if err != nil {
691 0 : return err
692 0 : }
693 :
694 2 : meta := map[string]BlockHandle{}
695 2 : for valid := i.First(); valid; valid = i.Next() {
696 2 : value := i.Value()
697 2 : if bytes.Equal(i.Key().UserKey, []byte(metaValueIndexName)) {
698 2 : vbih, n, err := decodeValueBlocksIndexHandle(i.Value())
699 2 : if err != nil {
700 0 : return err
701 0 : }
702 2 : if n == 0 || n != len(value) {
703 0 : return base.CorruptionErrorf("pebble/table: invalid table (bad value blocks index handle)")
704 0 : }
705 2 : r.valueBIH = vbih
706 2 : } else {
707 2 : bh, n := decodeBlockHandle(value)
708 2 : if n == 0 || n != len(value) {
709 0 : return base.CorruptionErrorf("pebble/table: invalid table (bad block handle)")
710 0 : }
711 2 : meta[string(i.Key().UserKey)] = bh
712 : }
713 : }
714 2 : if err := i.Close(); err != nil {
715 0 : return err
716 0 : }
717 :
718 2 : if bh, ok := meta[metaPropertiesName]; ok {
719 2 : b, err = r.readBlock(
720 2 : context.Background(), bh, nil /* transform */, nil /* readHandle */, nil /* stats */, nil /* buffer pool */)
721 2 : if err != nil {
722 1 : return err
723 1 : }
724 2 : r.propertiesBH = bh
725 2 : err := r.Properties.load(b.Get(), bh.Offset, r.opts.DeniedUserProperties)
726 2 : b.Release()
727 2 : if err != nil {
728 0 : return err
729 0 : }
730 : }
731 :
732 2 : if bh, ok := meta[metaRangeDelV2Name]; ok {
733 2 : r.rangeDelBH = bh
734 2 : } else if bh, ok := meta[metaRangeDelName]; ok {
735 1 : r.rangeDelBH = bh
736 1 : if !r.rawTombstones {
737 1 : r.rangeDelTransform = r.transformRangeDelV1
738 1 : }
739 : }
740 :
741 2 : if bh, ok := meta[metaRangeKeyName]; ok {
742 2 : r.rangeKeyBH = bh
743 2 : }
744 :
745 2 : for name, fp := range r.opts.Filters {
746 2 : types := []struct {
747 2 : ftype FilterType
748 2 : prefix string
749 2 : }{
750 2 : {TableFilter, "fullfilter."},
751 2 : }
752 2 : var done bool
753 2 : for _, t := range types {
754 2 : if bh, ok := meta[t.prefix+name]; ok {
755 2 : r.filterBH = bh
756 2 :
757 2 : switch t.ftype {
758 2 : case TableFilter:
759 2 : r.tableFilter = newTableFilterReader(fp)
760 0 : default:
761 0 : return base.CorruptionErrorf("unknown filter type: %v", errors.Safe(t.ftype))
762 : }
763 :
764 2 : done = true
765 2 : break
766 : }
767 : }
768 2 : if done {
769 2 : break
770 : }
771 : }
772 2 : return nil
773 : }
774 :
775 : // Layout returns the layout (block organization) for an sstable.
776 2 : func (r *Reader) Layout() (*Layout, error) {
777 2 : if r.err != nil {
778 0 : return nil, r.err
779 0 : }
780 :
781 2 : l := &Layout{
782 2 : Data: make([]BlockHandleWithProperties, 0, r.Properties.NumDataBlocks),
783 2 : Filter: r.filterBH,
784 2 : RangeDel: r.rangeDelBH,
785 2 : RangeKey: r.rangeKeyBH,
786 2 : ValueIndex: r.valueBIH.h,
787 2 : Properties: r.propertiesBH,
788 2 : MetaIndex: r.metaIndexBH,
789 2 : Footer: r.footerBH,
790 2 : Format: r.tableFormat,
791 2 : }
792 2 :
793 2 : indexH, err := r.readIndex(context.Background(), nil)
794 2 : if err != nil {
795 1 : return nil, err
796 1 : }
797 2 : defer indexH.Release()
798 2 :
799 2 : var alloc bytealloc.A
800 2 :
801 2 : if r.Properties.IndexPartitions == 0 {
802 2 : l.Index = append(l.Index, r.indexBH)
803 2 : iter, _ := newBlockIter(r.Compare, indexH.Get())
804 2 : for key, value := iter.First(); key != nil; key, value = iter.Next() {
805 2 : dataBH, err := decodeBlockHandleWithProperties(value.InPlaceValue())
806 2 : if err != nil {
807 0 : return nil, errCorruptIndexEntry
808 0 : }
809 2 : if len(dataBH.Props) > 0 {
810 1 : alloc, dataBH.Props = alloc.Copy(dataBH.Props)
811 1 : }
812 2 : l.Data = append(l.Data, dataBH)
813 : }
814 2 : } else {
815 2 : l.TopIndex = r.indexBH
816 2 : topIter, _ := newBlockIter(r.Compare, indexH.Get())
817 2 : iter := &blockIter{}
818 2 : for key, value := topIter.First(); key != nil; key, value = topIter.Next() {
819 2 : indexBH, err := decodeBlockHandleWithProperties(value.InPlaceValue())
820 2 : if err != nil {
821 0 : return nil, errCorruptIndexEntry
822 0 : }
823 2 : l.Index = append(l.Index, indexBH.BlockHandle)
824 2 :
825 2 : subIndex, err := r.readBlock(context.Background(), indexBH.BlockHandle,
826 2 : nil /* transform */, nil /* readHandle */, nil /* stats */, nil /* buffer pool */)
827 2 : if err != nil {
828 1 : return nil, err
829 1 : }
830 2 : if err := iter.init(r.Compare, subIndex.Get(), 0, /* globalSeqNum */
831 2 : false /* hideObsoletePoints */); err != nil {
832 0 : return nil, err
833 0 : }
834 2 : for key, value := iter.First(); key != nil; key, value = iter.Next() {
835 2 : dataBH, err := decodeBlockHandleWithProperties(value.InPlaceValue())
836 2 : if len(dataBH.Props) > 0 {
837 1 : alloc, dataBH.Props = alloc.Copy(dataBH.Props)
838 1 : }
839 2 : if err != nil {
840 0 : return nil, errCorruptIndexEntry
841 0 : }
842 2 : l.Data = append(l.Data, dataBH)
843 : }
844 2 : subIndex.Release()
845 2 : *iter = iter.resetForReuse()
846 : }
847 : }
848 2 : if r.valueBIH.h.Length != 0 {
849 1 : vbiH, err := r.readBlock(context.Background(), r.valueBIH.h, nil, nil, nil, nil /* buffer pool */)
850 1 : if err != nil {
851 0 : return nil, err
852 0 : }
853 1 : defer vbiH.Release()
854 1 : vbiBlock := vbiH.Get()
855 1 : indexEntryLen := int(r.valueBIH.blockNumByteLength + r.valueBIH.blockOffsetByteLength +
856 1 : r.valueBIH.blockLengthByteLength)
857 1 : i := 0
858 1 : for len(vbiBlock) != 0 {
859 1 : if len(vbiBlock) < indexEntryLen {
860 0 : return nil, errors.Errorf(
861 0 : "remaining value index block %d does not contain a full entry of length %d",
862 0 : len(vbiBlock), indexEntryLen)
863 0 : }
864 1 : n := int(r.valueBIH.blockNumByteLength)
865 1 : bn := int(littleEndianGet(vbiBlock, n))
866 1 : if bn != i {
867 0 : return nil, errors.Errorf("unexpected block num %d, expected %d",
868 0 : bn, i)
869 0 : }
870 1 : i++
871 1 : vbiBlock = vbiBlock[n:]
872 1 : n = int(r.valueBIH.blockOffsetByteLength)
873 1 : blockOffset := littleEndianGet(vbiBlock, n)
874 1 : vbiBlock = vbiBlock[n:]
875 1 : n = int(r.valueBIH.blockLengthByteLength)
876 1 : blockLen := littleEndianGet(vbiBlock, n)
877 1 : vbiBlock = vbiBlock[n:]
878 1 : l.ValueBlock = append(l.ValueBlock, BlockHandle{Offset: blockOffset, Length: blockLen})
879 : }
880 : }
881 :
882 2 : return l, nil
883 : }
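// Example (editorial sketch): using Layout to dump the data-block organization
// of a table. Error handling elided.
//
//	l, _ := r.Layout()
//	for _, bh := range l.Data {
//		fmt.Printf("data block: offset=%d length=%d props=%d bytes\n",
//			bh.Offset, bh.Length, len(bh.Props))
//	}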
884 :
885 : // ValidateBlockChecksums validates the checksums for each block in the SSTable.
886 2 : func (r *Reader) ValidateBlockChecksums() error {
887 2 : // Pre-compute the BlockHandles for the underlying file.
888 2 : l, err := r.Layout()
889 2 : if err != nil {
890 1 : return err
891 1 : }
892 :
893 : // Construct the set of blocks to check. Note that the footer is not checked
894 : // as it is not a block with a checksum.
895 2 : blocks := make([]BlockHandle, len(l.Data))
896 2 : for i := range l.Data {
897 2 : blocks[i] = l.Data[i].BlockHandle
898 2 : }
899 2 : blocks = append(blocks, l.Index...)
900 2 : blocks = append(blocks, l.TopIndex, l.Filter, l.RangeDel, l.RangeKey, l.Properties, l.MetaIndex)
901 2 :
902 2 : // Sorting by offset ensures we are performing a sequential scan of the
903 2 : // file.
904 2 : sort.Slice(blocks, func(i, j int) bool {
905 2 : return blocks[i].Offset < blocks[j].Offset
906 2 : })
907 :
908 : // Check all blocks sequentially. Make use of read-ahead, given we are
909 : // scanning the entire file from start to end.
910 2 : rh := r.readable.NewReadHandle(context.TODO())
911 2 : defer rh.Close()
912 2 :
913 2 : for _, bh := range blocks {
914 2 : // Certain blocks may not be present, in which case we skip them.
915 2 : if bh.Length == 0 {
916 2 : continue
917 : }
918 :
919 : // Read the block, which validates the checksum.
920 2 : h, err := r.readBlock(context.Background(), bh, nil, rh, nil, nil /* buffer pool */)
921 2 : if err != nil {
922 1 : return err
923 1 : }
924 2 : h.Release()
925 : }
926 :
927 2 : return nil
928 : }
929 :
930 : // EstimateDiskUsage returns the total size of data blocks overlapping the range
931 : // `[start, end]`. Even if a data block partially overlaps, or we cannot
932 : // determine overlap due to abbreviated index keys, the full data block size is
933 : // included in the estimation.
934 : //
935 : // This function does not account for any metablock space usage. Assumes there
936 : // is at least partial overlap, i.e., `[start, end]` falls neither completely
937 : // before nor completely after the file's range.
938 : //
939 : // Only blocks containing point keys are considered. Range deletion and range
940 : // key blocks are not considered.
941 : //
942 : // TODO(ajkr): account for metablock space usage. Perhaps look at the fraction of
943 : // data blocks overlapped and add that same fraction of the metadata blocks to the
944 : // estimate.
945 2 : func (r *Reader) EstimateDiskUsage(start, end []byte) (uint64, error) {
946 2 : if r.err != nil {
947 0 : return 0, r.err
948 0 : }
949 :
950 2 : indexH, err := r.readIndex(context.Background(), nil)
951 2 : if err != nil {
952 1 : return 0, err
953 1 : }
954 2 : defer indexH.Release()
955 2 :
956 2 : // Iterators over the bottom-level index blocks containing start and end.
957 2 : // These may be different in case of partitioned index but will both point
958 2 : // to the same blockIter over the single index in the unpartitioned case.
959 2 : var startIdxIter, endIdxIter *blockIter
960 2 : if r.Properties.IndexPartitions == 0 {
961 2 : iter, err := newBlockIter(r.Compare, indexH.Get())
962 2 : if err != nil {
963 0 : return 0, err
964 0 : }
965 2 : startIdxIter = iter
966 2 : endIdxIter = iter
967 2 : } else {
968 2 : topIter, err := newBlockIter(r.Compare, indexH.Get())
969 2 : if err != nil {
970 0 : return 0, err
971 0 : }
972 :
973 2 : key, val := topIter.SeekGE(start, base.SeekGEFlagsNone)
974 2 : if key == nil {
975 1 : // The range falls completely after this file, or an error occurred.
976 1 : return 0, topIter.Error()
977 1 : }
978 2 : startIdxBH, err := decodeBlockHandleWithProperties(val.InPlaceValue())
979 2 : if err != nil {
980 0 : return 0, errCorruptIndexEntry
981 0 : }
982 2 : startIdxBlock, err := r.readBlock(context.Background(), startIdxBH.BlockHandle,
983 2 : nil /* transform */, nil /* readHandle */, nil /* stats */, nil /* buffer pool */)
984 2 : if err != nil {
985 1 : return 0, err
986 1 : }
987 2 : defer startIdxBlock.Release()
988 2 : startIdxIter, err = newBlockIter(r.Compare, startIdxBlock.Get())
989 2 : if err != nil {
990 0 : return 0, err
991 0 : }
992 :
993 2 : key, val = topIter.SeekGE(end, base.SeekGEFlagsNone)
994 2 : if key == nil {
995 1 : if err := topIter.Error(); err != nil {
996 0 : return 0, err
997 0 : }
998 2 : } else {
999 2 : endIdxBH, err := decodeBlockHandleWithProperties(val.InPlaceValue())
1000 2 : if err != nil {
1001 0 : return 0, errCorruptIndexEntry
1002 0 : }
1003 2 : endIdxBlock, err := r.readBlock(context.Background(),
1004 2 : endIdxBH.BlockHandle, nil /* transform */, nil /* readHandle */, nil /* stats */, nil /* buffer pool */)
1005 2 : if err != nil {
1006 1 : return 0, err
1007 1 : }
1008 2 : defer endIdxBlock.Release()
1009 2 : endIdxIter, err = newBlockIter(r.Compare, endIdxBlock.Get())
1010 2 : if err != nil {
1011 0 : return 0, err
1012 0 : }
1013 : }
1014 : }
1015 : // startIdxIter should not be nil at this point, while endIdxIter can be nil
1016 : // if the range spans past the end of the file.
1017 :
1018 2 : key, val := startIdxIter.SeekGE(start, base.SeekGEFlagsNone)
1019 2 : if key == nil {
1020 2 : // The range falls completely after this file, or an error occurred.
1021 2 : return 0, startIdxIter.Error()
1022 2 : }
1023 2 : startBH, err := decodeBlockHandleWithProperties(val.InPlaceValue())
1024 2 : if err != nil {
1025 0 : return 0, errCorruptIndexEntry
1026 0 : }
1027 :
1028 2 : includeInterpolatedValueBlocksSize := func(dataBlockSize uint64) uint64 {
1029 2 : // INVARIANT: r.Properties.DataSize > 0 since startIdxIter is not nil.
1030 2 : // Linearly interpolate what is stored in value blocks.
1031 2 : //
1032 2 : // TODO(sumeer): if we need more accuracy, without loading any data blocks
1033 2 : // (which contain the value handles, and which may also be insufficient if
1034 2 : // the values are in separate files), we will need to accumulate the
1035 2 : // logical size of the key-value pairs and store the cumulative value for
1036 2 : // each data block in the index block entry. This increases the size of
1037 2 : // the BlockHandle, so wait until this becomes necessary.
1038 2 : return dataBlockSize +
1039 2 : uint64((float64(dataBlockSize)/float64(r.Properties.DataSize))*
1040 2 : float64(r.Properties.ValueBlocksSize))
1041 2 : }
1042 2 : if endIdxIter == nil {
1043 1 : // The range spans beyond this file. Include data blocks through the last.
1044 1 : return includeInterpolatedValueBlocksSize(r.Properties.DataSize - startBH.Offset), nil
1045 1 : }
1046 2 : key, val = endIdxIter.SeekGE(end, base.SeekGEFlagsNone)
1047 2 : if key == nil {
1048 2 : if err := endIdxIter.Error(); err != nil {
1049 0 : return 0, err
1050 0 : }
1051 : // The range spans beyond this file. Include data blocks through the last.
1052 2 : return includeInterpolatedValueBlocksSize(r.Properties.DataSize - startBH.Offset), nil
1053 : }
1054 2 : endBH, err := decodeBlockHandleWithProperties(val.InPlaceValue())
1055 2 : if err != nil {
1056 0 : return 0, errCorruptIndexEntry
1057 0 : }
1058 2 : return includeInterpolatedValueBlocksSize(
1059 2 : endBH.Offset + endBH.Length + blockTrailerLen - startBH.Offset), nil
1060 : }
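// Worked example (editorial) of the interpolation above: with DataSize =
// 100 MiB, ValueBlocksSize = 50 MiB, and an overlapping data-block span of
// 10 MiB, the estimate is 10 MiB + (10/100)*50 MiB = 15 MiB.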
1061 :
1062 : // TableFormat returns the format version for the table.
1063 2 : func (r *Reader) TableFormat() (TableFormat, error) {
1064 2 : if r.err != nil {
1065 0 : return TableFormatUnspecified, r.err
1066 0 : }
1067 2 : return r.tableFormat, nil
1068 : }
1069 :
1070 : // NewReader returns a new table reader for the file. Closing the reader will
1071 : // close the file.
1072 2 : func NewReader(f objstorage.Readable, o ReaderOptions, extraOpts ...ReaderOption) (*Reader, error) {
1073 2 : o = o.ensureDefaults()
1074 2 : r := &Reader{
1075 2 : readable: f,
1076 2 : opts: o,
1077 2 : }
1078 2 : if r.opts.Cache == nil {
1079 1 : r.opts.Cache = cache.New(0)
1080 2 : } else {
1081 2 : r.opts.Cache.Ref()
1082 2 : }
1083 :
1084 2 : if f == nil {
1085 1 : r.err = errors.New("pebble/table: nil file")
1086 1 : return nil, r.Close()
1087 1 : }
1088 :
1089 : // Note that the extra options are applied twice. First here for pre-apply
1090 : // options, and then below for post-apply options. Pre and post refer to
1091 : // before and after reading the metaindex and properties.
1092 2 : type preApply interface{ preApply() }
1093 2 : for _, opt := range extraOpts {
1094 2 : if _, ok := opt.(preApply); ok {
1095 2 : opt.readerApply(r)
1096 2 : }
1097 : }
1098 2 : if r.cacheID == 0 {
1099 1 : r.cacheID = r.opts.Cache.NewID()
1100 1 : }
1101 :
1102 2 : footer, err := readFooter(f)
1103 2 : if err != nil {
1104 1 : r.err = err
1105 1 : return nil, r.Close()
1106 1 : }
1107 2 : r.checksumType = footer.checksum
1108 2 : r.tableFormat = footer.format
1109 2 : // Read the metaindex.
1110 2 : if err := r.readMetaindex(footer.metaindexBH); err != nil {
1111 1 : r.err = err
1112 1 : return nil, r.Close()
1113 1 : }
1114 2 : r.indexBH = footer.indexBH
1115 2 : r.metaIndexBH = footer.metaindexBH
1116 2 : r.footerBH = footer.footerBH
1117 2 :
1118 2 : if r.Properties.ComparerName == "" || o.Comparer.Name == r.Properties.ComparerName {
1119 2 : r.Compare = o.Comparer.Compare
1120 2 : r.FormatKey = o.Comparer.FormatKey
1121 2 : r.Split = o.Comparer.Split
1122 2 : }
1123 :
1124 2 : if o.MergerName == r.Properties.MergerName {
1125 2 : r.mergerOK = true
1126 2 : }
1127 :
1128 : // Apply the extra options again now that the comparer and merger names are
1129 : // known.
1130 2 : for _, opt := range extraOpts {
1131 2 : if _, ok := opt.(preApply); !ok {
1132 2 : opt.readerApply(r)
1133 2 : }
1134 : }
1135 :
1136 2 : if r.Compare == nil {
1137 1 : r.err = errors.Errorf("pebble/table: %d: unknown comparer %s",
1138 1 : errors.Safe(r.fileNum), errors.Safe(r.Properties.ComparerName))
1139 1 : }
1140 2 : if !r.mergerOK {
1141 1 : if name := r.Properties.MergerName; name != "" && name != "nullptr" {
1142 1 : r.err = errors.Errorf("pebble/table: %d: unknown merger %s",
1143 1 : errors.Safe(r.fileNum), errors.Safe(r.Properties.MergerName))
1144 1 : }
1145 : }
1146 2 : if r.err != nil {
1147 1 : return nil, r.Close()
1148 1 : }
1149 :
1150 2 : return r, nil
1151 : }
1152 :
1153 : // ReadableFile describes the smallest subset of vfs.File that is required for
1154 : // reading SSTs.
1155 : type ReadableFile interface {
1156 : io.ReaderAt
1157 : io.Closer
1158 : Stat() (os.FileInfo, error)
1159 : }
1160 :
1161 : // NewSimpleReadable wraps a ReadableFile in an objstorage.Readable
1162 : // implementation (which does not support read-ahead).
1163 2 : func NewSimpleReadable(r ReadableFile) (objstorage.Readable, error) {
1164 2 : info, err := r.Stat()
1165 2 : if err != nil {
1166 1 : return nil, err
1167 1 : }
1168 2 : res := &simpleReadable{
1169 2 : f: r,
1170 2 : size: info.Size(),
1171 2 : }
1172 2 : res.rh = objstorage.MakeNoopReadHandle(res)
1173 2 : return res, nil
1174 : }
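// Example (editorial sketch): opening a table from a plain *os.File, which
// satisfies ReadableFile. "table.sst" is a placeholder path, error handling is
// elided, and ReaderOptions{} relies on the defaults applied by
// ensureDefaults. Closing the Reader also closes the underlying file.
//
//	f, _ := os.Open("table.sst")
//	readable, _ := NewSimpleReadable(f)
//	r, _ := NewReader(readable, ReaderOptions{})
//	defer r.Close()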
1175 :
1176 : // simpleReadable wraps a ReadableFile to implement objstorage.Readable.
1177 : type simpleReadable struct {
1178 : f ReadableFile
1179 : size int64
1180 : rh objstorage.NoopReadHandle
1181 : }
1182 :
1183 : var _ objstorage.Readable = (*simpleReadable)(nil)
1184 :
1185 : // ReadAt is part of the objstorage.Readable interface.
1186 2 : func (s *simpleReadable) ReadAt(_ context.Context, p []byte, off int64) error {
1187 2 : n, err := s.f.ReadAt(p, off)
1188 2 : if invariants.Enabled && err == nil && n != len(p) {
1189 0 : panic("short read")
1190 : }
1191 2 : return err
1192 : }
1193 :
1194 : // Close is part of the objstorage.Readable interface.
1195 2 : func (s *simpleReadable) Close() error {
1196 2 : return s.f.Close()
1197 2 : }
1198 :
1199 : // Size is part of the objstorage.Readable interface.
1200 2 : func (s *simpleReadable) Size() int64 {
1201 2 : return s.size
1202 2 : }
1203 :
1204 : // NewReadHandle is part of the objstorage.Readable interface.
1205 2 : func (s *simpleReadable) NewReadHandle(_ context.Context) objstorage.ReadHandle {
1206 2 : return &s.rh
1207 2 : }