// Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.

package pebble

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"runtime/debug"
	"sync"
	"sync/atomic"
	"unsafe"

	"github.com/cockroachdb/errors"
	"github.com/cockroachdb/pebble/internal/base"
	"github.com/cockroachdb/pebble/internal/cache"
	"github.com/cockroachdb/pebble/internal/genericcache"
	"github.com/cockroachdb/pebble/internal/invariants"
	"github.com/cockroachdb/pebble/internal/keyspan"
	"github.com/cockroachdb/pebble/internal/keyspan/keyspanimpl"
	"github.com/cockroachdb/pebble/internal/manifest"
	"github.com/cockroachdb/pebble/internal/sstableinternal"
	"github.com/cockroachdb/pebble/objstorage"
	"github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
	"github.com/cockroachdb/pebble/sstable"
	"github.com/cockroachdb/pebble/sstable/blob"
	"github.com/cockroachdb/pebble/sstable/block"
	"github.com/cockroachdb/pebble/sstable/valblk"
	"github.com/cockroachdb/redact"
)

var emptyIter = &errorIter{err: nil}
var emptyKeyspanIter = &errorKeyspanIter{err: nil}

// tableNewIters creates new iterators (point, range deletion and/or range key)
// for the given table metadata. Which of the various iterator kinds the user is
// requesting is specified with the iterKinds bitmap.
//
// On success, the requested subset of iters.{point,rangeDel,rangeKey} are
// populated with iterators.
//
// If a point iterator is requested and the operation was successful,
// iters.point is guaranteed to be non-nil and must be closed when the caller is
// finished.
//
// If a range deletion or range key iterator is requested, the corresponding
// iterator may be nil if the table does not contain any keys of the
// corresponding kind. The returned iterSet type provides RangeDeletion() and
// RangeKey() convenience methods that return non-nil empty iterators that may
// be used if the caller requires a non-nil iterator.
//
// On error, all iterators are nil.
//
// The only (non-test) implementation of tableNewIters is
// fileCacheHandle.newIters().
type tableNewIters func(
	ctx context.Context,
	file *manifest.TableMetadata,
	opts *IterOptions,
	internalOpts internalIterOpts,
	kinds iterKinds,
) (iterSet, error)
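
// A minimal usage sketch (illustrative only; ctx, file, and a tableNewIters
// value named newIters are assumed to be in scope): request just the iterator
// kinds needed and lean on iterSet's convenience accessors when a non-nil
// iterator is required.
//
//	iters, err := newIters(ctx, file, nil, internalIterOpts{}, iterPointKeys|iterRangeDeletions)
//	if err != nil {
//		return err
//	}
//	defer iters.CloseAll()
//	point := iters.Point()            // non-nil; closed via CloseAll above
//	rangeDel := iters.RangeDeletion() // non-nil empty iterator if the table has none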

// tableNewRangeDelIter takes a tableNewIters and returns a TableNewSpanIter
// for the rangedel iterator returned by tableNewIters.
func tableNewRangeDelIter(newIters tableNewIters) keyspanimpl.TableNewSpanIter {
	return func(ctx context.Context, file *manifest.TableMetadata, iterOptions keyspan.SpanIterOptions) (keyspan.FragmentIterator, error) {
		iters, err := newIters(ctx, file, nil, internalIterOpts{}, iterRangeDeletions)
		if err != nil {
			return nil, err
		}
		return iters.RangeDeletion(), nil
	}
}

// tableNewRangeKeyIter takes a tableNewIters and returns a TableNewSpanIter
// for the range key iterator returned by tableNewIters.
func tableNewRangeKeyIter(newIters tableNewIters) keyspanimpl.TableNewSpanIter {
	return func(ctx context.Context, file *manifest.TableMetadata, iterOptions keyspan.SpanIterOptions) (keyspan.FragmentIterator, error) {
		iters, err := newIters(ctx, file, nil, internalIterOpts{}, iterRangeKeys)
		if err != nil {
			return nil, err
		}
		return iters.RangeKey(), nil
	}
}
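
// As a sketch (newIters is any tableNewIters implementation; ctx and file are
// assumed in scope), the adapted constructor can be handed to keyspanimpl
// machinery that expects a TableNewSpanIter:
//
//	newSpanIter := tableNewRangeDelIter(newIters)
//	fragIter, err := newSpanIter(ctx, file, keyspan.SpanIterOptions{})
//	if err != nil {
//		return err
//	}
//	defer fragIter.Close()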

// fileCacheHandle is used to access the file cache. Each DB has its own handle.
type fileCacheHandle struct {
	fileCache *FileCache

	// The handle contains fields which are unique to each DB. Note that these
	// get accessed from all shards, so keep read-only fields separate from
	// read-write fields.
	loggerAndTracer  LoggerAndTracer
	blockCacheHandle *cache.Handle
	objProvider      objstorage.Provider
	readerOpts       sstable.ReaderOptions

	// iterCount keeps track of how many iterators are open. It is used to
	// track leaked iterators at a per-DB level.
	iterCount         atomic.Int32
	sstStatsCollector block.CategoryStatsCollector

	// reportCorruptionFn is used for block.ReadEnv.ReportCorruptionFn. It
	// expects the first argument to be a `*TableMetadata`.
	reportCorruptionFn func(any, error)

	// raceMu is only populated in race builds.
	raceMu struct {
		sync.Mutex
		// nextRefID is the next ID to allocate for a new reference.
		nextRefID uint64
		// openRefs maps reference IDs to the stack trace recorded at creation
		// time. It's used to track which call paths leaked open references to
		// files.
		openRefs map[uint64][]byte
	}
}

// Assert that *fileCacheHandle implements blob.ReaderProvider.
var _ blob.ReaderProvider = (*fileCacheHandle)(nil)

// newHandle creates a handle for the FileCache with its own options. Each
// handle has its own set of files in the cache, separate from those of other
// handles.
func (c *FileCache) newHandle(
	cacheHandle *cache.Handle,
	objProvider objstorage.Provider,
	loggerAndTracer LoggerAndTracer,
	readerOpts sstable.ReaderOptions,
	reportCorruptionFn func(any, error),
) *fileCacheHandle {
	c.Ref()

	t := &fileCacheHandle{
		fileCache:        c,
		loggerAndTracer:  loggerAndTracer,
		blockCacheHandle: cacheHandle,
		objProvider:      objProvider,
	}
	t.readerOpts = readerOpts
	t.readerOpts.FilterMetricsTracker = &sstable.FilterMetricsTracker{}
	t.reportCorruptionFn = reportCorruptionFn
	if invariants.RaceEnabled {
		t.raceMu.openRefs = make(map[uint64][]byte)
	}
	return t
}
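
// Lifecycle sketch (assumes an existing FileCache c and its collaborators,
// here named cacheHandle, objProvider, logger, readerOpts, and reportFn):
// each handle takes a reference on the cache at creation and releases it in
// Close, so the cache outlives all of its handles.
//
//	h := c.newHandle(cacheHandle, objProvider, logger, readerOpts, reportFn)
//	defer func() { _ = h.Close() }()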

// Close the handle. The caller must ensure that there will be no further need
// to access any of the files associated with the store.
func (h *fileCacheHandle) Close() error {
	// We want to do some cleanup work here. Check for iterators leaked by the
	// DB using this handle. Note that we'll still perform cleanup below in the
	// case that there are leaked iterators.
	var err error
	if v := h.iterCount.Load(); v > 0 {
		if !invariants.RaceEnabled {
			err = errors.Errorf("leaked iterators: %d", errors.Safe(v))
		} else {
			var buf bytes.Buffer
			for _, stack := range h.raceMu.openRefs {
				fmt.Fprintf(&buf, "%s\n", stack)
			}
			err = errors.Errorf("leaked iterators: %d\n%s", errors.Safe(v), buf.String())
		}
	}

	// EvictAll would panic if there are still outstanding references.
	if err == nil {
		keys := h.fileCache.c.EvictAll(func(key fileCacheKey) bool {
			return key.handle == h
		})
		// Evict any associated blocks in the cache.
		for i := range keys {
			h.blockCacheHandle.EvictFile(keys[i].fileNum)
		}
	}

	h.fileCache.Unref()
	// TODO(radu): we have to tolerate metrics() calls after close (see
	// https://github.com/cockroachdb/cockroach/issues/140454).
	// *h = fileCacheHandle{}
	return err
}

// openFile is called when we insert a new entry in the file cache.
func (h *fileCacheHandle) openFile(
	ctx context.Context, fileNum base.DiskFileNum, fileType base.FileType,
) (io.Closer, objstorage.ObjectMetadata, error) {
	f, err := h.objProvider.OpenForReading(
		ctx, fileType, fileNum, objstorage.OpenOptions{MustExist: true},
	)
	if err != nil {
		return nil, objstorage.ObjectMetadata{}, err
	}
	objMeta, err := h.objProvider.Lookup(fileType, fileNum)
	if err != nil {
		return nil, objstorage.ObjectMetadata{}, err
	}

	o := h.readerOpts
	o.CacheOpts = sstableinternal.CacheOptions{
		CacheHandle: h.blockCacheHandle,
		FileNum:     fileNum,
	}
	switch fileType {
	case base.FileTypeTable:
		r, err := sstable.NewReader(ctx, f, o)
		if err != nil {
			return nil, objMeta, err
		}
		return r, objMeta, nil
	case base.FileTypeBlob:
		r, err := blob.NewFileReader(ctx, f, blob.FileReaderOptions{
			ReaderOptions: o.ReaderOptions,
		})
		if err != nil {
			return nil, objMeta, err
		}
		return r, objMeta, nil
	default:
		panic(errors.AssertionFailedf("pebble: unexpected file cache file type: %s", fileType))
	}
}

// findOrCreateTable retrieves an existing sstable reader or creates a new one
// for the backing file of the given table. If a corruption error is
// encountered, reportCorruptionFn() is called.
func (h *fileCacheHandle) findOrCreateTable(
	ctx context.Context, meta *manifest.TableMetadata,
) (genericcache.ValueRef[fileCacheKey, fileCacheValue], error) {
	key := fileCacheKey{
		handle:   h,
		fileNum:  meta.FileBacking.DiskFileNum,
		fileType: base.FileTypeTable,
	}
	valRef, err := h.fileCache.c.FindOrCreate(ctx, key)
	if err != nil && IsCorruptionError(err) {
		h.reportCorruptionFn(meta, err)
	}
	return valRef, err
}
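
// Reference discipline sketch: a successful findOrCreateTable hands the caller
// a reference that must be released with Unref once the reader is no longer
// needed (withCommonReader below shows the canonical pattern):
//
//	ref, err := h.findOrCreateTable(ctx, meta)
//	if err != nil {
//		return err
//	}
//	defer ref.Unref()
//	r := ref.Value().mustSSTableReader()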

// findOrCreateBlob retrieves an existing blob reader or creates a new one for
// the given blob file. If a corruption error is encountered,
// reportCorruptionFn() is called.
func (h *fileCacheHandle) findOrCreateBlob(
	ctx context.Context, fileNum base.DiskFileNum,
) (genericcache.ValueRef[fileCacheKey, fileCacheValue], error) {
	key := fileCacheKey{
		handle:   h,
		fileNum:  fileNum,
		fileType: base.FileTypeBlob,
	}
	valRef, err := h.fileCache.c.FindOrCreate(ctx, key)
	// TODO(jackson): Propagate a blob metadata object here.
	if err != nil && IsCorruptionError(err) {
		h.reportCorruptionFn(nil, err)
	}
	return valRef, err
}

// Evict the given file from the file cache and the block cache.
func (h *fileCacheHandle) Evict(fileNum base.DiskFileNum, fileType base.FileType) {
	h.fileCache.c.Evict(fileCacheKey{handle: h, fileNum: fileNum, fileType: fileType})
	h.blockCacheHandle.EvictFile(fileNum)
}

func (h *fileCacheHandle) SSTStatsCollector() *block.CategoryStatsCollector {
	return &h.sstStatsCollector
}

// Metrics returns metrics for the file cache. Note that the CacheMetrics track
// the global cache which is shared between multiple handles (stores). The
// FilterMetrics are per-handle.
func (h *fileCacheHandle) Metrics() (CacheMetrics, FilterMetrics) {
	m := h.fileCache.c.Metrics()

	// The generic cache maintains a count of entries, but it doesn't know which
	// entries are sstables and which are blob files, which affects the memory
	// footprint of the file cache. So the FileCache maintains its own counts,
	// incremented when initializing a new value and decremented by the
	// releasing func.
	countSSTables := h.fileCache.counts.sstables.Load()
	countBlobFiles := h.fileCache.counts.blobFiles.Load()

	cm := CacheMetrics{
		Hits:   m.Hits,
		Misses: m.Misses,
		Count:  countSSTables + countBlobFiles,
		Size: m.Size + countSSTables*int64(unsafe.Sizeof(sstable.Reader{})) +
			countBlobFiles*int64(unsafe.Sizeof(blob.FileReader{})),
	}
	fm := h.readerOpts.FilterMetricsTracker.Load()
	return cm, fm
}

func (h *fileCacheHandle) estimateSize(
	meta *tableMetadata, lower, upper []byte,
) (size uint64, err error) {
	err = h.withCommonReader(context.TODO(), block.NoReadEnv, meta, func(cr sstable.CommonReader, env block.ReadEnv) error {
		size, err = cr.EstimateDiskUsage(lower, upper)
		return err
	})
	return size, err
}

// createCommonReader creates a CommonReader for the given table.
func createCommonReader(v *fileCacheValue, file *tableMetadata) sstable.CommonReader {
	// TODO(bananabrick): We suffer an allocation if file is a virtual sstable.
	r := v.mustSSTableReader()
	var cr sstable.CommonReader = r
	if file.Virtual {
		virtualReader := sstable.MakeVirtualReader(
			r, file.VirtualMeta().VirtualReaderParams(v.isShared),
		)
		cr = &virtualReader
	}
	return cr
}

func (h *fileCacheHandle) withCommonReader(
	ctx context.Context,
	env block.ReadEnv,
	meta *tableMetadata,
	fn func(sstable.CommonReader, block.ReadEnv) error,
) error {
	ref, err := h.findOrCreateTable(ctx, meta)
	if err != nil {
		return err
	}
	defer ref.Unref()
	env.ReportCorruptionFn = h.reportCorruptionFn
	env.ReportCorruptionArg = meta
	return fn(createCommonReader(ref.Value(), meta), env)
}

func (h *fileCacheHandle) withReader(
	ctx context.Context,
	env block.ReadEnv,
	meta physicalMeta,
	fn func(*sstable.Reader, block.ReadEnv) error,
) error {
	ref, err := h.findOrCreateTable(ctx, meta.TableMetadata)
	if err != nil {
		return err
	}
	defer ref.Unref()
	env.ReportCorruptionFn = h.reportCorruptionFn
	env.ReportCorruptionArg = meta.TableMetadata
	return fn(ref.Value().reader.(*sstable.Reader), env)
}

// withVirtualReader fetches a VirtualReader associated with a virtual sstable.
func (h *fileCacheHandle) withVirtualReader(
	ctx context.Context,
	env block.ReadEnv,
	meta virtualMeta,
	fn func(sstable.VirtualReader, block.ReadEnv) error,
) error {
	ref, err := h.findOrCreateTable(ctx, meta.TableMetadata)
	if err != nil {
		return err
	}
	defer ref.Unref()
	v := ref.Value()
	env.ReportCorruptionFn = h.reportCorruptionFn
	env.ReportCorruptionArg = &meta.TableMetadata
	return fn(sstable.MakeVirtualReader(v.mustSSTableReader(), meta.VirtualReaderParams(v.isShared)), env)
}

func (h *fileCacheHandle) IterCount() int64 {
	return int64(h.iterCount.Load())
}

// GetValueReader returns a blob.ValueReader for the blob file identified by
// fileNum.
func (h *fileCacheHandle) GetValueReader(
	ctx context.Context, fileNum base.DiskFileNum,
) (r blob.ValueReader, closeFunc func(), err error) {
	ref, err := h.findOrCreateBlob(ctx, fileNum)
	if err != nil {
		return nil, nil, err
	}
	v := ref.Value()
	r = v.mustBlob()
	// NB: The call to findOrCreateBlob incremented the value's reference count.
	// The closeHook (v.closeHook) takes responsibility for unreferencing the
	// value. Take care to avoid introducing an allocation here by adding a
	// closure.
	closeHook := h.addReference(v)
	return r, closeHook, nil
}
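
// Usage sketch (fileNum is assumed to name an existing blob file): the
// returned closeFunc releases the cache reference and must always be called,
// even if no values end up being read.
//
//	r, closeFunc, err := h.GetValueReader(ctx, fileNum)
//	if err != nil {
//		return err
//	}
//	defer closeFunc()
//	// ... read values through r ...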

// FileCache is a shareable cache for open files. Today, the open files are
// sstable and blob files.
type FileCache struct {
	refs   atomic.Int64
	counts struct {
		sstables  atomic.Int64
		blobFiles atomic.Int64
	}

	c genericcache.Cache[fileCacheKey, fileCacheValue]
}

// Ref adds a reference to the file cache. Once a file cache is constructed, the
// cache only remains valid if there is at least one reference to it.
func (c *FileCache) Ref() {
	v := c.refs.Add(1)
	// We don't want the reference count to ever go from 0 -> 1, because a
	// reference count of 0 implies that we've closed the cache.
	if v <= 1 {
		panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
	}
}

// Unref removes a reference to the file cache.
func (c *FileCache) Unref() {
	v := c.refs.Add(-1)
	switch {
	case v < 0:
		panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
	case v == 0:
		c.c.Close()
		c.c = genericcache.Cache[fileCacheKey, fileCacheValue]{}
	}
}

// NewFileCache will create a new file cache with one outstanding reference. It
// is the caller's responsibility to call Unref when it will no longer hold a
// reference to the file cache.
func NewFileCache(numShards int, size int) *FileCache {
	if size == 0 {
		panic("pebble: cannot create a file cache of size 0")
	} else if numShards == 0 {
		panic("pebble: cannot create a file cache with 0 shards")
	}

	c := &FileCache{}

	// initFn is used whenever a new entry is added to the file cache.
	initFn := func(ctx context.Context, key fileCacheKey, vRef genericcache.ValueRef[fileCacheKey, fileCacheValue]) error {
		v := vRef.Value()
		handle := key.handle
		v.readerProvider.init(c, key)
		v.closeHook = func() {
			// closeHook is called when an iterator is closed; the
			// initialization of an iterator with this value will happen after
			// a FindOrCreate() call which returns the same vRef.
			vRef.Unref()
			handle.iterCount.Add(-1)
		}
		reader, objMeta, err := handle.openFile(ctx, key.fileNum, key.fileType)
		if err != nil {
			return errors.Wrapf(err, "pebble: backing file %s error", redact.Safe(key.fileNum))
		}
		v.reader = reader
		v.isShared = objMeta.IsShared()
		switch key.fileType {
		case base.FileTypeTable:
			c.counts.sstables.Add(1)
		case base.FileTypeBlob:
			c.counts.blobFiles.Add(1)
		default:
			panic("unexpected file type")
		}
		return nil
	}

	releaseFn := func(v *fileCacheValue) {
		if v.reader != nil {
			switch v.reader.(type) {
			case *sstable.Reader:
				c.counts.sstables.Add(-1)
			case *blob.FileReader:
				c.counts.blobFiles.Add(-1)
			}
			_ = v.reader.Close()
			v.reader = nil
		}
	}

	c.c.Init(size, numShards, initFn, releaseFn)

	// Hold a ref to the cache here.
	c.refs.Store(1)

	return c
}
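
// Construction sketch: a cache shared by several DBs, sized here at 4096 files
// across 16 shards (both values are illustrative), released by the creator
// once every DB holding a handle has closed.
//
//	fc := NewFileCache(16, 4096)
//	defer fc.Unref()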

type fileCacheKey struct {
	handle  *fileCacheHandle
	fileNum base.DiskFileNum
	// fileType describes the type of file being cached (blob or sstable). A
	// file number alone uniquely identifies every file within a DB, but we need
	// to propagate the type so the file cache looks for the correct file in
	// object storage / the filesystem.
	fileType base.FileType
}

// Shard implements the genericcache.Key interface.
func (k fileCacheKey) Shard(numShards int) int {
	// TODO(radu): maybe incorporate a handle ID.
	return int(uint64(k.fileNum) % uint64(numShards))
}
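
// For example, with 8 shards, file numbers 1, 9, and 17 all map to shard 1,
// so operations on the same file always contend on the same shard:
//
//	shard := fileCacheKey{fileNum: base.DiskFileNum(17)}.Shard(8) // shard == 1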

// checkAndIntersectFilters checks the specific table and block property filters
// for intersection with any available table and block-level properties. Returns
// true for ok if this table should be read by this iterator.
func checkAndIntersectFilters(
	r *sstable.Reader,
	blockPropertyFilters []BlockPropertyFilter,
	boundLimitedFilter sstable.BoundLimitedBlockPropertyFilter,
	syntheticSuffix sstable.SyntheticSuffix,
) (ok bool, filterer *sstable.BlockPropertiesFilterer, err error) {
	if boundLimitedFilter != nil || len(blockPropertyFilters) > 0 {
		filterer, err = sstable.IntersectsTable(
			blockPropertyFilters,
			boundLimitedFilter,
			r.Properties.UserProperties,
			syntheticSuffix,
		)
		// NB: IntersectsTable will return a nil filterer if the table-level
		// properties indicate there's no intersection with the provided filters.
		if filterer == nil || err != nil {
			return false, nil, err
		}
	}
	return true, filterer, nil
}

func (h *fileCacheHandle) newIters(
	ctx context.Context,
	file *manifest.TableMetadata,
	opts *IterOptions,
	internalOpts internalIterOpts,
	kinds iterKinds,
) (iterSet, error) {
	// Calling findOrCreateTable gives us the responsibility of Unref()ing vRef.
	vRef, err := h.findOrCreateTable(ctx, file)
	if err != nil {
		return iterSet{}, err
	}

	internalOpts.readEnv.ReportCorruptionFn = h.reportCorruptionFn
	internalOpts.readEnv.ReportCorruptionArg = file

	v := vRef.Value()
	r := v.mustSSTableReader()
	// Note: This suffers an allocation for virtual sstables.
	cr := createCommonReader(v, file)
	var iters iterSet
	if kinds.RangeKey() && file.HasRangeKeys {
		iters.rangeKey, err = newRangeKeyIter(ctx, r, file, cr, opts.SpanIterOptions(), internalOpts)
	}
	if kinds.RangeDeletion() && file.HasPointKeys && err == nil {
		iters.rangeDeletion, err = newRangeDelIter(ctx, file, cr, h, internalOpts)
	}
	if kinds.Point() && err == nil {
		iters.point, err = h.newPointIter(ctx, v, file, cr, opts, internalOpts, h)
	}
	if err != nil {
		// NB: There's a subtlety here: Because the point iterator is the last
		// iterator we attempt to create, it's not possible for:
		//	err != nil && iters.point != nil
		// If it were possible, we'd need to account for it to avoid double
		// unref-ing here, once during CloseAll and once during `unrefValue`.
		iters.CloseAll()
		vRef.Unref()
		return iterSet{}, err
	}
	// Only point iterators ever require the reader stay pinned in the cache. If
	// we're not returning a point iterator to the caller, we need to unref v.
	//
	// For point iterators, v.closeHook will be called which will release the ref.
	if iters.point == nil {
		vRef.Unref()
	}
	return iters, nil
}

// For flushable ingests, we decide whether to use the bloom filter based on
// size.
const filterBlockSizeLimitForFlushableIngests = 64 * 1024

// newPointIter is an internal helper that constructs a point iterator over an
// sstable. This function is for internal use only, and callers should use
// newIters instead.
func (h *fileCacheHandle) newPointIter(
	ctx context.Context,
	v *fileCacheValue,
	file *manifest.TableMetadata,
	cr sstable.CommonReader,
	opts *IterOptions,
	internalOpts internalIterOpts,
	handle *fileCacheHandle,
) (internalIterator, error) {
	var (
		hideObsoletePoints bool = false
		pointKeyFilters    []BlockPropertyFilter
		filterer           *sstable.BlockPropertiesFilterer
	)
	r := v.mustSSTableReader()
	if opts != nil {
		// This code is appending (at most one filter) in-place to
		// opts.PointKeyFilters even though the slice is shared for iterators in
		// the same iterator tree. This is acceptable since all the following
		// properties are true:
		// - The iterator tree is single threaded, so the shared backing for the
		//   slice is being mutated in a single threaded manner.
		// - Each shallow copy of the slice has its own notion of length.
		// - The appended element is always the obsoleteKeyBlockPropertyFilter
		//   struct, which is stateless, so overwriting that struct when creating
		//   one sstable iterator is harmless to other sstable iterators that are
		//   relying on that struct.
		//
		// An alternative would be to have different slices for different sstable
		// iterators, but that requires more work to avoid allocations.
		//
		// TODO(bilal): for compaction reads of foreign sstables, we do hide
		// obsolete points (see sstable.Reader.newCompactionIter) but we don't
		// apply the obsolete block property filter. We could optimize this by
		// applying the filter.
		hideObsoletePoints, pointKeyFilters =
			r.TryAddBlockPropertyFilterForHideObsoletePoints(
				opts.snapshotForHideObsoletePoints, file.LargestSeqNum, opts.PointKeyFilters)

		var ok bool
		var err error
		ok, filterer, err = checkAndIntersectFilters(r, pointKeyFilters,
			internalOpts.boundLimitedFilter, file.SyntheticPrefixAndSuffix.Suffix())
		if err != nil {
			return nil, err
		} else if !ok {
			// No point keys within the table match the filters.
			return nil, nil
		}
	}

	var iter sstable.Iterator
	filterBlockSizeLimit := sstable.AlwaysUseFilterBlock
	if opts != nil {
		// By default, we don't use block filters for L6 and restrict the size for
		// flushable ingests, as these blocks can be very big.
		if !opts.UseL6Filters {
			if opts.layer == manifest.Level(6) {
				filterBlockSizeLimit = sstable.NeverUseFilterBlock
			} else if opts.layer.IsFlushableIngests() {
				filterBlockSizeLimit = filterBlockSizeLimitForFlushableIngests
			}
		}
		if opts.layer.IsSet() && !opts.layer.IsFlushableIngests() {
			ctx = objiotracing.WithLevel(ctx, opts.layer.Level())
		}
	}
	tableFormat, err := r.TableFormat()
	if err != nil {
		return nil, err
	}

	if v.isShared && file.SyntheticSeqNum() != 0 {
		if tableFormat < sstable.TableFormatPebblev4 {
			return nil, errors.New("pebble: shared ingested sstable has a lower table format than expected")
		}
		// The table is shared and ingested.
		hideObsoletePoints = true
	}
	transforms := file.IterTransforms()
	transforms.HideObsoletePoints = hideObsoletePoints
	if internalOpts.readEnv.IterStats == nil && opts != nil {
		internalOpts.readEnv.IterStats = handle.SSTStatsCollector().Accumulator(uint64(uintptr(unsafe.Pointer(r))), opts.Category)
	}
	if internalOpts.compaction {
		iter, err = cr.NewCompactionIter(transforms, internalOpts.readEnv, &v.readerProvider)
	} else {
		iter, err = cr.NewPointIter(
			ctx, transforms, opts.GetLowerBound(), opts.GetUpperBound(), filterer,
			filterBlockSizeLimit, internalOpts.readEnv, &v.readerProvider)
	}
	if err != nil {
		return nil, err
	}
	// NB: closeHook (v.closeHook) takes responsibility for calling
	// unrefValue(v) here. Take care to avoid introducing an allocation here by
	// adding a closure.
	closeHook := h.addReference(v)
	iter.SetCloseHook(closeHook)
	return iter, nil
}

func (h *fileCacheHandle) addReference(v *fileCacheValue) (closeHook func()) {
	h.iterCount.Add(1)
	closeHook = v.closeHook
	if invariants.RaceEnabled {
		stack := debug.Stack()
		h.raceMu.Lock()
		refID := h.raceMu.nextRefID
		h.raceMu.openRefs[refID] = stack
		h.raceMu.nextRefID++
		h.raceMu.Unlock()
		// In race builds, this closeHook closure will force an allocation.
		// Race builds are already unperformant (and allocate a stack trace), so
		// we don't care.
		closeHook = func() {
			v.closeHook()
			h.raceMu.Lock()
			defer h.raceMu.Unlock()
			delete(h.raceMu.openRefs, refID)
		}
	}
	return closeHook
}

// newRangeDelIter is an internal helper that constructs an iterator over an
// sstable's range deletions. This function is for table-cache internal use
// only, and callers should use newIters instead.
func newRangeDelIter(
	ctx context.Context,
	file *manifest.TableMetadata,
	cr sstable.CommonReader,
	handle *fileCacheHandle,
	internalOpts internalIterOpts,
) (keyspan.FragmentIterator, error) {
	// NB: range-del iterator does not maintain a reference to the table, nor
	// does it need to read from it after creation.
	rangeDelIter, err := cr.NewRawRangeDelIter(ctx, file.FragmentIterTransforms(), internalOpts.readEnv)
	if err != nil {
		return nil, err
	}
	// Assert expected bounds in tests.
	if invariants.Sometimes(50) && rangeDelIter != nil {
		cmp := base.DefaultComparer.Compare
		if handle.readerOpts.Comparer != nil {
			cmp = handle.readerOpts.Comparer.Compare
		}
		rangeDelIter = keyspan.AssertBounds(
			rangeDelIter, file.SmallestPointKey, file.LargestPointKey.UserKey, cmp,
		)
	}
	return rangeDelIter, nil
}

// newRangeKeyIter is an internal helper that constructs an iterator over an
// sstable's range keys. This function is for table-cache internal use only, and
// callers should use newIters instead.
func newRangeKeyIter(
	ctx context.Context,
	r *sstable.Reader,
	file *tableMetadata,
	cr sstable.CommonReader,
	opts keyspan.SpanIterOptions,
	internalOpts internalIterOpts,
) (keyspan.FragmentIterator, error) {
	transforms := file.FragmentIterTransforms()
	// Don't filter a table's range keys if the file contains RANGEKEYDELs.
	// The RANGEKEYDELs may delete range keys in other levels. Skipping the
	// file's range key blocks may surface deleted range keys below. This is
	// done here, rather than deferring to the block-property collector in order
	// to maintain parity with point keys and the treatment of RANGEDELs.
	if r.Properties.NumRangeKeyDels == 0 && len(opts.RangeKeyFilters) > 0 {
		ok, _, err := checkAndIntersectFilters(r, opts.RangeKeyFilters, nil, transforms.SyntheticSuffix())
		if err != nil {
			return nil, err
		} else if !ok {
			return nil, nil
		}
	}
	// TODO(radu): wrap in an AssertBounds.
	return cr.NewRawRangeKeyIter(ctx, transforms, internalOpts.readEnv)
}

// tableCacheShardReaderProvider implements valblk.ReaderProvider for a
// specific table.
type tableCacheShardReaderProvider struct {
	c   *genericcache.Cache[fileCacheKey, fileCacheValue]
	key fileCacheKey

	mu struct {
		sync.Mutex
		// r is the result of c.FindOrCreate(), set iff refCount > 0.
		r genericcache.ValueRef[fileCacheKey, fileCacheValue]
		// refCount is the number of GetReader() calls that have not received a
		// corresponding Close().
		refCount int
	}
}

var _ valblk.ReaderProvider = &tableCacheShardReaderProvider{}

func (rp *tableCacheShardReaderProvider) init(fc *FileCache, key fileCacheKey) {
	rp.c = &fc.c
	rp.key = key
	rp.mu.r = genericcache.ValueRef[fileCacheKey, fileCacheValue]{}
	rp.mu.refCount = 0
}

// GetReader implements valblk.ReaderProvider. Note that it is not the
// responsibility of tableCacheShardReaderProvider to ensure that the file
// continues to exist. The ReaderProvider is used in iterators where the
// top-level iterator is pinning the read state and preventing the files from
// being deleted.
//
// The caller must call tableCacheShardReaderProvider.Close.
//
// Note that currently the Reader returned here is only used to read value
// blocks. This reader shouldn't be used for other purposes like reading keys
// outside of virtual sstable bounds.
//
// TODO(bananabrick): We could return a wrapper over the Reader to ensure
// that the reader isn't used for other purposes.
func (rp *tableCacheShardReaderProvider) GetReader(
	ctx context.Context,
) (valblk.ExternalBlockReader, error) {
	rp.mu.Lock()
	defer rp.mu.Unlock()

	if rp.mu.refCount > 0 {
		// We already have a value.
		rp.mu.refCount++
		return rp.mu.r.Value().mustSSTableReader(), nil
	}

	// Calling FindOrCreate gives us the responsibility of Unref()ing r, which
	// will happen when rp.mu.refCount reaches 0. Note that if the table is no
	// longer in the cache, FindOrCreate will need to do IO (through initFn in
	// NewFileCache) to initialize a new Reader. We hold rp.mu during this time
	// so that concurrent GetReader calls block until the Reader is created.
	r, err := rp.c.FindOrCreate(ctx, rp.key)
	if err != nil {
		return nil, err
	}
	rp.mu.r = r
	rp.mu.refCount = 1
	return r.Value().mustSSTableReader(), nil
}

// Close implements valblk.ReaderProvider.
func (rp *tableCacheShardReaderProvider) Close() {
	rp.mu.Lock()
	defer rp.mu.Unlock()
	rp.mu.refCount--
	if rp.mu.refCount <= 0 {
		if rp.mu.refCount < 0 {
			panic("pebble: sstable.ReaderProvider misuse")
		}
		rp.mu.r.Unref()
		rp.mu.r = genericcache.ValueRef[fileCacheKey, fileCacheValue]{}
	}
}
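
// Pairing sketch: every successful GetReader must be matched by exactly one
// Close, which drops the cache reference once the last in-flight use is done.
//
//	r, err := rp.GetReader(ctx)
//	if err != nil {
//		return err
//	}
//	defer rp.Close()
//	// ... read value blocks through r ...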

// getTableProperties returns the sstable properties for the backing file.
//
// WARNING! If file is a virtual table, we return the properties of the physical
// table.
func (h *fileCacheHandle) getTableProperties(file *tableMetadata) (*sstable.Properties, error) {
	// Calling findOrCreateTable gives us the responsibility of decrementing
	// v's refCount here.
	v, err := h.findOrCreateTable(context.TODO(), file)
	if err != nil {
		return nil, err
	}
	defer v.Unref()

	r := v.Value().mustSSTableReader()
	return &r.Properties, nil
}

type fileCacheValue struct {
	closeHook func()
	reader    io.Closer // *sstable.Reader or *blob.FileReader
	isShared  bool

	// readerProvider is embedded here so that we only allocate it once as long
	// as the table stays in the cache. Its state is not always logically tied
	// to this specific fileCacheValue - if a table goes out of the cache and
	// then comes back in, the readerProvider in a now-defunct fileCacheValue
	// can still be used and will internally refer to the new fileCacheValue.
	readerProvider tableCacheShardReaderProvider
}

// mustSSTableReader retrieves the value's *sstable.Reader. It panics if the
// cached file is not an sstable (i.e., it is a blob file).
func (v *fileCacheValue) mustSSTableReader() *sstable.Reader {
	return v.reader.(*sstable.Reader)
}

// mustBlob retrieves the value's *blob.FileReader. It panics if the cached file
// is not a blob file.
func (v *fileCacheValue) mustBlob() *blob.FileReader {
	return v.reader.(*blob.FileReader)
}

// iterSet holds a set of iterators of various key kinds, all constructed over
// the same data structure (eg, an sstable). A subset of the fields may be
// populated depending on the `iterKinds` passed to newIters.
type iterSet struct {
	point         internalIterator
	rangeDeletion keyspan.FragmentIterator
	rangeKey      keyspan.FragmentIterator
}

// TODO(jackson): Consider adding methods for fast paths that check whether an
// iterator of a particular kind is nil, so that these call sites don't need to
// reach into the struct's fields directly.

// Point returns the contained point iterator. If there is no point iterator,
// Point returns a non-nil empty point iterator.
func (s *iterSet) Point() internalIterator {
	if s.point == nil {
		return emptyIter
	}
	return s.point
}

// RangeDeletion returns the contained range deletion iterator. If there is no
// range deletion iterator, RangeDeletion returns a non-nil empty keyspan
// iterator.
func (s *iterSet) RangeDeletion() keyspan.FragmentIterator {
	if s.rangeDeletion == nil {
		return emptyKeyspanIter
	}
	return s.rangeDeletion
}

// RangeKey returns the contained range key iterator. If there is no range key
// iterator, RangeKey returns a non-nil empty keyspan iterator.
func (s *iterSet) RangeKey() keyspan.FragmentIterator {
	if s.rangeKey == nil {
		return emptyKeyspanIter
	}
	return s.rangeKey
}

// CloseAll closes all of the held iterators. If CloseAll is called, then Close
// must not be called on the constituent iterators.
func (s *iterSet) CloseAll() error {
	var err error
	if s.point != nil {
		err = s.point.Close()
		s.point = nil
	}
	if s.rangeDeletion != nil {
		s.rangeDeletion.Close()
		s.rangeDeletion = nil
	}
	if s.rangeKey != nil {
		s.rangeKey.Close()
		s.rangeKey = nil
	}
	return err
}

// iterKinds is a bitmap indicating a set of kinds of iterators. Callers may
// bitwise-OR iterPointKeys, iterRangeDeletions and/or iterRangeKeys together to
// represent a set of desired iterator kinds.
type iterKinds uint8

func (t iterKinds) Point() bool         { return (t & iterPointKeys) != 0 }
func (t iterKinds) RangeDeletion() bool { return (t & iterRangeDeletions) != 0 }
func (t iterKinds) RangeKey() bool      { return (t & iterRangeKeys) != 0 }

const (
	iterPointKeys iterKinds = 1 << iota
	iterRangeDeletions
	iterRangeKeys
)
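
// For example, a caller that needs point keys and range keys but not range
// deletions would pass:
//
//	kinds := iterPointKeys | iterRangeKeys
//	kinds.Point()         // true
//	kinds.RangeDeletion() // false
//	kinds.RangeKey()      // true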