Line data Source code
1 : // Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "bytes"
9 : "context"
10 : "fmt"
11 : "io"
12 : "runtime/debug"
13 : "sync"
14 : "sync/atomic"
15 : "unsafe"
16 :
17 : "github.com/cockroachdb/errors"
18 : "github.com/cockroachdb/pebble/internal/base"
19 : "github.com/cockroachdb/pebble/internal/cache"
20 : "github.com/cockroachdb/pebble/internal/genericcache"
21 : "github.com/cockroachdb/pebble/internal/invariants"
22 : "github.com/cockroachdb/pebble/internal/keyspan"
23 : "github.com/cockroachdb/pebble/internal/keyspan/keyspanimpl"
24 : "github.com/cockroachdb/pebble/internal/manifest"
25 : "github.com/cockroachdb/pebble/internal/sstableinternal"
26 : "github.com/cockroachdb/pebble/objstorage"
27 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
28 : "github.com/cockroachdb/pebble/sstable"
29 : "github.com/cockroachdb/pebble/sstable/block"
30 : "github.com/cockroachdb/pebble/sstable/valblk"
31 : "github.com/cockroachdb/redact"
32 : )
33 :
34 : var emptyIter = &errorIter{err: nil}
35 : var emptyKeyspanIter = &errorKeyspanIter{err: nil}
36 :
37 : // tableNewIters creates new iterators (point, range deletion and/or range key)
38 : // for the given table metadata. Which of the various iterator kinds the user is
39 : // requesting is specified with the iterKinds bitmap.
40 : //
41 : // On success, the requested subset of iters.{point,rangeDel,rangeKey} are
42 : // populated with iterators.
43 : //
44 : // If a point iterator is requested and the operation was successful,
45 : // iters.point is guaranteed to be non-nil and must be closed when the caller is
46 : // finished.
47 : //
48 : // If a range deletion or range key iterator is requested, the corresponding
49 : // iterator may be nil if the table does not contain any keys of the
50 : // corresponding kind. The returned iterSet type provides RangeDeletion() and
51 : // RangeKey() convenience methods that return non-nil empty iterators that may
52 : // be used if the caller requires a non-nil iterator.
53 : //
54 : // On error, all iterators are nil.
55 : //
56 : // The only (non-test) implementation of tableNewIters is
57 : // fileCacheHandle.newIters().
58 : type tableNewIters func(
59 : ctx context.Context,
60 : file *manifest.TableMetadata,
61 : opts *IterOptions,
62 : internalOpts internalIterOpts,
63 : kinds iterKinds,
64 : ) (iterSet, error)
65 :
66 : // tableNewRangeDelIter takes a tableNewIters and returns a TableNewSpanIter
67 : // for the rangedel iterator returned by tableNewIters.
68 1 : func tableNewRangeDelIter(newIters tableNewIters) keyspanimpl.TableNewSpanIter {
69 1 : return func(ctx context.Context, file *manifest.TableMetadata, iterOptions keyspan.SpanIterOptions) (keyspan.FragmentIterator, error) {
70 1 : iters, err := newIters(ctx, file, nil, internalIterOpts{}, iterRangeDeletions)
71 1 : if err != nil {
72 0 : return nil, err
73 0 : }
74 1 : return iters.RangeDeletion(), nil
75 : }
76 : }
77 :
78 : // tableNewRangeKeyIter takes a tableNewIters and returns a TableNewSpanIter
79 : // for the range key iterator returned by tableNewIters.
80 1 : func tableNewRangeKeyIter(newIters tableNewIters) keyspanimpl.TableNewSpanIter {
81 1 : return func(ctx context.Context, file *manifest.TableMetadata, iterOptions keyspan.SpanIterOptions) (keyspan.FragmentIterator, error) {
82 1 : iters, err := newIters(ctx, file, nil, internalIterOpts{}, iterRangeKeys)
83 1 : if err != nil {
84 1 : return nil, err
85 1 : }
86 1 : return iters.RangeKey(), nil
87 : }
88 : }
89 :
90 : // fileCacheHandle is used to access the file cache. Each DB has its own handle.
91 : type fileCacheHandle struct {
92 : fileCache *FileCache
93 :
94 : // The handle contains fields which are unique to each DB. Note that these get
95 : // accessed from all shards, so keep read-only fields separate for read-write
96 : // fields.
97 : loggerAndTracer LoggerAndTracer
98 : blockCacheHandle *cache.Handle
99 : objProvider objstorage.Provider
100 : readerOpts sstable.ReaderOptions
101 :
102 : // iterCount keeps track of how many iterators are open. It is used to keep
103 : // track of leaked iterators on a per-db level.
104 : iterCount atomic.Int32
105 : sstStatsCollector block.CategoryStatsCollector
106 :
107 : // This struct is only populated in race builds.
108 : raceMu struct {
109 : sync.Mutex
110 : // iters maps sstable.Iterator objects to the stack trace recorded at
111 : // creation time.
112 : iters map[any][]byte
113 : }
114 : }
115 :
116 : // newHandle creates a handle for the FileCache which has its own options. Each
117 : // handle has its own set of files in the cache, separate from those of other
118 : // handles.
119 : // TODO(radu): don't pass entire options.
120 : func (c *FileCache) newHandle(
121 : cacheHandle *cache.Handle,
122 : objProvider objstorage.Provider,
123 : loggerAndTracer LoggerAndTracer,
124 : readerOpts sstable.ReaderOptions,
125 1 : ) *fileCacheHandle {
126 1 : c.Ref()
127 1 :
128 1 : t := &fileCacheHandle{
129 1 : fileCache: c,
130 1 : loggerAndTracer: loggerAndTracer,
131 1 : blockCacheHandle: cacheHandle,
132 1 : objProvider: objProvider,
133 1 : }
134 1 : t.readerOpts = readerOpts
135 1 : t.readerOpts.FilterMetricsTracker = &sstable.FilterMetricsTracker{}
136 1 : if invariants.RaceEnabled {
137 0 : t.raceMu.iters = make(map[any][]byte)
138 0 : }
139 1 : return t
140 : }
141 :
142 : // Close the handle, make sure that there will be no further need
143 : // to access any of the files associated with the store.
144 1 : func (h *fileCacheHandle) Close() error {
145 1 : // We want to do some cleanup work here. Check for leaked iterators
146 1 : // by the DB using this container. Note that we'll still perform cleanup
147 1 : // below in the case that there are leaked iterators.
148 1 : var err error
149 1 : if v := h.iterCount.Load(); v > 0 {
150 1 : if !invariants.RaceEnabled {
151 1 : err = errors.Errorf("leaked iterators: %d", errors.Safe(v))
152 1 : } else {
153 0 : var buf bytes.Buffer
154 0 : for _, stack := range h.raceMu.iters {
155 0 : fmt.Fprintf(&buf, "%s\n", stack)
156 0 : }
157 0 : err = errors.Errorf("leaked iterators: %d\n%s", errors.Safe(v), buf.String())
158 : }
159 : }
160 :
161 : // EvictAll would panic if there are still outstanding references.
162 1 : if err == nil {
163 1 : keys := h.fileCache.c.EvictAll(func(key fileCacheKey) bool {
164 1 : return key.handle == h
165 1 : })
166 : // Evict any associated blocks in the cache.
167 1 : for i := range keys {
168 1 : h.blockCacheHandle.EvictFile(keys[i].fileNum)
169 1 : }
170 : }
171 :
172 1 : h.fileCache.Unref()
173 1 : // TODO(radu): we have to tolerate metrics() calls after close (see
174 1 : // https://github.com/cockroachdb/cockroach/issues/140454).
175 1 : // *h = fileCacheHandle{}
176 1 : return err
177 : }
178 :
179 : // openFile is called when we insert a new entry in the file cache.
180 : func (h *fileCacheHandle) openFile(
181 : ctx context.Context, fileNum base.DiskFileNum,
182 1 : ) (*sstable.Reader, objstorage.ObjectMetadata, error) {
183 1 : f, err := h.objProvider.OpenForReading(
184 1 : ctx, base.FileTypeTable, fileNum, objstorage.OpenOptions{MustExist: true},
185 1 : )
186 1 : if err != nil {
187 1 : return nil, objstorage.ObjectMetadata{}, err
188 1 : }
189 1 : o := h.readerOpts
190 1 : o.CacheOpts = sstableinternal.CacheOptions{
191 1 : CacheHandle: h.blockCacheHandle,
192 1 : FileNum: fileNum,
193 1 : }
194 1 : r, err := sstable.NewReader(ctx, f, o)
195 1 : if err != nil {
196 1 : return nil, objstorage.ObjectMetadata{}, err
197 1 : }
198 1 : objMeta, err := h.objProvider.Lookup(base.FileTypeTable, fileNum)
199 1 : if err != nil {
200 0 : r.Close()
201 0 : return nil, objstorage.ObjectMetadata{}, err
202 0 : }
203 1 : return r, objMeta, nil
204 : }
205 :
206 : // findOrCreate is a wrapper for h.fileCache.FindOrCreate.
207 : func (h *fileCacheHandle) findOrCreate(
208 : ctx context.Context, fileNum base.DiskFileNum,
209 1 : ) (genericcache.ValueRef[fileCacheKey, fileCacheValue], error) {
210 1 : key := fileCacheKey{
211 1 : handle: h,
212 1 : fileNum: fileNum,
213 1 : }
214 1 : return h.fileCache.c.FindOrCreate(ctx, key)
215 1 : }
216 :
217 : // Evict the given file from the file cache and the block cache.
218 1 : func (h *fileCacheHandle) Evict(fileNum base.DiskFileNum) {
219 1 : h.fileCache.c.Evict(fileCacheKey{handle: h, fileNum: fileNum})
220 1 : h.blockCacheHandle.EvictFile(fileNum)
221 1 : }
222 :
223 1 : func (h *fileCacheHandle) SSTStatsCollector() *block.CategoryStatsCollector {
224 1 : return &h.sstStatsCollector
225 1 : }
226 :
227 : // Metrics returns metrics for the file cache. Note that the CacheMetrics track
228 : // the global cache which is shared between multiple handles (stores). The
229 : // FilterMetrics are per-handle.
230 1 : func (h *fileCacheHandle) Metrics() (CacheMetrics, FilterMetrics) {
231 1 : m := h.fileCache.c.Metrics()
232 1 : cm := CacheMetrics{
233 1 : Hits: m.Hits,
234 1 : Misses: m.Misses,
235 1 : Count: m.Count,
236 1 : Size: m.Size + m.Count*int64(unsafe.Sizeof(sstable.Reader{})),
237 1 : }
238 1 : fm := h.readerOpts.FilterMetricsTracker.Load()
239 1 : return cm, fm
240 1 : }
241 :
242 : func (h *fileCacheHandle) estimateSize(
243 : meta *tableMetadata, lower, upper []byte,
244 1 : ) (size uint64, err error) {
245 1 : h.withCommonReader(meta, func(cr sstable.CommonReader) error {
246 1 : size, err = cr.EstimateDiskUsage(lower, upper)
247 1 : return err
248 1 : })
249 1 : return size, err
250 : }
251 :
252 : // createCommonReader creates a Reader for this file.
253 1 : func createCommonReader(v *fileCacheValue, file *tableMetadata) sstable.CommonReader {
254 1 : // TODO(bananabrick): We suffer an allocation if file is a virtual sstable.
255 1 : r := v.mustSSTableReader()
256 1 : var cr sstable.CommonReader = r
257 1 : if file.Virtual {
258 1 : virtualReader := sstable.MakeVirtualReader(
259 1 : r, file.VirtualMeta().VirtualReaderParams(v.isShared),
260 1 : )
261 1 : cr = &virtualReader
262 1 : }
263 1 : return cr
264 : }
265 :
266 : func (h *fileCacheHandle) withCommonReader(
267 : meta *tableMetadata, fn func(sstable.CommonReader) error,
268 1 : ) error {
269 1 : ref, err := h.findOrCreate(context.TODO(), meta.FileBacking.DiskFileNum)
270 1 : if err != nil {
271 0 : return err
272 0 : }
273 1 : defer ref.Unref()
274 1 : return fn(createCommonReader(ref.Value(), meta))
275 : }
276 :
277 1 : func (h *fileCacheHandle) withReader(meta physicalMeta, fn func(*sstable.Reader) error) error {
278 1 : ref, err := h.findOrCreate(context.TODO(), meta.FileBacking.DiskFileNum)
279 1 : if err != nil {
280 1 : return err
281 1 : }
282 1 : defer ref.Unref()
283 1 : return fn(ref.Value().reader.(*sstable.Reader))
284 : }
285 :
286 : // withVirtualReader fetches a VirtualReader associated with a virtual sstable.
287 : func (h *fileCacheHandle) withVirtualReader(
288 : meta virtualMeta, fn func(sstable.VirtualReader) error,
289 1 : ) error {
290 1 : ref, err := h.findOrCreate(context.TODO(), meta.FileBacking.DiskFileNum)
291 1 : if err != nil {
292 0 : return err
293 0 : }
294 1 : defer ref.Unref()
295 1 : v := ref.Value()
296 1 : return fn(sstable.MakeVirtualReader(v.mustSSTableReader(), meta.VirtualReaderParams(v.isShared)))
297 : }
298 :
299 1 : func (h *fileCacheHandle) IterCount() int64 {
300 1 : return int64(h.iterCount.Load())
301 1 : }
302 :
303 : // FileCache is a shareable cache for open files. Open files are exclusively
304 : // sstable files today.
305 : type FileCache struct {
306 : refs atomic.Int64
307 :
308 : c genericcache.Cache[fileCacheKey, fileCacheValue]
309 : }
310 :
311 : // Ref adds a reference to the file cache. Once a file cache is constructed, the
312 : // cache only remains valid if there is at least one reference to it.
313 1 : func (c *FileCache) Ref() {
314 1 : v := c.refs.Add(1)
315 1 : // We don't want the reference count to ever go from 0 -> 1,
316 1 : // cause a reference count of 0 implies that we've closed the cache.
317 1 : if v <= 1 {
318 1 : panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
319 : }
320 : }
321 :
322 : // Unref removes a reference to the file cache.
323 1 : func (c *FileCache) Unref() {
324 1 : v := c.refs.Add(-1)
325 1 : switch {
326 1 : case v < 0:
327 1 : panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
328 1 : case v == 0:
329 1 : c.c.Close()
330 1 : c.c = genericcache.Cache[fileCacheKey, fileCacheValue]{}
331 : }
332 : }
333 :
334 : // NewFileCache will create a new file cache with one outstanding reference. It
335 : // is the callers responsibility to call Unref if they will no longer hold a
336 : // reference to the file cache.
337 1 : func NewFileCache(numShards int, size int) *FileCache {
338 1 : if size == 0 {
339 0 : panic("pebble: cannot create a file cache of size 0")
340 1 : } else if numShards == 0 {
341 0 : panic("pebble: cannot create a file cache with 0 shards")
342 : }
343 :
344 1 : c := &FileCache{}
345 1 :
346 1 : // initFn is used whenever a new entry is added to the file cache.
347 1 : initFn := func(ctx context.Context, key fileCacheKey, vRef genericcache.ValueRef[fileCacheKey, fileCacheValue]) error {
348 1 : v := vRef.Value()
349 1 : handle := key.handle
350 1 : v.readerProvider.init(c, key)
351 1 : v.closeHook = func(iterator any) {
352 1 : if invariants.RaceEnabled {
353 0 : handle.raceMu.Lock()
354 0 : delete(handle.raceMu.iters, iterator)
355 0 : handle.raceMu.Unlock()
356 0 : }
357 : // closeHook is called when an iterator is closed; the initialization of
358 : // an iterator with this value will happen after a FindOrCreate() call
359 : // with returns the same vRef.
360 1 : vRef.Unref()
361 1 : handle.iterCount.Add(-1)
362 : }
363 1 : reader, objMeta, err := handle.openFile(ctx, key.fileNum)
364 1 : if err != nil {
365 1 : return errors.Wrapf(err, "pebble: backing file %s error", redact.Safe(key.fileNum))
366 1 : }
367 1 : v.reader = reader
368 1 : v.isShared = objMeta.IsShared()
369 1 : return nil
370 : }
371 :
372 1 : releaseFn := func(v *fileCacheValue) {
373 1 : if v.reader != nil {
374 1 : _ = v.reader.Close()
375 1 : v.reader = nil
376 1 : }
377 : }
378 :
379 1 : c.c.Init(size, numShards, initFn, releaseFn)
380 1 :
381 1 : // Hold a ref to the cache here.
382 1 : c.refs.Store(1)
383 1 :
384 1 : return c
385 : }
386 :
387 : type fileCacheKey struct {
388 : handle *fileCacheHandle
389 : fileNum base.DiskFileNum
390 : }
391 :
392 : // Shard implements the genericcache.Key interface.
393 1 : func (k fileCacheKey) Shard(numShards int) int {
394 1 : // TODO(radu): maybe incorporate a handle ID.
395 1 : return int(uint64(k.fileNum) % uint64(numShards))
396 1 : }
397 :
398 : // checkAndIntersectFilters checks the specific table and block property filters
399 : // for intersection with any available table and block-level properties. Returns
400 : // true for ok if this table should be read by this iterator.
401 : func checkAndIntersectFilters(
402 : r *sstable.Reader,
403 : blockPropertyFilters []BlockPropertyFilter,
404 : boundLimitedFilter sstable.BoundLimitedBlockPropertyFilter,
405 : syntheticSuffix sstable.SyntheticSuffix,
406 1 : ) (ok bool, filterer *sstable.BlockPropertiesFilterer, err error) {
407 1 : if boundLimitedFilter != nil || len(blockPropertyFilters) > 0 {
408 1 : filterer, err = sstable.IntersectsTable(
409 1 : blockPropertyFilters,
410 1 : boundLimitedFilter,
411 1 : r.Properties.UserProperties,
412 1 : syntheticSuffix,
413 1 : )
414 1 : // NB: IntersectsTable will return a nil filterer if the table-level
415 1 : // properties indicate there's no intersection with the provided filters.
416 1 : if filterer == nil || err != nil {
417 1 : return false, nil, err
418 1 : }
419 : }
420 1 : return true, filterer, nil
421 : }
422 :
423 : func (h *fileCacheHandle) newIters(
424 : ctx context.Context,
425 : file *manifest.TableMetadata,
426 : opts *IterOptions,
427 : internalOpts internalIterOpts,
428 : kinds iterKinds,
429 1 : ) (iterSet, error) {
430 1 : // TODO(sumeer): constructing the Reader should also use a plumbed context,
431 1 : // since parts of the sstable are read during the construction. The Reader
432 1 : // should not remember that context since the Reader can be long-lived.
433 1 :
434 1 : // Calling findOrCreate gives us the responsibility of Unref()ing vRef.
435 1 : vRef, err := h.findOrCreate(ctx, file.FileBacking.DiskFileNum)
436 1 : if err != nil {
437 1 : return iterSet{}, err
438 1 : }
439 :
440 1 : v := vRef.Value()
441 1 : r := v.mustSSTableReader()
442 1 : // Note: This suffers an allocation for virtual sstables.
443 1 : cr := createCommonReader(v, file)
444 1 : var iters iterSet
445 1 : if kinds.RangeKey() && file.HasRangeKeys {
446 1 : iters.rangeKey, err = newRangeKeyIter(ctx, r, file, cr, opts.SpanIterOptions(), internalOpts)
447 1 : }
448 1 : if kinds.RangeDeletion() && file.HasPointKeys && err == nil {
449 1 : iters.rangeDeletion, err = newRangeDelIter(ctx, file, cr, h, internalOpts)
450 1 : }
451 1 : if kinds.Point() && err == nil {
452 1 : iters.point, err = h.newPointIter(ctx, v, file, cr, opts, internalOpts, h)
453 1 : }
454 1 : if err != nil {
455 1 : // NB: There's a subtlety here: Because the point iterator is the last
456 1 : // iterator we attempt to create, it's not possible for:
457 1 : // err != nil && iters.point != nil
458 1 : // If it were possible, we'd need to account for it to avoid double
459 1 : // unref-ing here, once during CloseAll and once during `unrefValue`.
460 1 : iters.CloseAll()
461 1 : vRef.Unref()
462 1 : return iterSet{}, err
463 1 : }
464 : // Only point iterators ever require the reader stay pinned in the cache. If
465 : // we're not returning a point iterator to the caller, we need to unref v.
466 : //
467 : // For point iterators, v.closeHook will be called which will release the ref.
468 1 : if iters.point == nil {
469 1 : vRef.Unref()
470 1 : }
471 1 : return iters, nil
472 : }
473 :
// filterBlockSizeLimitForFlushableIngests is the filter block size limit (64
// KiB) applied when reading flushable ingests: for those tables, whether the
// bloom filter is used is decided based on its size.
const filterBlockSizeLimitForFlushableIngests = 64 << 10
477 :
478 : // newPointIter is an internal helper that constructs a point iterator over a
479 : // sstable. This function is for internal use only, and callers should use
480 : // newIters instead.
481 : func (h *fileCacheHandle) newPointIter(
482 : ctx context.Context,
483 : v *fileCacheValue,
484 : file *manifest.TableMetadata,
485 : cr sstable.CommonReader,
486 : opts *IterOptions,
487 : internalOpts internalIterOpts,
488 : handle *fileCacheHandle,
489 1 : ) (internalIterator, error) {
490 1 : var (
491 1 : hideObsoletePoints bool = false
492 1 : pointKeyFilters []BlockPropertyFilter
493 1 : filterer *sstable.BlockPropertiesFilterer
494 1 : )
495 1 : r := v.mustSSTableReader()
496 1 : if opts != nil {
497 1 : // This code is appending (at most one filter) in-place to
498 1 : // opts.PointKeyFilters even though the slice is shared for iterators in
499 1 : // the same iterator tree. This is acceptable since all the following
500 1 : // properties are true:
501 1 : // - The iterator tree is single threaded, so the shared backing for the
502 1 : // slice is being mutated in a single threaded manner.
503 1 : // - Each shallow copy of the slice has its own notion of length.
504 1 : // - The appended element is always the obsoleteKeyBlockPropertyFilter
505 1 : // struct, which is stateless, so overwriting that struct when creating
506 1 : // one sstable iterator is harmless to other sstable iterators that are
507 1 : // relying on that struct.
508 1 : //
509 1 : // An alternative would be to have different slices for different sstable
510 1 : // iterators, but that requires more work to avoid allocations.
511 1 : //
512 1 : // TODO(bilal): for compaction reads of foreign sstables, we do hide
513 1 : // obsolete points (see sstable.Reader.newCompactionIter) but we don't
514 1 : // apply the obsolete block property filter. We could optimize this by
515 1 : // applying the filter.
516 1 : hideObsoletePoints, pointKeyFilters =
517 1 : r.TryAddBlockPropertyFilterForHideObsoletePoints(
518 1 : opts.snapshotForHideObsoletePoints, file.LargestSeqNum, opts.PointKeyFilters)
519 1 :
520 1 : var ok bool
521 1 : var err error
522 1 : ok, filterer, err = checkAndIntersectFilters(r, pointKeyFilters,
523 1 : internalOpts.boundLimitedFilter, file.SyntheticPrefixAndSuffix.Suffix())
524 1 : if err != nil {
525 0 : return nil, err
526 1 : } else if !ok {
527 1 : // No point keys within the table match the filters.
528 1 : return nil, nil
529 1 : }
530 : }
531 :
532 1 : var iter sstable.Iterator
533 1 : filterBlockSizeLimit := sstable.AlwaysUseFilterBlock
534 1 : if opts != nil {
535 1 : // By default, we don't use block filters for L6 and restrict the size for
536 1 : // flushable ingests, as these blocks can be very big.
537 1 : if !opts.UseL6Filters {
538 1 : if opts.layer == manifest.Level(6) {
539 1 : filterBlockSizeLimit = sstable.NeverUseFilterBlock
540 1 : } else if opts.layer.IsFlushableIngests() {
541 1 : filterBlockSizeLimit = filterBlockSizeLimitForFlushableIngests
542 1 : }
543 : }
544 1 : if opts.layer.IsSet() && !opts.layer.IsFlushableIngests() {
545 1 : ctx = objiotracing.WithLevel(ctx, opts.layer.Level())
546 1 : }
547 : }
548 1 : tableFormat, err := r.TableFormat()
549 1 : if err != nil {
550 0 : return nil, err
551 0 : }
552 :
553 1 : if v.isShared && file.SyntheticSeqNum() != 0 {
554 1 : if tableFormat < sstable.TableFormatPebblev4 {
555 0 : return nil, errors.New("pebble: shared ingested sstable has a lower table format than expected")
556 0 : }
557 : // The table is shared and ingested.
558 1 : hideObsoletePoints = true
559 : }
560 1 : transforms := file.IterTransforms()
561 1 : transforms.HideObsoletePoints = hideObsoletePoints
562 1 : if internalOpts.readEnv.IterStats == nil && opts != nil {
563 1 : internalOpts.readEnv.IterStats = handle.SSTStatsCollector().Accumulator(uint64(uintptr(unsafe.Pointer(r))), opts.Category)
564 1 : }
565 1 : if internalOpts.compaction {
566 1 : iter, err = cr.NewCompactionIter(transforms, internalOpts.readEnv, &v.readerProvider)
567 1 : } else {
568 1 : iter, err = cr.NewPointIter(
569 1 : ctx, transforms, opts.GetLowerBound(), opts.GetUpperBound(), filterer,
570 1 : filterBlockSizeLimit, internalOpts.readEnv, &v.readerProvider)
571 1 : }
572 1 : if err != nil {
573 1 : return nil, err
574 1 : }
575 : // NB: v.closeHook takes responsibility for calling unrefValue(v) here. Take
576 : // care to avoid introducing an allocation here by adding a closure.
577 1 : iter.SetCloseHook(v.closeHook)
578 1 : handle.iterCount.Add(1)
579 1 : if invariants.RaceEnabled {
580 0 : stack := debug.Stack()
581 0 : h.raceMu.Lock()
582 0 : h.raceMu.iters[iter] = stack
583 0 : h.raceMu.Unlock()
584 0 : }
585 1 : return iter, nil
586 : }
587 :
588 : // newRangeDelIter is an internal helper that constructs an iterator over a
589 : // sstable's range deletions. This function is for table-cache internal use
590 : // only, and callers should use newIters instead.
591 : func newRangeDelIter(
592 : ctx context.Context,
593 : file *manifest.TableMetadata,
594 : cr sstable.CommonReader,
595 : handle *fileCacheHandle,
596 : internalOpts internalIterOpts,
597 1 : ) (keyspan.FragmentIterator, error) {
598 1 : // NB: range-del iterator does not maintain a reference to the table, nor
599 1 : // does it need to read from it after creation.
600 1 : rangeDelIter, err := cr.NewRawRangeDelIter(ctx, file.FragmentIterTransforms(), internalOpts.readEnv)
601 1 : if err != nil {
602 1 : return nil, err
603 1 : }
604 : // Assert expected bounds in tests.
605 1 : if invariants.Sometimes(50) && rangeDelIter != nil {
606 1 : cmp := base.DefaultComparer.Compare
607 1 : if handle.readerOpts.Comparer != nil {
608 1 : cmp = handle.readerOpts.Comparer.Compare
609 1 : }
610 1 : rangeDelIter = keyspan.AssertBounds(
611 1 : rangeDelIter, file.SmallestPointKey, file.LargestPointKey.UserKey, cmp,
612 1 : )
613 : }
614 1 : return rangeDelIter, nil
615 : }
616 :
617 : // newRangeKeyIter is an internal helper that constructs an iterator over a
618 : // sstable's range keys. This function is for table-cache internal use only, and
619 : // callers should use newIters instead.
620 : func newRangeKeyIter(
621 : ctx context.Context,
622 : r *sstable.Reader,
623 : file *tableMetadata,
624 : cr sstable.CommonReader,
625 : opts keyspan.SpanIterOptions,
626 : internalOpts internalIterOpts,
627 1 : ) (keyspan.FragmentIterator, error) {
628 1 : transforms := file.FragmentIterTransforms()
629 1 : // Don't filter a table's range keys if the file contains RANGEKEYDELs.
630 1 : // The RANGEKEYDELs may delete range keys in other levels. Skipping the
631 1 : // file's range key blocks may surface deleted range keys below. This is
632 1 : // done here, rather than deferring to the block-property collector in order
633 1 : // to maintain parity with point keys and the treatment of RANGEDELs.
634 1 : if r.Properties.NumRangeKeyDels == 0 && len(opts.RangeKeyFilters) > 0 {
635 0 : ok, _, err := checkAndIntersectFilters(r, opts.RangeKeyFilters, nil, transforms.SyntheticSuffix())
636 0 : if err != nil {
637 0 : return nil, err
638 0 : } else if !ok {
639 0 : return nil, nil
640 0 : }
641 : }
642 : // TODO(radu): wrap in an AssertBounds.
643 1 : return cr.NewRawRangeKeyIter(ctx, transforms, internalOpts.readEnv)
644 : }
645 :
646 : // tableCacheShardReaderProvider implements sstable.ReaderProvider for a
647 : // specific table.
648 : type tableCacheShardReaderProvider struct {
649 : c *genericcache.Cache[fileCacheKey, fileCacheValue]
650 : key fileCacheKey
651 :
652 : mu struct {
653 : sync.Mutex
654 : // r is the result of c.FindOrCreate(), only set iff refCount > 0.
655 : r genericcache.ValueRef[fileCacheKey, fileCacheValue]
656 : // refCount is the number of GetReader() calls that have not received a
657 : // corresponding Close().
658 : refCount int
659 : }
660 : }
661 :
662 : var _ valblk.ReaderProvider = &tableCacheShardReaderProvider{}
663 :
664 1 : func (rp *tableCacheShardReaderProvider) init(fc *FileCache, key fileCacheKey) {
665 1 : rp.c = &fc.c
666 1 : rp.key = key
667 1 : rp.mu.r = genericcache.ValueRef[fileCacheKey, fileCacheValue]{}
668 1 : rp.mu.refCount = 0
669 1 : }
670 :
671 : // GetReader implements sstable.ReaderProvider. Note that it is not the
672 : // responsibility of tableCacheShardReaderProvider to ensure that the file
673 : // continues to exist. The ReaderProvider is used in iterators where the
674 : // top-level iterator is pinning the read state and preventing the files from
675 : // being deleted.
676 : //
677 : // The caller must call tableCacheShardReaderProvider.Close.
678 : //
679 : // Note that currently the Reader returned here is only used to read value
680 : // blocks. This reader shouldn't be used for other purposes like reading keys
681 : // outside of virtual sstable bounds.
682 : //
683 : // TODO(bananabrick): We could return a wrapper over the Reader to ensure
684 : // that the reader isn't used for other purposes.
685 : func (rp *tableCacheShardReaderProvider) GetReader(
686 : ctx context.Context,
687 1 : ) (valblk.ExternalBlockReader, error) {
688 1 : rp.mu.Lock()
689 1 : defer rp.mu.Unlock()
690 1 :
691 1 : if rp.mu.refCount > 0 {
692 0 : // We already have a value.
693 0 : rp.mu.refCount++
694 0 : return rp.mu.r.Value().mustSSTableReader(), nil
695 0 : }
696 :
697 : // Calling FindOrCreate gives us the responsibility of Unref()ing r, which
698 : // will happen when rp.mu.refCount reaches 0. Note that if the table is no
699 : // longer in the cache, FindOrCreate will need to do IO (through initFn in
700 : // NewFileCache) to initialize a new Reader. We hold rp.mu during this time so
701 : // that concurrent GetReader calls block until the Reader is created.
702 1 : r, err := rp.c.FindOrCreate(ctx, rp.key)
703 1 : if err != nil {
704 0 : return nil, err
705 0 : }
706 1 : rp.mu.r = r
707 1 : rp.mu.refCount = 1
708 1 : return r.Value().mustSSTableReader(), nil
709 : }
710 :
711 : // Close implements sstable.ReaderProvider.
712 1 : func (rp *tableCacheShardReaderProvider) Close() {
713 1 : rp.mu.Lock()
714 1 : defer rp.mu.Unlock()
715 1 : rp.mu.refCount--
716 1 : if rp.mu.refCount <= 0 {
717 1 : if rp.mu.refCount < 0 {
718 0 : panic("pebble: sstable.ReaderProvider misuse")
719 : }
720 1 : rp.mu.r.Unref()
721 1 : rp.mu.r = genericcache.ValueRef[fileCacheKey, fileCacheValue]{}
722 : }
723 : }
724 :
725 : // getTableProperties returns sst table properties for target file.
726 1 : func (h *fileCacheHandle) getTableProperties(file *tableMetadata) (*sstable.Properties, error) {
727 1 : // Calling findOrCreate gives us the responsibility of decrementing v's refCount here
728 1 : v, err := h.findOrCreate(context.TODO(), file.FileBacking.DiskFileNum)
729 1 : if err != nil {
730 0 : return nil, err
731 0 : }
732 1 : defer v.Unref()
733 1 :
734 1 : r := v.Value().mustSSTableReader()
735 1 : return &r.Properties, nil
736 : }
737 :
738 : type fileCacheValue struct {
739 : closeHook func(i any)
740 : reader io.Closer // *sstable.Reader
741 : isShared bool
742 :
743 : // readerProvider is embedded here so that we only allocate it once as long as
744 : // the table stays in the cache. Its state is not always logically tied to
745 : // this specific fileCacheShard - if a table goes out of the cache and then
746 : // comes back in, the readerProvider in a now-defunct fileCacheValue can
747 : // still be used and will internally refer to the new fileCacheValue.
748 : readerProvider tableCacheShardReaderProvider
749 : }
750 :
751 : // mustSSTable retrieves the value's *sstable.Reader. It panics if the cached
752 : // file is not a sstable (i.e., it is a blob file).
753 1 : func (v *fileCacheValue) mustSSTableReader() *sstable.Reader {
754 1 : return v.reader.(*sstable.Reader)
755 1 : }
756 :
757 : // iterSet holds a set of iterators of various key kinds, all constructed over
758 : // the same data structure (eg, an sstable). A subset of the fields may be
759 : // populated depending on the `iterKinds` passed to newIters.
760 : type iterSet struct {
761 : point internalIterator
762 : rangeDeletion keyspan.FragmentIterator
763 : rangeKey keyspan.FragmentIterator
764 : }
765 :
766 : // TODO(jackson): Consider adding methods for fast paths that check whether an
767 : // iterator of a particular kind is nil, so that these call sites don't need to
768 : // reach into the struct's fields directly.
769 :
770 : // Point returns the contained point iterator. If there is no point iterator,
771 : // Point returns a non-nil empty point iterator.
772 1 : func (s *iterSet) Point() internalIterator {
773 1 : if s.point == nil {
774 1 : return emptyIter
775 1 : }
776 1 : return s.point
777 : }
778 :
779 : // RangeDeletion returns the contained range deletion iterator. If there is no
780 : // range deletion iterator, RangeDeletion returns a non-nil empty keyspan
781 : // iterator.
782 1 : func (s *iterSet) RangeDeletion() keyspan.FragmentIterator {
783 1 : if s.rangeDeletion == nil {
784 1 : return emptyKeyspanIter
785 1 : }
786 1 : return s.rangeDeletion
787 : }
788 :
789 : // RangeKey returns the contained range key iterator. If there is no range key
790 : // iterator, RangeKey returns a non-nil empty keyspan iterator.
791 1 : func (s *iterSet) RangeKey() keyspan.FragmentIterator {
792 1 : if s.rangeKey == nil {
793 0 : return emptyKeyspanIter
794 0 : }
795 1 : return s.rangeKey
796 : }
797 :
798 : // CloseAll closes all of the held iterators. If CloseAll is called, then Close
799 : // must be not be called on the constituent iterators.
800 1 : func (s *iterSet) CloseAll() error {
801 1 : var err error
802 1 : if s.point != nil {
803 1 : err = s.point.Close()
804 1 : s.point = nil
805 1 : }
806 1 : if s.rangeDeletion != nil {
807 1 : s.rangeDeletion.Close()
808 1 : s.rangeDeletion = nil
809 1 : }
810 1 : if s.rangeKey != nil {
811 1 : s.rangeKey.Close()
812 1 : s.rangeKey = nil
813 1 : }
814 1 : return err
815 : }
816 :
// iterKinds is a bitmap indicating a set of kinds of iterators. Callers may
// bitwise-OR iterPointKeys, iterRangeDeletions and/or iterRangeKeys together to
// represent a set of desired iterator kinds.
type iterKinds uint8

// Point reports whether the set includes point key iterators.
func (t iterKinds) Point() bool {
	return t&iterPointKeys != 0
}

// RangeDeletion reports whether the set includes range deletion iterators.
func (t iterKinds) RangeDeletion() bool {
	return t&iterRangeDeletions != 0
}

// RangeKey reports whether the set includes range key iterators.
func (t iterKinds) RangeKey() bool {
	return t&iterRangeKeys != 0
}

// The individual iterator kinds; each one occupies its own bit.
const (
	iterPointKeys iterKinds = 1 << iota
	iterRangeDeletions
	iterRangeKeys
)
|