Line data Source code
1 : // Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "bytes"
9 : "context"
10 : "fmt"
11 : "io"
12 : "runtime/debug"
13 : "sync"
14 : "sync/atomic"
15 : "unsafe"
16 :
17 : "github.com/cockroachdb/errors"
18 : "github.com/cockroachdb/pebble/internal/base"
19 : "github.com/cockroachdb/pebble/internal/cache"
20 : "github.com/cockroachdb/pebble/internal/genericcache"
21 : "github.com/cockroachdb/pebble/internal/invariants"
22 : "github.com/cockroachdb/pebble/internal/keyspan"
23 : "github.com/cockroachdb/pebble/internal/keyspan/keyspanimpl"
24 : "github.com/cockroachdb/pebble/internal/manifest"
25 : "github.com/cockroachdb/pebble/internal/sstableinternal"
26 : "github.com/cockroachdb/pebble/objstorage"
27 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider"
28 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
29 : "github.com/cockroachdb/pebble/sstable"
30 : "github.com/cockroachdb/pebble/sstable/blob"
31 : "github.com/cockroachdb/pebble/sstable/block"
32 : "github.com/cockroachdb/pebble/sstable/valblk"
33 : "github.com/cockroachdb/pebble/vfs"
34 : "github.com/cockroachdb/redact"
35 : )
36 :
// FileCacheMetrics contains metrics for the file cache. Note that the file
// cache is normally shared between all the stores on a node.
type FileCacheMetrics struct {
	// The number of bytes in use by the cache.
	Size int64
	// TableCount is the number of sstable readers currently counted by the
	// file cache.
	TableCount int64
	// BlobFileCount is the number of blob file readers currently counted by
	// the file cache.
	BlobFileCount int64
	// Hits is the hit count reported by the underlying cache.
	Hits int64
	// Misses is the miss count reported by the underlying cache.
	Misses int64
}
47 :
// emptyIter is an errorIter whose error is nil.
// NOTE(review): presumably used as a shared, always-empty point iterator —
// confirm against its use sites.
var emptyIter = &errorIter{err: nil}

// emptyKeyspanIter is an errorKeyspanIter whose error is nil.
// NOTE(review): presumably the non-nil empty iterator handed out when a table
// has no keys of the requested span kind — confirm against its use sites.
var emptyKeyspanIter = &errorKeyspanIter{err: nil}
50 :
// tableNewIters creates new iterators (point, range deletion and/or range key)
// for the given table metadata. Which of the various iterator kinds the user is
// requesting is specified with the iterKinds bitmap.
//
// On success, the requested subset of iters.{point,rangeDel,rangeKey} are
// populated with iterators.
//
// If a point iterator is requested and the operation was successful,
// iters.point is guaranteed to be non-nil and must be closed when the caller is
// finished.
//
// If a range deletion or range key iterator is requested, the corresponding
// iterator may be nil if the table does not contain any keys of the
// corresponding kind. The returned iterSet type provides RangeDeletion() and
// RangeKey() convenience methods that return non-nil empty iterators that may
// be used if the caller requires a non-nil iterator.
//
// On error, all iterators are nil.
//
// Callers may pass a nil opts; tableNewRangeDelIter and tableNewRangeKeyIter
// do so.
//
// The only (non-test) implementation of tableNewIters is
// fileCacheHandle.newIters().
type tableNewIters func(
	ctx context.Context,
	file *manifest.TableMetadata,
	opts *IterOptions,
	internalOpts internalIterOpts,
	kinds iterKinds,
) (iterSet, error)
79 :
80 : // tableNewRangeDelIter takes a tableNewIters and returns a TableNewSpanIter
81 : // for the rangedel iterator returned by tableNewIters.
82 2 : func tableNewRangeDelIter(newIters tableNewIters) keyspanimpl.TableNewSpanIter {
83 2 : return func(ctx context.Context, file *manifest.TableMetadata, iterOptions keyspan.SpanIterOptions) (keyspan.FragmentIterator, error) {
84 2 : iters, err := newIters(ctx, file, nil, internalIterOpts{}, iterRangeDeletions)
85 2 : if err != nil {
86 0 : return nil, err
87 0 : }
88 2 : return iters.RangeDeletion(), nil
89 : }
90 : }
91 :
92 : // tableNewRangeKeyIter takes a tableNewIters and returns a TableNewSpanIter
93 : // for the range key iterator returned by tableNewIters.
94 2 : func tableNewRangeKeyIter(newIters tableNewIters) keyspanimpl.TableNewSpanIter {
95 2 : return func(ctx context.Context, file *manifest.TableMetadata, iterOptions keyspan.SpanIterOptions) (keyspan.FragmentIterator, error) {
96 2 : iters, err := newIters(ctx, file, nil, internalIterOpts{}, iterRangeKeys)
97 2 : if err != nil {
98 1 : return nil, err
99 1 : }
100 2 : return iters.RangeKey(), nil
101 : }
102 : }
103 :
// fileCacheHandle is used to access the file cache. Each DB has its own handle.
type fileCacheHandle struct {
	// fileCache is the cache this handle accesses; newHandle takes a
	// reference on it and Close releases that reference.
	fileCache *FileCache

	// The handle contains fields which are unique to each DB. Note that these get
	// accessed from all shards, so keep read-only fields separate from read-write
	// fields.
	loggerAndTracer  LoggerAndTracer
	blockCacheHandle *cache.Handle
	objProvider      objstorage.Provider
	readerOpts       sstable.ReaderOptions

	// iterCount keeps track of how many iterators are open. It is used to keep
	// track of leaked iterators on a per-db level.
	iterCount         atomic.Int32
	sstStatsCollector block.CategoryStatsCollector

	// reportCorruptionFn is used for block.ReadEnv.ReportCorruptionFn. It expects
	// the first argument to be a `*TableMetadata`. It returns an error that
	// contains more details.
	reportCorruptionFn func(any, error) error

	// This struct is only populated in race builds.
	raceMu struct {
		sync.Mutex
		// nextRefID is the next ID to allocate for a new reference.
		nextRefID uint64
		// openRefs maps reference IDs to the stack trace recorded at creation
		// time. It's used to track which call paths leaked open references to
		// files.
		openRefs map[uint64][]byte
	}
}

// Assert that *fileCacheHandle implements blob.ReaderProvider.
var _ blob.ReaderProvider = (*fileCacheHandle)(nil)
140 :
141 : // newHandle creates a handle for the FileCache which has its own options. Each
142 : // handle has its own set of files in the cache, separate from those of other
143 : // handles.
144 : func (c *FileCache) newHandle(
145 : cacheHandle *cache.Handle,
146 : objProvider objstorage.Provider,
147 : loggerAndTracer LoggerAndTracer,
148 : readerOpts sstable.ReaderOptions,
149 : reportCorruptionFn func(any, error) error,
150 2 : ) *fileCacheHandle {
151 2 : c.Ref()
152 2 :
153 2 : t := &fileCacheHandle{
154 2 : fileCache: c,
155 2 : loggerAndTracer: loggerAndTracer,
156 2 : blockCacheHandle: cacheHandle,
157 2 : objProvider: objProvider,
158 2 : }
159 2 : t.readerOpts = readerOpts
160 2 : t.readerOpts.FilterMetricsTracker = &sstable.FilterMetricsTracker{}
161 2 : t.reportCorruptionFn = reportCorruptionFn
162 2 : if invariants.RaceEnabled {
163 0 : t.raceMu.openRefs = make(map[uint64][]byte)
164 0 : }
165 2 : return t
166 : }
167 :
168 : // Close the handle, make sure that there will be no further need
169 : // to access any of the files associated with the store.
170 2 : func (h *fileCacheHandle) Close() error {
171 2 : // We want to do some cleanup work here. Check for leaked iterators
172 2 : // by the DB using this container. Note that we'll still perform cleanup
173 2 : // below in the case that there are leaked iterators.
174 2 : var err error
175 2 : if v := h.iterCount.Load(); v > 0 {
176 1 : if !invariants.RaceEnabled {
177 1 : err = errors.Errorf("leaked iterators: %d", errors.Safe(v))
178 1 : } else {
179 0 : var buf bytes.Buffer
180 0 : for _, stack := range h.raceMu.openRefs {
181 0 : fmt.Fprintf(&buf, "%s\n", stack)
182 0 : }
183 0 : err = errors.Errorf("leaked iterators: %d\n%s", errors.Safe(v), buf.String())
184 : }
185 : }
186 :
187 : // EvictAll would panic if there are still outstanding references.
188 2 : if err == nil {
189 2 : keys := h.fileCache.c.EvictAll(func(key fileCacheKey) bool {
190 2 : return key.handle == h
191 2 : })
192 : // Evict any associated blocks in the cache.
193 2 : for i := range keys {
194 2 : h.blockCacheHandle.EvictFile(keys[i].fileNum)
195 2 : }
196 : }
197 :
198 2 : h.fileCache.Unref()
199 2 : // TODO(radu): we have to tolerate metrics() calls after close (see
200 2 : // https://github.com/cockroachdb/cockroach/issues/140454).
201 2 : // *h = fileCacheHandle{}
202 2 : return err
203 : }
204 :
205 : // openFile is called when we insert a new entry in the file cache.
206 : func (h *fileCacheHandle) openFile(
207 : ctx context.Context, fileNum base.DiskFileNum, fileType base.FileType,
208 2 : ) (io.Closer, objstorage.ObjectMetadata, error) {
209 2 : f, err := h.objProvider.OpenForReading(
210 2 : ctx, fileType, fileNum, objstorage.OpenOptions{MustExist: true},
211 2 : )
212 2 : if err != nil {
213 1 : return nil, objstorage.ObjectMetadata{}, err
214 1 : }
215 2 : objMeta, err := h.objProvider.Lookup(fileType, fileNum)
216 2 : if err != nil {
217 0 : return nil, objstorage.ObjectMetadata{}, err
218 0 : }
219 :
220 2 : o := h.readerOpts
221 2 : o.CacheOpts = sstableinternal.CacheOptions{
222 2 : CacheHandle: h.blockCacheHandle,
223 2 : FileNum: fileNum,
224 2 : }
225 2 : switch fileType {
226 2 : case base.FileTypeTable:
227 2 : r, err := sstable.NewReader(ctx, f, o)
228 2 : if err != nil {
229 1 : // If opening the sstable reader fails, we're responsible for
230 1 : // closing the objstorage.Readable.
231 1 : return nil, objMeta, errors.CombineErrors(err, f.Close())
232 1 : }
233 2 : return r, objMeta, nil
234 2 : case base.FileTypeBlob:
235 2 : r, err := blob.NewFileReader(ctx, f, blob.FileReaderOptions{
236 2 : ReaderOptions: o.ReaderOptions,
237 2 : })
238 2 : if err != nil {
239 0 : // If opening the blob file reader fails, we're responsible for
240 0 : // closing the objstorage.Readable.
241 0 : return nil, objMeta, errors.CombineErrors(err, f.Close())
242 0 : }
243 2 : return r, objMeta, nil
244 0 : default:
245 0 : panic(errors.AssertionFailedf("pebble: unexpected file cache file type: %s", fileType))
246 : }
247 : }
248 :
249 : // findOrCreateTable retrieves an existing sstable reader or creates a new one
250 : // for the backing file of the given table. If a corruption error is
251 : // encountered, reportCorruptionFn() is called.
252 : func (h *fileCacheHandle) findOrCreateTable(
253 : ctx context.Context, meta *manifest.TableMetadata,
254 2 : ) (genericcache.ValueRef[fileCacheKey, fileCacheValue], error) {
255 2 : key := fileCacheKey{
256 2 : handle: h,
257 2 : fileNum: meta.TableBacking.DiskFileNum,
258 2 : fileType: base.FileTypeTable,
259 2 : }
260 2 : valRef, err := h.fileCache.c.FindOrCreate(ctx, key)
261 2 : if err != nil && IsCorruptionError(err) {
262 1 : err = h.reportCorruptionFn(meta, err)
263 1 : }
264 2 : return valRef, err
265 : }
266 :
267 : // findOrCreateBlob retrieves an existing blob reader or creates a new one for
268 : // the given blob file. If a corruption error is encountered,
269 : // reportCorruptionFn() is called.
270 : func (h *fileCacheHandle) findOrCreateBlob(
271 : ctx context.Context, fileNum base.DiskFileNum,
272 2 : ) (genericcache.ValueRef[fileCacheKey, fileCacheValue], error) {
273 2 : key := fileCacheKey{
274 2 : handle: h,
275 2 : fileNum: fileNum,
276 2 : fileType: base.FileTypeBlob,
277 2 : }
278 2 : valRef, err := h.fileCache.c.FindOrCreate(ctx, key)
279 2 : // TODO(jackson): Propagate a blob metadata object here.
280 2 : if err != nil && IsCorruptionError(err) {
281 0 : err = h.reportCorruptionFn(nil, err)
282 0 : }
283 2 : return valRef, err
284 : }
285 :
286 : // Evict the given file from the file cache and the block cache.
287 2 : func (h *fileCacheHandle) Evict(fileNum base.DiskFileNum, fileType base.FileType) {
288 2 : defer func() {
289 2 : if p := recover(); p != nil {
290 0 : panic(fmt.Sprintf("pebble: evicting in-use file %s(%s): %v", fileNum, fileType, p))
291 : }
292 : }()
293 2 : h.fileCache.c.Evict(fileCacheKey{handle: h, fileNum: fileNum, fileType: fileType})
294 2 : h.blockCacheHandle.EvictFile(fileNum)
295 : }
296 :
297 2 : func (h *fileCacheHandle) SSTStatsCollector() *block.CategoryStatsCollector {
298 2 : return &h.sstStatsCollector
299 2 : }
300 :
301 : // Metrics returns metrics for the file cache. Note that the CacheMetrics track
302 : // the global cache which is shared between multiple handles (stores). The
303 : // FilterMetrics are per-handle.
304 2 : func (h *fileCacheHandle) Metrics() (FileCacheMetrics, FilterMetrics) {
305 2 : m := h.fileCache.c.Metrics()
306 2 :
307 2 : // The generic cache maintains a count of entries, but it doesn't know which
308 2 : // entries are sstables and which are blob files, which affects the memory
309 2 : // footprint of the table cache. So the FileCache maintains its own counts,
310 2 : // incremented when initializing a new value and decremented by the
311 2 : // releasing func.
312 2 : countSSTables := h.fileCache.counts.sstables.Load()
313 2 : countBlobFiles := h.fileCache.counts.blobFiles.Load()
314 2 :
315 2 : cm := FileCacheMetrics{
316 2 : TableCount: countSSTables,
317 2 : BlobFileCount: countBlobFiles,
318 2 : Hits: m.Hits,
319 2 : Misses: m.Misses,
320 2 : Size: m.Size + countSSTables*int64(unsafe.Sizeof(sstable.Reader{})) +
321 2 : countBlobFiles*int64(unsafe.Sizeof(blob.FileReader{})),
322 2 : }
323 2 : fm := h.readerOpts.FilterMetricsTracker.Load()
324 2 : return cm, fm
325 2 : }
326 :
327 : func (h *fileCacheHandle) estimateSize(
328 : meta *manifest.TableMetadata, lower, upper []byte,
329 2 : ) (size uint64, err error) {
330 2 : err = h.withReader(context.TODO(), block.NoReadEnv, meta, func(r *sstable.Reader, env sstable.ReadEnv) error {
331 2 : size, err = r.EstimateDiskUsage(lower, upper, env, meta.IterTransforms())
332 2 : return err
333 2 : })
334 2 : return size, err
335 : }
336 :
337 : func createReader(
338 : v *fileCacheValue, meta *manifest.TableMetadata,
339 2 : ) (*sstable.Reader, sstable.ReadEnv) {
340 2 : r := v.mustSSTableReader()
341 2 : env := sstable.ReadEnv{}
342 2 : if meta.Virtual {
343 2 : if invariants.Enabled {
344 2 : if meta.VirtualParams.FileNum == 0 || meta.VirtualParams.Lower.UserKey == nil || meta.VirtualParams.Upper.UserKey == nil {
345 0 : panic("virtual params not initialized")
346 : }
347 : }
348 2 : env.Virtual = meta.VirtualParams
349 2 : env.IsSharedIngested = v.isShared && meta.SyntheticSeqNum() != 0
350 : }
351 2 : return r, env
352 : }
353 :
354 : func (h *fileCacheHandle) withReader(
355 : ctx context.Context,
356 : blockEnv block.ReadEnv,
357 : meta *manifest.TableMetadata,
358 : fn func(*sstable.Reader, sstable.ReadEnv) error,
359 2 : ) error {
360 2 : ref, err := h.findOrCreateTable(ctx, meta)
361 2 : if err != nil {
362 1 : return err
363 1 : }
364 2 : defer ref.Unref()
365 2 : v := ref.Value()
366 2 : blockEnv.ReportCorruptionFn = h.reportCorruptionFn
367 2 : blockEnv.ReportCorruptionArg = meta
368 2 : env := sstable.ReadEnv{Block: blockEnv}
369 2 :
370 2 : r := v.mustSSTableReader()
371 2 : if meta.Virtual {
372 2 : if invariants.Enabled {
373 2 : if meta.VirtualParams.FileNum == 0 || meta.VirtualParams.Lower.UserKey == nil || meta.VirtualParams.Upper.UserKey == nil {
374 0 : panic("virtual params not initialized")
375 : }
376 : }
377 2 : env.Virtual = meta.VirtualParams
378 2 : env.IsSharedIngested = v.isShared && meta.SyntheticSeqNum() != 0
379 : }
380 :
381 2 : return fn(r, env)
382 :
383 : }
384 :
385 2 : func (h *fileCacheHandle) IterCount() int64 {
386 2 : return int64(h.iterCount.Load())
387 2 : }
388 :
389 : // GetValueReader returns a blob.ValueReader for blob file identified by fileNum.
390 : func (h *fileCacheHandle) GetValueReader(
391 : ctx context.Context, fileNum base.DiskFileNum,
392 2 : ) (r blob.ValueReader, closeFunc func(), err error) {
393 2 : ref, err := h.findOrCreateBlob(ctx, fileNum)
394 2 : if err != nil {
395 1 : return nil, nil, err
396 1 : }
397 2 : v := ref.Value()
398 2 : r = v.mustBlob()
399 2 : // NB: The call to findOrCreateBlob incremented the value's reference count.
400 2 : // The closeHook (v.closeHook) takes responsibility for unreferencing the
401 2 : // value. Take care to avoid introducing an allocation here by adding a
402 2 : // closure.
403 2 : closeHook := h.addReference(v)
404 2 : return r, closeHook, nil
405 : }
406 :
// FileCache is a shareable cache for open files. Both sstable readers and blob
// file readers are held in the cache; counts tracks how many of each.
type FileCache struct {
	// refs is the reference count of the cache; see Ref and Unref.
	refs atomic.Int64
	// counts holds the number of cached readers of each kind.
	counts struct {
		sstables  atomic.Int64
		blobFiles atomic.Int64
	}

	// c is the underlying generic cache.
	c genericcache.Cache[fileCacheKey, fileCacheValue]
}
418 :
419 : // Ref adds a reference to the file cache. Once a file cache is constructed, the
420 : // cache only remains valid if there is at least one reference to it.
421 2 : func (c *FileCache) Ref() {
422 2 : v := c.refs.Add(1)
423 2 : // We don't want the reference count to ever go from 0 -> 1,
424 2 : // cause a reference count of 0 implies that we've closed the cache.
425 2 : if v <= 1 {
426 1 : panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
427 : }
428 : }
429 :
430 : // Unref removes a reference to the file cache.
431 2 : func (c *FileCache) Unref() {
432 2 : v := c.refs.Add(-1)
433 2 : switch {
434 1 : case v < 0:
435 1 : panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
436 2 : case v == 0:
437 2 : c.c.Close()
438 2 : c.c = genericcache.Cache[fileCacheKey, fileCacheValue]{}
439 : }
440 : }
441 :
442 : // NewFileCache will create a new file cache with one outstanding reference. It
443 : // is the callers responsibility to call Unref if they will no longer hold a
444 : // reference to the file cache.
445 2 : func NewFileCache(numShards int, size int) *FileCache {
446 2 : if size == 0 {
447 0 : panic("pebble: cannot create a file cache of size 0")
448 2 : } else if numShards == 0 {
449 0 : panic("pebble: cannot create a file cache with 0 shards")
450 : }
451 :
452 2 : c := &FileCache{}
453 2 :
454 2 : // initFn is used whenever a new entry is added to the file cache.
455 2 : initFn := func(ctx context.Context, key fileCacheKey, vRef genericcache.ValueRef[fileCacheKey, fileCacheValue]) error {
456 2 : v := vRef.Value()
457 2 : handle := key.handle
458 2 : v.readerProvider.init(c, key)
459 2 : v.closeHook = func() {
460 2 : // closeHook is called when an iterator is closed; the initialization of
461 2 : // an iterator with this value will happen after a FindOrCreate() call
462 2 : // with returns the same vRef.
463 2 : vRef.Unref()
464 2 : handle.iterCount.Add(-1)
465 2 : }
466 2 : reader, objMeta, err := handle.openFile(ctx, key.fileNum, key.fileType)
467 2 : if err != nil {
468 1 : return errors.Wrapf(err, "pebble: backing file %s error", redact.Safe(key.fileNum))
469 1 : }
470 2 : v.reader = reader
471 2 : v.isShared = objMeta.IsShared()
472 2 : switch key.fileType {
473 2 : case base.FileTypeTable:
474 2 : c.counts.sstables.Add(1)
475 2 : case base.FileTypeBlob:
476 2 : c.counts.blobFiles.Add(1)
477 0 : default:
478 0 : panic("unexpected file type")
479 : }
480 2 : return nil
481 : }
482 :
483 2 : releaseFn := func(v *fileCacheValue) {
484 2 : if v.reader != nil {
485 2 : switch v.reader.(type) {
486 2 : case *sstable.Reader:
487 2 : c.counts.sstables.Add(-1)
488 2 : case *blob.FileReader:
489 2 : c.counts.blobFiles.Add(-1)
490 : }
491 2 : _ = v.reader.Close()
492 2 : v.reader = nil
493 : }
494 : }
495 :
496 2 : c.c.Init(size, numShards, initFn, releaseFn)
497 2 :
498 2 : // Hold a ref to the cache here.
499 2 : c.refs.Store(1)
500 2 :
501 2 : return c
502 : }
503 :
// fileCacheKey identifies an entry in the file cache: a file of a given type
// belonging to a particular handle.
type fileCacheKey struct {
	// handle is the fileCacheHandle the entry belongs to.
	handle *fileCacheHandle
	// fileNum is the disk file number of the cached file.
	fileNum base.DiskFileNum
	// fileType describes the type of file being cached (blob or sstable). A
	// file number alone uniquely identifies every file within a DB, but we need
	// to propagate the type so the file cache looks for the correct file in
	// object storage / the filesystem.
	fileType base.FileType
}
513 :
514 : // Shard implements the genericcache.Key interface.
515 2 : func (k fileCacheKey) Shard(numShards int) int {
516 2 : // TODO(radu): maybe incorporate a handle ID.
517 2 : return int(uint64(k.fileNum) % uint64(numShards))
518 2 : }
519 :
520 : // checkAndIntersectFilters checks the specific table and block property filters
521 : // for intersection with any available table and block-level properties. Returns
522 : // true for ok if this table should be read by this iterator.
523 : func checkAndIntersectFilters(
524 : r *sstable.Reader,
525 : blockPropertyFilters []BlockPropertyFilter,
526 : boundLimitedFilter sstable.BoundLimitedBlockPropertyFilter,
527 : syntheticSuffix sstable.SyntheticSuffix,
528 2 : ) (ok bool, filterer *sstable.BlockPropertiesFilterer, err error) {
529 2 : if boundLimitedFilter != nil || len(blockPropertyFilters) > 0 {
530 2 : filterer, err = sstable.IntersectsTable(
531 2 : blockPropertyFilters,
532 2 : boundLimitedFilter,
533 2 : r.UserProperties,
534 2 : syntheticSuffix,
535 2 : )
536 2 : // NB: IntersectsTable will return a nil filterer if the table-level
537 2 : // properties indicate there's no intersection with the provided filters.
538 2 : if filterer == nil || err != nil {
539 2 : return false, nil, err
540 2 : }
541 : }
542 2 : return true, filterer, nil
543 : }
544 :
// newIters opens the iterators selected by kinds over the sstable described by
// file. It finds or creates the cached reader for the table, fills in the
// corruption-reporting and virtual-table parts of internalOpts.readEnv, and
// then creates the range key, range deletion and point iterators in that
// order, stopping at the first error.
//
// On error every iterator created so far is closed, the cache reference is
// released and an empty iterSet is returned. On success the cache reference is
// released here unless a point iterator is returned, in which case the point
// iterator's close hook releases it.
func (h *fileCacheHandle) newIters(
	ctx context.Context,
	file *manifest.TableMetadata,
	opts *IterOptions,
	internalOpts internalIterOpts,
	kinds iterKinds,
) (iterSet, error) {
	// Calling findOrCreate gives us the responsibility of Unref()ing vRef.
	vRef, err := h.findOrCreateTable(ctx, file)
	if err != nil {
		return iterSet{}, err
	}

	internalOpts.readEnv.Block.ReportCorruptionFn = h.reportCorruptionFn
	internalOpts.readEnv.Block.ReportCorruptionArg = file

	v := vRef.Value()
	r, env := createReader(v, file)
	internalOpts.readEnv.Virtual = env.Virtual
	internalOpts.readEnv.IsSharedIngested = env.IsSharedIngested

	var iters iterSet
	// Range key and range deletion iterators are only created when the table
	// metadata says it has keys of the relevant kind.
	if kinds.RangeKey() && file.HasRangeKeys {
		iters.rangeKey, err = newRangeKeyIter(ctx, file, r, opts.SpanIterOptions(), internalOpts)
	}
	if kinds.RangeDeletion() && file.HasPointKeys && err == nil {
		iters.rangeDeletion, err = newRangeDelIter(ctx, file, r, h, internalOpts)
	}
	if kinds.Point() && err == nil {
		iters.point, err = h.newPointIter(ctx, v, file, r, opts, internalOpts, h)
	}
	if err != nil {
		// NB: There's a subtlety here: Because the point iterator is the last
		// iterator we attempt to create, it's not possible for:
		//   err != nil && iters.point != nil
		// If it were possible, we'd need to account for it to avoid double
		// unref-ing here, once during CloseAll (through the point iterator's
		// close hook) and once through the vRef.Unref() call below.
		_ = iters.CloseAll()
		vRef.Unref()
		return iterSet{}, err
	}
	// Only point iterators ever require the reader stay pinned in the cache. If
	// we're not returning a point iterator to the caller, we need to unref v.
	//
	// For point iterators, v.closeHook will be called which will release the ref.
	if iters.point == nil {
		vRef.Unref()
	}
	return iters, nil
}
595 :
// filterBlockSizeLimitForFlushableIngests is the filter block size limit
// applied to flushable ingests: for those, we decide whether to use the bloom
// filter based on size.
const filterBlockSizeLimitForFlushableIngests = 64 * 1024
599 :
// newPointIter is an internal helper that constructs a point iterator over a
// sstable. This function is for internal use only, and callers should use
// newIters instead.
//
// It returns (nil, nil) when opts is non-nil and the table's properties do not
// intersect the configured filters. On success the returned iterator carries a
// close hook obtained from addReference, which also increments the handle's
// iterator count.
func (h *fileCacheHandle) newPointIter(
	ctx context.Context,
	v *fileCacheValue,
	file *manifest.TableMetadata,
	reader *sstable.Reader,
	opts *IterOptions,
	internalOpts internalIterOpts,
	handle *fileCacheHandle,
) (internalIterator, error) {
	var (
		hideObsoletePoints bool = false
		pointKeyFilters    []BlockPropertyFilter
		filterer           *sstable.BlockPropertiesFilterer
	)
	r := v.mustSSTableReader()
	if opts != nil {
		// This code is appending (at most one filter) in-place to
		// opts.PointKeyFilters even though the slice is shared for iterators in
		// the same iterator tree. This is acceptable since all the following
		// properties are true:
		// - The iterator tree is single threaded, so the shared backing for the
		//   slice is being mutated in a single threaded manner.
		// - Each shallow copy of the slice has its own notion of length.
		// - The appended element is always the obsoleteKeyBlockPropertyFilter
		//   struct, which is stateless, so overwriting that struct when creating
		//   one sstable iterator is harmless to other sstable iterators that are
		//   relying on that struct.
		//
		// An alternative would be to have different slices for different sstable
		// iterators, but that requires more work to avoid allocations.
		//
		// TODO(bilal): for compaction reads of foreign sstables, we do hide
		// obsolete points (see sstable.Reader.newCompactionIter) but we don't
		// apply the obsolete block property filter. We could optimize this by
		// applying the filter.
		hideObsoletePoints, pointKeyFilters =
			r.TryAddBlockPropertyFilterForHideObsoletePoints(
				opts.snapshotForHideObsoletePoints, file.LargestSeqNum, opts.PointKeyFilters)

		var ok bool
		var err error
		ok, filterer, err = checkAndIntersectFilters(r, pointKeyFilters,
			internalOpts.boundLimitedFilter, file.SyntheticPrefixAndSuffix.Suffix())
		if err != nil {
			return nil, err
		} else if !ok {
			// No point keys within the table match the filters.
			return nil, nil
		}
	}

	var iter sstable.Iterator
	filterBlockSizeLimit := sstable.AlwaysUseFilterBlock
	if opts != nil {
		// By default, we don't use block filters for L6 and restrict the size for
		// flushable ingests, as these blocks can be very big.
		if !opts.UseL6Filters {
			if opts.layer == manifest.Level(6) {
				filterBlockSizeLimit = sstable.NeverUseFilterBlock
			} else if opts.layer.IsFlushableIngests() {
				filterBlockSizeLimit = filterBlockSizeLimitForFlushableIngests
			}
		}
		// Tag the context with the level being read, when the layer is a level.
		if opts.layer.IsSet() && !opts.layer.IsFlushableIngests() {
			ctx = objiotracing.WithLevel(ctx, opts.layer.Level())
		}
	}

	if v.isShared && file.SyntheticSeqNum() != 0 {
		// The table is shared and ingested.
		hideObsoletePoints = true
	}
	transforms := file.IterTransforms()
	transforms.HideObsoletePoints = hideObsoletePoints
	// Only install a stats accumulator when the caller did not provide one.
	if internalOpts.readEnv.Block.IterStats == nil && opts != nil {
		internalOpts.readEnv.Block.IterStats = handle.SSTStatsCollector().Accumulator(uint64(uintptr(unsafe.Pointer(r))), opts.Category)
	}
	var blobReferences sstable.BlobReferences
	if r.Attributes.Has(sstable.AttributeBlobValues) {
		// A table that stores values in blob files must come with the
		// references needed to resolve them.
		if len(file.BlobReferences) == 0 {
			return nil, errors.AssertionFailedf("pebble: sstable %s has blob values but no blob references", file.TableNum)
		}
		blobReferences = &file.BlobReferences
	}
	var err error
	if internalOpts.compaction {
		iter, err = reader.NewCompactionIter(transforms, internalOpts.readEnv,
			&v.readerProvider, sstable.TableBlobContext{
				ValueFetcher: internalOpts.blobValueFetcher,
				References:   blobReferences,
			})
	} else {
		iter, err = reader.NewPointIter(ctx, sstable.IterOptions{
			Lower:                opts.GetLowerBound(),
			Upper:                opts.GetUpperBound(),
			Transforms:           transforms,
			FilterBlockSizeLimit: filterBlockSizeLimit,
			Filterer:             filterer,
			Env:                  internalOpts.readEnv,
			ReaderProvider:       &v.readerProvider,
			BlobContext: sstable.TableBlobContext{
				ValueFetcher: internalOpts.blobValueFetcher,
				References:   blobReferences,
			},
		})
	}
	if err != nil {
		return nil, err
	}
	// NB: closeHook (v.closeHook) takes responsibility for releasing the
	// cache reference held on v. Take care to avoid introducing an allocation
	// here by adding a closure.
	closeHook := h.addReference(v)
	iter.SetCloseHook(closeHook)
	return iter, nil
}
719 :
720 2 : func (h *fileCacheHandle) addReference(v *fileCacheValue) (closeHook func()) {
721 2 : h.iterCount.Add(1)
722 2 : closeHook = v.closeHook
723 2 : if invariants.RaceEnabled {
724 0 : stack := debug.Stack()
725 0 : h.raceMu.Lock()
726 0 : refID := h.raceMu.nextRefID
727 0 : h.raceMu.openRefs[refID] = stack
728 0 : h.raceMu.nextRefID++
729 0 : h.raceMu.Unlock()
730 0 : // In race builds, this closeHook closure will force an allocation.
731 0 : // Race builds are already unperformant (and allocate a stack trace), so
732 0 : // we don't care.
733 0 : closeHook = func() {
734 0 : v.closeHook()
735 0 : h.raceMu.Lock()
736 0 : defer h.raceMu.Unlock()
737 0 : delete(h.raceMu.openRefs, refID)
738 0 : }
739 : }
740 2 : return closeHook
741 : }
742 :
743 : // SetupBlobReaderProvider creates a fileCachHandle blob.ReaderProvider for
744 : // reading blob files. The caller is responsible for calling the returned cleanup
745 : // function.
746 : //
747 : // NB: This function is intended for testing and tooling purposes only. It
748 : // provides blob file access outside of normal database operations and is not
749 : // used by databases opened through Open().
750 : func SetupBlobReaderProvider(
751 : fs vfs.FS, path string, opts *Options, readOpts sstable.ReaderOptions,
752 1 : ) (blob.ReaderProvider, func(), error) {
753 1 : var fc *FileCache
754 1 : var c *cache.Cache
755 1 : var ch *cache.Handle
756 1 : var objProvider objstorage.Provider
757 1 : var provider *fileCacheHandle
758 1 :
759 1 : // Helper to clean up resources in case of error.
760 1 : cleanup := func() {
761 1 : if provider != nil {
762 1 : _ = provider.Close()
763 1 : }
764 1 : if objProvider != nil {
765 1 : _ = objProvider.Close()
766 1 : }
767 1 : if ch != nil {
768 1 : ch.Close()
769 1 : }
770 1 : if c != nil {
771 1 : c.Unref()
772 1 : }
773 1 : if fc != nil {
774 1 : fc.Unref()
775 1 : }
776 : }
777 :
778 1 : fileCacheSize := FileCacheSize(opts.MaxOpenFiles)
779 1 : if opts.FileCache == nil {
780 1 : fc = NewFileCache(opts.Experimental.FileCacheShards, fileCacheSize)
781 1 : } else {
782 0 : fc = opts.FileCache
783 0 : fc.Ref()
784 0 : }
785 :
786 1 : if opts.Cache == nil {
787 1 : c = cache.New(opts.CacheSize)
788 1 : } else {
789 0 : c = opts.Cache
790 0 : c.Ref()
791 0 : }
792 1 : ch = c.NewHandle()
793 1 :
794 1 : var err error
795 1 : objProvider, err = objstorageprovider.Open(objstorageprovider.DefaultSettings(fs, path))
796 1 : if err != nil {
797 0 : cleanup()
798 0 : return nil, nil, err
799 0 : }
800 :
801 1 : provider = fc.newHandle(
802 1 : ch,
803 1 : objProvider,
804 1 : opts.LoggerAndTracer,
805 1 : readOpts,
806 1 : func(any, error) error { return nil },
807 : )
808 :
809 1 : return provider, cleanup, nil
810 : }
811 :
812 : // newRangeDelIter is an internal helper that constructs an iterator over a
813 : // sstable's range deletions. This function is for table-cache internal use
814 : // only, and callers should use newIters instead.
815 : func newRangeDelIter(
816 : ctx context.Context,
817 : file *manifest.TableMetadata,
818 : r *sstable.Reader,
819 : handle *fileCacheHandle,
820 : internalOpts internalIterOpts,
821 2 : ) (keyspan.FragmentIterator, error) {
822 2 : // NB: range-del iterator does not maintain a reference to the table, nor
823 2 : // does it need to read from it after creation.
824 2 : rangeDelIter, err := r.NewRawRangeDelIter(ctx, file.FragmentIterTransforms(), internalOpts.readEnv)
825 2 : if err != nil {
826 1 : return nil, err
827 1 : }
828 : // Assert expected bounds in tests.
829 2 : if invariants.Sometimes(50) && rangeDelIter != nil {
830 2 : cmp := base.DefaultComparer.Compare
831 2 : if handle.readerOpts.Comparer != nil {
832 2 : cmp = handle.readerOpts.Comparer.Compare
833 2 : }
834 2 : rangeDelIter = keyspan.AssertBounds(
835 2 : rangeDelIter, file.PointKeyBounds.Smallest(), file.PointKeyBounds.LargestUserKey(), cmp,
836 2 : )
837 : }
838 2 : return rangeDelIter, nil
839 : }
840 :
841 : // newRangeKeyIter is an internal helper that constructs an iterator over a
842 : // sstable's range keys. This function is for table-cache internal use only, and
843 : // callers should use newIters instead.
844 : func newRangeKeyIter(
845 : ctx context.Context,
846 : file *manifest.TableMetadata,
847 : r *sstable.Reader,
848 : opts keyspan.SpanIterOptions,
849 : internalOpts internalIterOpts,
850 2 : ) (keyspan.FragmentIterator, error) {
851 2 : transforms := file.FragmentIterTransforms()
852 2 : // Don't filter a table's range keys if the file contains RANGEKEYDELs.
853 2 : // The RANGEKEYDELs may delete range keys in other levels. Skipping the
854 2 : // file's range key blocks may surface deleted range keys below. This is
855 2 : // done here, rather than deferring to the block-property collector in order
856 2 : // to maintain parity with point keys and the treatment of RANGEDELs.
857 2 : if !r.Attributes.Has(sstable.AttributeRangeKeyDels) && len(opts.RangeKeyFilters) > 0 {
858 0 : ok, _, err := checkAndIntersectFilters(r, opts.RangeKeyFilters, nil, transforms.SyntheticSuffix())
859 0 : if err != nil {
860 0 : return nil, err
861 0 : } else if !ok {
862 0 : return nil, nil
863 0 : }
864 : }
865 : // TODO(radu): wrap in an AssertBounds.
866 2 : return r.NewRawRangeKeyIter(ctx, transforms, internalOpts.readEnv)
867 : }
868 :
869 : // tableCacheShardReaderProvider implements sstable.ReaderProvider for a
870 : // specific table.
871 : type tableCacheShardReaderProvider struct {
872 : c *genericcache.Cache[fileCacheKey, fileCacheValue]
873 : key fileCacheKey
874 :
875 : mu struct {
876 : sync.Mutex
877 : // r is the result of c.FindOrCreate(), only set iff refCount > 0.
878 : r genericcache.ValueRef[fileCacheKey, fileCacheValue]
879 : // refCount is the number of GetReader() calls that have not received a
880 : // corresponding Close().
881 : refCount int
882 : }
883 : }
884 :
885 : var _ valblk.ReaderProvider = &tableCacheShardReaderProvider{}
886 :
887 2 : func (rp *tableCacheShardReaderProvider) init(fc *FileCache, key fileCacheKey) {
888 2 : rp.c = &fc.c
889 2 : rp.key = key
890 2 : rp.mu.r = genericcache.ValueRef[fileCacheKey, fileCacheValue]{}
891 2 : rp.mu.refCount = 0
892 2 : }
893 :
894 : // GetReader implements sstable.ReaderProvider. Note that it is not the
895 : // responsibility of tableCacheShardReaderProvider to ensure that the file
896 : // continues to exist. The ReaderProvider is used in iterators where the
897 : // top-level iterator is pinning the read state and preventing the files from
898 : // being deleted.
899 : //
900 : // The caller must call tableCacheShardReaderProvider.Close.
901 : //
902 : // Note that currently the Reader returned here is only used to read value
903 : // blocks. This reader shouldn't be used for other purposes like reading keys
904 : // outside of virtual sstable bounds.
905 : //
906 : // TODO(bananabrick): We could return a wrapper over the Reader to ensure
907 : // that the reader isn't used for other purposes.
908 : func (rp *tableCacheShardReaderProvider) GetReader(
909 : ctx context.Context,
910 2 : ) (valblk.ExternalBlockReader, error) {
911 2 : rp.mu.Lock()
912 2 : defer rp.mu.Unlock()
913 2 :
914 2 : if rp.mu.refCount > 0 {
915 0 : // We already have a value.
916 0 : rp.mu.refCount++
917 0 : return rp.mu.r.Value().mustSSTableReader(), nil
918 0 : }
919 :
920 : // Calling FindOrCreate gives us the responsibility of Unref()ing r, which
921 : // will happen when rp.mu.refCount reaches 0. Note that if the table is no
922 : // longer in the cache, FindOrCreate will need to do IO (through initFn in
923 : // NewFileCache) to initialize a new Reader. We hold rp.mu during this time so
924 : // that concurrent GetReader calls block until the Reader is created.
925 2 : r, err := rp.c.FindOrCreate(ctx, rp.key)
926 2 : if err != nil {
927 0 : return nil, err
928 0 : }
929 2 : rp.mu.r = r
930 2 : rp.mu.refCount = 1
931 2 : return r.Value().mustSSTableReader(), nil
932 : }
933 :
934 : // Close implements sstable.ReaderProvider.
935 2 : func (rp *tableCacheShardReaderProvider) Close() {
936 2 : rp.mu.Lock()
937 2 : defer rp.mu.Unlock()
938 2 : rp.mu.refCount--
939 2 : if rp.mu.refCount <= 0 {
940 2 : if rp.mu.refCount < 0 {
941 0 : panic("pebble: sstable.ReaderProvider misuse")
942 : }
943 2 : rp.mu.r.Unref()
944 2 : rp.mu.r = genericcache.ValueRef[fileCacheKey, fileCacheValue]{}
945 : }
946 : }
947 :
948 : // getTableProperties returns sst table properties for the backing file.
949 : //
950 : // WARNING! If file is a virtual table, we return the properties of the physical
951 : // table.
952 : func (h *fileCacheHandle) getTableProperties(
953 : file *manifest.TableMetadata,
954 1 : ) (*sstable.Properties, error) {
955 1 : // Calling findOrCreateTable gives us the responsibility of decrementing v's
956 1 : // refCount here
957 1 : v, err := h.findOrCreateTable(context.TODO(), file)
958 1 : if err != nil {
959 0 : return nil, err
960 0 : }
961 1 : defer v.Unref()
962 1 :
963 1 : r := v.Value().mustSSTableReader()
964 1 : props, err := r.ReadPropertiesBlock(context.TODO(), nil /* buffer pool */)
965 1 : if err != nil {
966 0 : return nil, err
967 0 : }
968 1 : return &props, nil
969 : }
970 :
// fileCacheValue is the value type stored in the file cache. It wraps the
// reader for a cached file, which is either an sstable reader or a blob file
// reader; use mustSSTableReader or mustBlob to obtain the concrete type.
type fileCacheValue struct {
	// NOTE(review): closeHook is presumably invoked when the value is
	// released; its call site is outside this section — confirm.
	closeHook func()
	reader    io.Closer // *sstable.Reader or *blob.FileReader
	// NOTE(review): isShared presumably records that the backing object lives
	// on shared storage; it is not set or read in this section — confirm.
	isShared bool

	// readerProvider is embedded here so that we only allocate it once as long as
	// the table stays in the cache. Its state is not always logically tied to
	// this specific fileCacheShard - if a table goes out of the cache and then
	// comes back in, the readerProvider in a now-defunct fileCacheValue can
	// still be used and will internally refer to the new fileCacheValue.
	readerProvider tableCacheShardReaderProvider
}
983 :
// mustSSTableReader retrieves the value's *sstable.Reader. It panics (via the
// failed type assertion) if the cached file is not a sstable, i.e. if it is a
// blob file.
func (v *fileCacheValue) mustSSTableReader() *sstable.Reader {
	return v.reader.(*sstable.Reader)
}
989 :
// mustBlob retrieves the value's *blob.FileReader. It panics (via the failed
// type assertion) if the cached file is not a blob file.
func (v *fileCacheValue) mustBlob() *blob.FileReader {
	return v.reader.(*blob.FileReader)
}
995 :
// iterSet holds a set of iterators of various key kinds, all constructed over
// the same data structure (eg, an sstable). A subset of the fields may be
// populated depending on the `iterKinds` passed to newIters. A field that was
// not populated is nil; the Point, RangeDeletion and RangeKey accessors
// substitute a non-nil empty iterator in that case.
type iterSet struct {
	// point iterates over point keys; nil if absent.
	point internalIterator
	// rangeDeletion iterates over range deletions; nil if absent.
	rangeDeletion keyspan.FragmentIterator
	// rangeKey iterates over range keys; nil if absent.
	rangeKey keyspan.FragmentIterator
}
1004 :
1005 : // TODO(jackson): Consider adding methods for fast paths that check whether an
1006 : // iterator of a particular kind is nil, so that these call sites don't need to
1007 : // reach into the struct's fields directly.
1008 :
1009 : // Point returns the contained point iterator. If there is no point iterator,
1010 : // Point returns a non-nil empty point iterator.
1011 2 : func (s *iterSet) Point() internalIterator {
1012 2 : if s.point == nil {
1013 2 : return emptyIter
1014 2 : }
1015 2 : return s.point
1016 : }
1017 :
1018 : // RangeDeletion returns the contained range deletion iterator. If there is no
1019 : // range deletion iterator, RangeDeletion returns a non-nil empty keyspan
1020 : // iterator.
1021 2 : func (s *iterSet) RangeDeletion() keyspan.FragmentIterator {
1022 2 : if s.rangeDeletion == nil {
1023 2 : return emptyKeyspanIter
1024 2 : }
1025 2 : return s.rangeDeletion
1026 : }
1027 :
1028 : // RangeKey returns the contained range key iterator. If there is no range key
1029 : // iterator, RangeKey returns a non-nil empty keyspan iterator.
1030 2 : func (s *iterSet) RangeKey() keyspan.FragmentIterator {
1031 2 : if s.rangeKey == nil {
1032 0 : return emptyKeyspanIter
1033 0 : }
1034 2 : return s.rangeKey
1035 : }
1036 :
1037 : // CloseAll closes all of the held iterators. If CloseAll is called, then Close
1038 : // must be not be called on the constituent iterators.
1039 2 : func (s *iterSet) CloseAll() error {
1040 2 : var err error
1041 2 : if s.point != nil {
1042 2 : err = s.point.Close()
1043 2 : s.point = nil
1044 2 : }
1045 2 : if s.rangeDeletion != nil {
1046 2 : s.rangeDeletion.Close()
1047 2 : s.rangeDeletion = nil
1048 2 : }
1049 2 : if s.rangeKey != nil {
1050 2 : s.rangeKey.Close()
1051 2 : s.rangeKey = nil
1052 2 : }
1053 2 : return err
1054 : }
1055 :
// iterKinds is a bitmap describing a set of iterator kinds. Callers combine
// iterPointKeys, iterRangeDeletions and/or iterRangeKeys with bitwise OR to
// express which kinds of iterators they want.
type iterKinds uint8

// The individual iterator kinds; each occupies its own bit.
const (
	iterPointKeys iterKinds = 1 << iota
	iterRangeDeletions
	iterRangeKeys
)

// Point reports whether the set includes point key iterators.
func (t iterKinds) Point() bool {
	return t&iterPointKeys != 0
}

// RangeDeletion reports whether the set includes range deletion iterators.
func (t iterKinds) RangeDeletion() bool {
	return t&iterRangeDeletions != 0
}

// RangeKey reports whether the set includes range key iterators.
func (t iterKinds) RangeKey() bool {
	return t&iterRangeKeys != 0
}
|