Line data Source code
1 : // Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "bytes"
9 : "context"
10 : "fmt"
11 : "io"
12 : "runtime/debug"
13 : "sync"
14 : "sync/atomic"
15 : "unsafe"
16 :
17 : "github.com/cockroachdb/errors"
18 : "github.com/cockroachdb/pebble/internal/base"
19 : "github.com/cockroachdb/pebble/internal/cache"
20 : "github.com/cockroachdb/pebble/internal/genericcache"
21 : "github.com/cockroachdb/pebble/internal/invariants"
22 : "github.com/cockroachdb/pebble/internal/keyspan"
23 : "github.com/cockroachdb/pebble/internal/keyspan/keyspanimpl"
24 : "github.com/cockroachdb/pebble/internal/manifest"
25 : "github.com/cockroachdb/pebble/internal/sstableinternal"
26 : "github.com/cockroachdb/pebble/objstorage"
27 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider"
28 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
29 : "github.com/cockroachdb/pebble/sstable"
30 : "github.com/cockroachdb/pebble/sstable/blob"
31 : "github.com/cockroachdb/pebble/sstable/block"
32 : "github.com/cockroachdb/pebble/sstable/valblk"
33 : "github.com/cockroachdb/pebble/vfs"
34 : "github.com/cockroachdb/redact"
35 : )
36 :
37 : var emptyIter = &errorIter{err: nil}
38 : var emptyKeyspanIter = &errorKeyspanIter{err: nil}
39 :
40 : // tableNewIters creates new iterators (point, range deletion and/or range key)
41 : // for the given table metadata. Which of the various iterator kinds the user is
42 : // requesting is specified with the iterKinds bitmap.
43 : //
44 : // On success, the requested subset of iters.{point,rangeDel,rangeKey} are
45 : // populated with iterators.
46 : //
47 : // If a point iterator is requested and the operation was successful,
48 : // iters.point is guaranteed to be non-nil and must be closed when the caller is
49 : // finished.
50 : //
51 : // If a range deletion or range key iterator is requested, the corresponding
52 : // iterator may be nil if the table does not contain any keys of the
53 : // corresponding kind. The returned iterSet type provides RangeDeletion() and
54 : // RangeKey() convenience methods that return non-nil empty iterators that may
55 : // be used if the caller requires a non-nil iterator.
56 : //
57 : // On error, all iterators are nil.
58 : //
59 : // The only (non-test) implementation of tableNewIters is
60 : // fileCacheHandle.newIters().
61 : type tableNewIters func(
62 : ctx context.Context,
63 : file *manifest.TableMetadata,
64 : opts *IterOptions,
65 : internalOpts internalIterOpts,
66 : kinds iterKinds,
67 : ) (iterSet, error)
68 :
69 : // tableNewRangeDelIter takes a tableNewIters and returns a TableNewSpanIter
70 : // for the rangedel iterator returned by tableNewIters.
71 1 : func tableNewRangeDelIter(newIters tableNewIters) keyspanimpl.TableNewSpanIter {
72 1 : return func(ctx context.Context, file *manifest.TableMetadata, iterOptions keyspan.SpanIterOptions) (keyspan.FragmentIterator, error) {
73 1 : iters, err := newIters(ctx, file, nil, internalIterOpts{}, iterRangeDeletions)
74 1 : if err != nil {
75 0 : return nil, err
76 0 : }
77 1 : return iters.RangeDeletion(), nil
78 : }
79 : }
80 :
81 : // tableNewRangeKeyIter takes a tableNewIters and returns a TableNewSpanIter
82 : // for the range key iterator returned by tableNewIters.
83 1 : func tableNewRangeKeyIter(newIters tableNewIters) keyspanimpl.TableNewSpanIter {
84 1 : return func(ctx context.Context, file *manifest.TableMetadata, iterOptions keyspan.SpanIterOptions) (keyspan.FragmentIterator, error) {
85 1 : iters, err := newIters(ctx, file, nil, internalIterOpts{}, iterRangeKeys)
86 1 : if err != nil {
87 0 : return nil, err
88 0 : }
89 1 : return iters.RangeKey(), nil
90 : }
91 : }
92 :
93 : // fileCacheHandle is used to access the file cache. Each DB has its own handle.
94 : type fileCacheHandle struct {
95 : fileCache *FileCache
96 :
97 : // The handle contains fields which are unique to each DB. Note that these get
98 : // accessed from all shards, so keep read-only fields separate for read-write
99 : // fields.
100 : loggerAndTracer LoggerAndTracer
101 : blockCacheHandle *cache.Handle
102 : objProvider objstorage.Provider
103 : readerOpts sstable.ReaderOptions
104 :
105 : // iterCount keeps track of how many iterators are open. It is used to keep
106 : // track of leaked iterators on a per-db level.
107 : iterCount atomic.Int32
108 : sstStatsCollector block.CategoryStatsCollector
109 :
110 : // reportCorruptionFn is used for block.ReadEnv.ReportCorruptionFn. It expects
111 : // the first argument to be a `*TableMetadata`. It returns an error that
112 : // contains more details.
113 : reportCorruptionFn func(any, error) error
114 :
115 : // This struct is only populated in race builds.
116 : raceMu struct {
117 : sync.Mutex
118 : // nextRefID is the next ID to allocate for a new reference.
119 : nextRefID uint64
120 : // openRefs maps reference IDs to the stack trace recorded at creation
121 : // time. It's used to track which call paths leaked open references to
122 : // files.
123 : openRefs map[uint64][]byte
124 : }
125 : }
126 :
127 : // Assert that *fileCacheHandle implements blob.ReaderProvider.
128 : var _ blob.ReaderProvider = (*fileCacheHandle)(nil)
129 :
130 : // newHandle creates a handle for the FileCache which has its own options. Each
131 : // handle has its own set of files in the cache, separate from those of other
132 : // handles.
133 : func (c *FileCache) newHandle(
134 : cacheHandle *cache.Handle,
135 : objProvider objstorage.Provider,
136 : loggerAndTracer LoggerAndTracer,
137 : readerOpts sstable.ReaderOptions,
138 : reportCorruptionFn func(any, error) error,
139 1 : ) *fileCacheHandle {
140 1 : c.Ref()
141 1 :
142 1 : t := &fileCacheHandle{
143 1 : fileCache: c,
144 1 : loggerAndTracer: loggerAndTracer,
145 1 : blockCacheHandle: cacheHandle,
146 1 : objProvider: objProvider,
147 1 : }
148 1 : t.readerOpts = readerOpts
149 1 : t.readerOpts.FilterMetricsTracker = &sstable.FilterMetricsTracker{}
150 1 : t.reportCorruptionFn = reportCorruptionFn
151 1 : if invariants.RaceEnabled {
152 0 : t.raceMu.openRefs = make(map[uint64][]byte)
153 0 : }
154 1 : return t
155 : }
156 :
157 : // Close the handle, make sure that there will be no further need
158 : // to access any of the files associated with the store.
159 1 : func (h *fileCacheHandle) Close() error {
160 1 : // We want to do some cleanup work here. Check for leaked iterators
161 1 : // by the DB using this container. Note that we'll still perform cleanup
162 1 : // below in the case that there are leaked iterators.
163 1 : var err error
164 1 : if v := h.iterCount.Load(); v > 0 {
165 0 : if !invariants.RaceEnabled {
166 0 : err = errors.Errorf("leaked iterators: %d", errors.Safe(v))
167 0 : } else {
168 0 : var buf bytes.Buffer
169 0 : for _, stack := range h.raceMu.openRefs {
170 0 : fmt.Fprintf(&buf, "%s\n", stack)
171 0 : }
172 0 : err = errors.Errorf("leaked iterators: %d\n%s", errors.Safe(v), buf.String())
173 : }
174 : }
175 :
176 : // EvictAll would panic if there are still outstanding references.
177 1 : if err == nil {
178 1 : keys := h.fileCache.c.EvictAll(func(key fileCacheKey) bool {
179 1 : return key.handle == h
180 1 : })
181 : // Evict any associated blocks in the cache.
182 1 : for i := range keys {
183 1 : h.blockCacheHandle.EvictFile(keys[i].fileNum)
184 1 : }
185 : }
186 :
187 1 : h.fileCache.Unref()
188 1 : // TODO(radu): we have to tolerate metrics() calls after close (see
189 1 : // https://github.com/cockroachdb/cockroach/issues/140454).
190 1 : // *h = fileCacheHandle{}
191 1 : return err
192 : }
193 :
194 : // openFile is called when we insert a new entry in the file cache.
195 : func (h *fileCacheHandle) openFile(
196 : ctx context.Context, fileNum base.DiskFileNum, fileType base.FileType,
197 1 : ) (io.Closer, objstorage.ObjectMetadata, error) {
198 1 : f, err := h.objProvider.OpenForReading(
199 1 : ctx, fileType, fileNum, objstorage.OpenOptions{MustExist: true},
200 1 : )
201 1 : if err != nil {
202 0 : return nil, objstorage.ObjectMetadata{}, err
203 0 : }
204 1 : objMeta, err := h.objProvider.Lookup(fileType, fileNum)
205 1 : if err != nil {
206 0 : return nil, objstorage.ObjectMetadata{}, err
207 0 : }
208 :
209 1 : o := h.readerOpts
210 1 : o.CacheOpts = sstableinternal.CacheOptions{
211 1 : CacheHandle: h.blockCacheHandle,
212 1 : FileNum: fileNum,
213 1 : }
214 1 : switch fileType {
215 1 : case base.FileTypeTable:
216 1 : r, err := sstable.NewReader(ctx, f, o)
217 1 : if err != nil {
218 0 : // If opening the sstable reader fails, we're responsible for
219 0 : // closing the objstorage.Readable.
220 0 : return nil, objMeta, errors.CombineErrors(err, f.Close())
221 0 : }
222 1 : return r, objMeta, nil
223 1 : case base.FileTypeBlob:
224 1 : r, err := blob.NewFileReader(ctx, f, blob.FileReaderOptions{
225 1 : ReaderOptions: o.ReaderOptions,
226 1 : })
227 1 : if err != nil {
228 0 : // If opening the blob file reader fails, we're responsible for
229 0 : // closing the objstorage.Readable.
230 0 : return nil, objMeta, errors.CombineErrors(err, f.Close())
231 0 : }
232 1 : return r, objMeta, nil
233 0 : default:
234 0 : panic(errors.AssertionFailedf("pebble: unexpected file cache file type: %s", fileType))
235 : }
236 : }
237 :
238 : // findOrCreateTable retrieves an existing sstable reader or creates a new one
239 : // for the backing file of the given table. If a corruption error is
240 : // encountered, reportCorruptionFn() is called.
241 : func (h *fileCacheHandle) findOrCreateTable(
242 : ctx context.Context, meta *manifest.TableMetadata,
243 1 : ) (genericcache.ValueRef[fileCacheKey, fileCacheValue], error) {
244 1 : key := fileCacheKey{
245 1 : handle: h,
246 1 : fileNum: meta.TableBacking.DiskFileNum,
247 1 : fileType: base.FileTypeTable,
248 1 : }
249 1 : valRef, err := h.fileCache.c.FindOrCreate(ctx, key)
250 1 : if err != nil && IsCorruptionError(err) {
251 0 : err = h.reportCorruptionFn(meta, err)
252 0 : }
253 1 : return valRef, err
254 : }
255 :
256 : // findOrCreateBlob retrieves an existing blob reader or creates a new one for
257 : // the given blob file. If a corruption error is encountered,
258 : // reportCorruptionFn() is called.
259 : func (h *fileCacheHandle) findOrCreateBlob(
260 : ctx context.Context, fileNum base.DiskFileNum,
261 1 : ) (genericcache.ValueRef[fileCacheKey, fileCacheValue], error) {
262 1 : key := fileCacheKey{
263 1 : handle: h,
264 1 : fileNum: fileNum,
265 1 : fileType: base.FileTypeBlob,
266 1 : }
267 1 : valRef, err := h.fileCache.c.FindOrCreate(ctx, key)
268 1 : // TODO(jackson): Propagate a blob metadata object here.
269 1 : if err != nil && IsCorruptionError(err) {
270 0 : err = h.reportCorruptionFn(nil, err)
271 0 : }
272 1 : return valRef, err
273 : }
274 :
275 : // Evict the given file from the file cache and the block cache.
276 1 : func (h *fileCacheHandle) Evict(fileNum base.DiskFileNum, fileType base.FileType) {
277 1 : defer func() {
278 1 : if p := recover(); p != nil {
279 0 : panic(fmt.Sprintf("pebble: evicting in-use file %s(%s): %v", fileNum, fileType, p))
280 : }
281 : }()
282 1 : h.fileCache.c.Evict(fileCacheKey{handle: h, fileNum: fileNum, fileType: fileType})
283 1 : h.blockCacheHandle.EvictFile(fileNum)
284 : }
285 :
286 1 : func (h *fileCacheHandle) SSTStatsCollector() *block.CategoryStatsCollector {
287 1 : return &h.sstStatsCollector
288 1 : }
289 :
290 : // Metrics returns metrics for the file cache. Note that the CacheMetrics track
291 : // the global cache which is shared between multiple handles (stores). The
292 : // FilterMetrics are per-handle.
293 1 : func (h *fileCacheHandle) Metrics() (CacheMetrics, FilterMetrics) {
294 1 : m := h.fileCache.c.Metrics()
295 1 :
296 1 : // The generic cache maintains a count of entries, but it doesn't know which
297 1 : // entries are sstables and which are blob files, which affects the memory
298 1 : // footprint of the table cache. So the FileCache maintains its own counts,
299 1 : // incremented when initializing a new value and decremented by the
300 1 : // releasing func.
301 1 : countSSTables := h.fileCache.counts.sstables.Load()
302 1 : countBlobFiles := h.fileCache.counts.blobFiles.Load()
303 1 :
304 1 : cm := CacheMetrics{
305 1 : Hits: m.Hits,
306 1 : Misses: m.Misses,
307 1 : Count: countSSTables + countBlobFiles,
308 1 : Size: m.Size + countSSTables*int64(unsafe.Sizeof(sstable.Reader{})) +
309 1 : countBlobFiles*int64(unsafe.Sizeof(blob.FileReader{})),
310 1 : }
311 1 : fm := h.readerOpts.FilterMetricsTracker.Load()
312 1 : return cm, fm
313 1 : }
314 :
315 : func (h *fileCacheHandle) estimateSize(
316 : meta *manifest.TableMetadata, lower, upper []byte,
317 1 : ) (size uint64, err error) {
318 1 : err = h.withReader(context.TODO(), block.NoReadEnv, meta, func(r *sstable.Reader, env sstable.ReadEnv) error {
319 1 : size, err = r.EstimateDiskUsage(lower, upper, env)
320 1 : return err
321 1 : })
322 1 : return size, err
323 : }
324 :
325 : func createReader(
326 : v *fileCacheValue, meta *manifest.TableMetadata,
327 1 : ) (*sstable.Reader, sstable.ReadEnv) {
328 1 : r := v.mustSSTableReader()
329 1 : env := sstable.ReadEnv{}
330 1 : if meta.Virtual {
331 1 : if invariants.Enabled {
332 1 : if meta.VirtualParams.FileNum == 0 || meta.VirtualParams.Lower.UserKey == nil || meta.VirtualParams.Upper.UserKey == nil {
333 0 : panic("virtual params not initialized")
334 : }
335 : }
336 1 : env.Virtual = meta.VirtualParams
337 1 : env.IsSharedIngested = v.isShared && meta.SyntheticSeqNum() != 0
338 : }
339 1 : return r, env
340 : }
341 :
342 : func (h *fileCacheHandle) withReader(
343 : ctx context.Context,
344 : blockEnv block.ReadEnv,
345 : meta *manifest.TableMetadata,
346 : fn func(*sstable.Reader, sstable.ReadEnv) error,
347 1 : ) error {
348 1 : ref, err := h.findOrCreateTable(ctx, meta)
349 1 : if err != nil {
350 0 : return err
351 0 : }
352 1 : defer ref.Unref()
353 1 : v := ref.Value()
354 1 : blockEnv.ReportCorruptionFn = h.reportCorruptionFn
355 1 : blockEnv.ReportCorruptionArg = meta
356 1 : env := sstable.ReadEnv{Block: blockEnv}
357 1 :
358 1 : r := v.mustSSTableReader()
359 1 : if meta.Virtual {
360 1 : if invariants.Enabled {
361 1 : if meta.VirtualParams.FileNum == 0 || meta.VirtualParams.Lower.UserKey == nil || meta.VirtualParams.Upper.UserKey == nil {
362 0 : panic("virtual params not initialized")
363 : }
364 : }
365 1 : env.Virtual = meta.VirtualParams
366 1 : env.IsSharedIngested = v.isShared && meta.SyntheticSeqNum() != 0
367 : }
368 :
369 1 : return fn(r, env)
370 :
371 : }
372 :
373 1 : func (h *fileCacheHandle) IterCount() int64 {
374 1 : return int64(h.iterCount.Load())
375 1 : }
376 :
377 : // GetValueReader returns a blob.ValueReader for blob file identified by fileNum.
378 : func (h *fileCacheHandle) GetValueReader(
379 : ctx context.Context, fileNum base.DiskFileNum,
380 1 : ) (r blob.ValueReader, closeFunc func(), err error) {
381 1 : ref, err := h.findOrCreateBlob(ctx, fileNum)
382 1 : if err != nil {
383 0 : return nil, nil, err
384 0 : }
385 1 : v := ref.Value()
386 1 : r = v.mustBlob()
387 1 : // NB: The call to findOrCreateBlob incremented the value's reference count.
388 1 : // The closeHook (v.closeHook) takes responsibility for unreferencing the
389 1 : // value. Take care to avoid introducing an allocation here by adding a
390 1 : // closure.
391 1 : closeHook := h.addReference(v)
392 1 : return r, closeHook, nil
393 : }
394 :
395 : // FileCache is a shareable cache for open files. Open files are exclusively
396 : // sstable files today.
397 : type FileCache struct {
398 : refs atomic.Int64
399 : counts struct {
400 : sstables atomic.Int64
401 : blobFiles atomic.Int64
402 : }
403 :
404 : c genericcache.Cache[fileCacheKey, fileCacheValue]
405 : }
406 :
407 : // Ref adds a reference to the file cache. Once a file cache is constructed, the
408 : // cache only remains valid if there is at least one reference to it.
409 1 : func (c *FileCache) Ref() {
410 1 : v := c.refs.Add(1)
411 1 : // We don't want the reference count to ever go from 0 -> 1,
412 1 : // cause a reference count of 0 implies that we've closed the cache.
413 1 : if v <= 1 {
414 0 : panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
415 : }
416 : }
417 :
418 : // Unref removes a reference to the file cache.
419 1 : func (c *FileCache) Unref() {
420 1 : v := c.refs.Add(-1)
421 1 : switch {
422 0 : case v < 0:
423 0 : panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
424 1 : case v == 0:
425 1 : c.c.Close()
426 1 : c.c = genericcache.Cache[fileCacheKey, fileCacheValue]{}
427 : }
428 : }
429 :
430 : // NewFileCache will create a new file cache with one outstanding reference. It
431 : // is the callers responsibility to call Unref if they will no longer hold a
432 : // reference to the file cache.
433 1 : func NewFileCache(numShards int, size int) *FileCache {
434 1 : if size == 0 {
435 0 : panic("pebble: cannot create a file cache of size 0")
436 1 : } else if numShards == 0 {
437 0 : panic("pebble: cannot create a file cache with 0 shards")
438 : }
439 :
440 1 : c := &FileCache{}
441 1 :
442 1 : // initFn is used whenever a new entry is added to the file cache.
443 1 : initFn := func(ctx context.Context, key fileCacheKey, vRef genericcache.ValueRef[fileCacheKey, fileCacheValue]) error {
444 1 : v := vRef.Value()
445 1 : handle := key.handle
446 1 : v.readerProvider.init(c, key)
447 1 : v.closeHook = func() {
448 1 : // closeHook is called when an iterator is closed; the initialization of
449 1 : // an iterator with this value will happen after a FindOrCreate() call
450 1 : // with returns the same vRef.
451 1 : vRef.Unref()
452 1 : handle.iterCount.Add(-1)
453 1 : }
454 1 : reader, objMeta, err := handle.openFile(ctx, key.fileNum, key.fileType)
455 1 : if err != nil {
456 0 : return errors.Wrapf(err, "pebble: backing file %s error", redact.Safe(key.fileNum))
457 0 : }
458 1 : v.reader = reader
459 1 : v.isShared = objMeta.IsShared()
460 1 : switch key.fileType {
461 1 : case base.FileTypeTable:
462 1 : c.counts.sstables.Add(1)
463 1 : case base.FileTypeBlob:
464 1 : c.counts.blobFiles.Add(1)
465 0 : default:
466 0 : panic("unexpected file type")
467 : }
468 1 : return nil
469 : }
470 :
471 1 : releaseFn := func(v *fileCacheValue) {
472 1 : if v.reader != nil {
473 1 : switch v.reader.(type) {
474 1 : case *sstable.Reader:
475 1 : c.counts.sstables.Add(-1)
476 1 : case *blob.FileReader:
477 1 : c.counts.blobFiles.Add(-1)
478 : }
479 1 : _ = v.reader.Close()
480 1 : v.reader = nil
481 : }
482 : }
483 :
484 1 : c.c.Init(size, numShards, initFn, releaseFn)
485 1 :
486 1 : // Hold a ref to the cache here.
487 1 : c.refs.Store(1)
488 1 :
489 1 : return c
490 : }
491 :
492 : type fileCacheKey struct {
493 : handle *fileCacheHandle
494 : fileNum base.DiskFileNum
495 : // fileType describes the type of file being cached (blob or sstable). A
496 : // file number alone uniquely identifies every file within a DB, but we need
497 : // to propagate the type so the file cache looks for the correct file in
498 : // object storage / the filesystem.
499 : fileType base.FileType
500 : }
501 :
502 : // Shard implements the genericcache.Key interface.
503 1 : func (k fileCacheKey) Shard(numShards int) int {
504 1 : // TODO(radu): maybe incorporate a handle ID.
505 1 : return int(uint64(k.fileNum) % uint64(numShards))
506 1 : }
507 :
508 : // checkAndIntersectFilters checks the specific table and block property filters
509 : // for intersection with any available table and block-level properties. Returns
510 : // true for ok if this table should be read by this iterator.
511 : func checkAndIntersectFilters(
512 : r *sstable.Reader,
513 : blockPropertyFilters []BlockPropertyFilter,
514 : boundLimitedFilter sstable.BoundLimitedBlockPropertyFilter,
515 : syntheticSuffix sstable.SyntheticSuffix,
516 1 : ) (ok bool, filterer *sstable.BlockPropertiesFilterer, err error) {
517 1 : if boundLimitedFilter != nil || len(blockPropertyFilters) > 0 {
518 1 : filterer, err = sstable.IntersectsTable(
519 1 : blockPropertyFilters,
520 1 : boundLimitedFilter,
521 1 : r.UserProperties,
522 1 : syntheticSuffix,
523 1 : )
524 1 : // NB: IntersectsTable will return a nil filterer if the table-level
525 1 : // properties indicate there's no intersection with the provided filters.
526 1 : if filterer == nil || err != nil {
527 1 : return false, nil, err
528 1 : }
529 : }
530 1 : return true, filterer, nil
531 : }
532 :
533 : func (h *fileCacheHandle) newIters(
534 : ctx context.Context,
535 : file *manifest.TableMetadata,
536 : opts *IterOptions,
537 : internalOpts internalIterOpts,
538 : kinds iterKinds,
539 1 : ) (iterSet, error) {
540 1 : // Calling findOrCreate gives us the responsibility of Unref()ing vRef.
541 1 : vRef, err := h.findOrCreateTable(ctx, file)
542 1 : if err != nil {
543 0 : return iterSet{}, err
544 0 : }
545 :
546 1 : internalOpts.readEnv.Block.ReportCorruptionFn = h.reportCorruptionFn
547 1 : internalOpts.readEnv.Block.ReportCorruptionArg = file
548 1 :
549 1 : v := vRef.Value()
550 1 : r, env := createReader(v, file)
551 1 : internalOpts.readEnv.Virtual = env.Virtual
552 1 : internalOpts.readEnv.IsSharedIngested = env.IsSharedIngested
553 1 :
554 1 : var iters iterSet
555 1 : if kinds.RangeKey() && file.HasRangeKeys {
556 1 : iters.rangeKey, err = newRangeKeyIter(ctx, file, r, opts.SpanIterOptions(), internalOpts)
557 1 : }
558 1 : if kinds.RangeDeletion() && file.HasPointKeys && err == nil {
559 1 : iters.rangeDeletion, err = newRangeDelIter(ctx, file, r, h, internalOpts)
560 1 : }
561 1 : if kinds.Point() && err == nil {
562 1 : iters.point, err = h.newPointIter(ctx, v, file, r, opts, internalOpts, h)
563 1 : }
564 1 : if err != nil {
565 0 : // NB: There's a subtlety here: Because the point iterator is the last
566 0 : // iterator we attempt to create, it's not possible for:
567 0 : // err != nil && iters.point != nil
568 0 : // If it were possible, we'd need to account for it to avoid double
569 0 : // unref-ing here, once during CloseAll and once during `unrefValue`.
570 0 : _ = iters.CloseAll()
571 0 : vRef.Unref()
572 0 : return iterSet{}, err
573 0 : }
574 : // Only point iterators ever require the reader stay pinned in the cache. If
575 : // we're not returning a point iterator to the caller, we need to unref v.
576 : //
577 : // For point iterators, v.closeHook will be called which will release the ref.
578 1 : if iters.point == nil {
579 1 : vRef.Unref()
580 1 : }
581 1 : return iters, nil
582 : }
583 :
// filterBlockSizeLimitForFlushableIngests is the filter block size limit
// applied to flushable ingests: for those, whether the bloom filter is used is
// decided based on size.
const filterBlockSizeLimitForFlushableIngests = 64 * 1024
587 :
588 : // newPointIter is an internal helper that constructs a point iterator over a
589 : // sstable. This function is for internal use only, and callers should use
590 : // newIters instead.
591 : func (h *fileCacheHandle) newPointIter(
592 : ctx context.Context,
593 : v *fileCacheValue,
594 : file *manifest.TableMetadata,
595 : reader *sstable.Reader,
596 : opts *IterOptions,
597 : internalOpts internalIterOpts,
598 : handle *fileCacheHandle,
599 1 : ) (internalIterator, error) {
600 1 : var (
601 1 : hideObsoletePoints bool = false
602 1 : pointKeyFilters []BlockPropertyFilter
603 1 : filterer *sstable.BlockPropertiesFilterer
604 1 : )
605 1 : r := v.mustSSTableReader()
606 1 : if opts != nil {
607 1 : // This code is appending (at most one filter) in-place to
608 1 : // opts.PointKeyFilters even though the slice is shared for iterators in
609 1 : // the same iterator tree. This is acceptable since all the following
610 1 : // properties are true:
611 1 : // - The iterator tree is single threaded, so the shared backing for the
612 1 : // slice is being mutated in a single threaded manner.
613 1 : // - Each shallow copy of the slice has its own notion of length.
614 1 : // - The appended element is always the obsoleteKeyBlockPropertyFilter
615 1 : // struct, which is stateless, so overwriting that struct when creating
616 1 : // one sstable iterator is harmless to other sstable iterators that are
617 1 : // relying on that struct.
618 1 : //
619 1 : // An alternative would be to have different slices for different sstable
620 1 : // iterators, but that requires more work to avoid allocations.
621 1 : //
622 1 : // TODO(bilal): for compaction reads of foreign sstables, we do hide
623 1 : // obsolete points (see sstable.Reader.newCompactionIter) but we don't
624 1 : // apply the obsolete block property filter. We could optimize this by
625 1 : // applying the filter.
626 1 : hideObsoletePoints, pointKeyFilters =
627 1 : r.TryAddBlockPropertyFilterForHideObsoletePoints(
628 1 : opts.snapshotForHideObsoletePoints, file.LargestSeqNum, opts.PointKeyFilters)
629 1 :
630 1 : var ok bool
631 1 : var err error
632 1 : ok, filterer, err = checkAndIntersectFilters(r, pointKeyFilters,
633 1 : internalOpts.boundLimitedFilter, file.SyntheticPrefixAndSuffix.Suffix())
634 1 : if err != nil {
635 0 : return nil, err
636 1 : } else if !ok {
637 1 : // No point keys within the table match the filters.
638 1 : return nil, nil
639 1 : }
640 : }
641 :
642 1 : var iter sstable.Iterator
643 1 : filterBlockSizeLimit := sstable.AlwaysUseFilterBlock
644 1 : if opts != nil {
645 1 : // By default, we don't use block filters for L6 and restrict the size for
646 1 : // flushable ingests, as these blocks can be very big.
647 1 : if !opts.UseL6Filters {
648 1 : if opts.layer == manifest.Level(6) {
649 1 : filterBlockSizeLimit = sstable.NeverUseFilterBlock
650 1 : } else if opts.layer.IsFlushableIngests() {
651 1 : filterBlockSizeLimit = filterBlockSizeLimitForFlushableIngests
652 1 : }
653 : }
654 1 : if opts.layer.IsSet() && !opts.layer.IsFlushableIngests() {
655 1 : ctx = objiotracing.WithLevel(ctx, opts.layer.Level())
656 1 : }
657 : }
658 :
659 1 : if v.isShared && file.SyntheticSeqNum() != 0 {
660 1 : // The table is shared and ingested.
661 1 : hideObsoletePoints = true
662 1 : }
663 1 : transforms := file.IterTransforms()
664 1 : transforms.HideObsoletePoints = hideObsoletePoints
665 1 : if internalOpts.readEnv.Block.IterStats == nil && opts != nil {
666 1 : internalOpts.readEnv.Block.IterStats = handle.SSTStatsCollector().Accumulator(uint64(uintptr(unsafe.Pointer(r))), opts.Category)
667 1 : }
668 1 : var blobReferences sstable.BlobReferences
669 1 : if r.Attributes.Has(sstable.AttributeBlobValues) {
670 1 : if len(file.BlobReferences) == 0 {
671 0 : return nil, errors.AssertionFailedf("pebble: sstable %s has blob values but no blob references", file.TableNum)
672 0 : }
673 1 : blobReferences = &file.BlobReferences
674 : }
675 1 : var err error
676 1 : if internalOpts.compaction {
677 1 : iter, err = reader.NewCompactionIter(transforms, internalOpts.readEnv,
678 1 : &v.readerProvider, sstable.TableBlobContext{
679 1 : ValueFetcher: internalOpts.blobValueFetcher,
680 1 : References: blobReferences,
681 1 : })
682 1 : } else {
683 1 : iter, err = reader.NewPointIter(ctx, sstable.IterOptions{
684 1 : Lower: opts.GetLowerBound(),
685 1 : Upper: opts.GetUpperBound(),
686 1 : Transforms: transforms,
687 1 : FilterBlockSizeLimit: filterBlockSizeLimit,
688 1 : Filterer: filterer,
689 1 : Env: internalOpts.readEnv,
690 1 : ReaderProvider: &v.readerProvider,
691 1 : BlobContext: sstable.TableBlobContext{
692 1 : ValueFetcher: internalOpts.blobValueFetcher,
693 1 : References: blobReferences,
694 1 : },
695 1 : })
696 1 : }
697 1 : if err != nil {
698 0 : return nil, err
699 0 : }
700 : // NB: closeHook (v.closeHook) takes responsibility for calling
701 : // unrefValue(v) here. Take care to avoid introducing an allocation here by
702 : // adding a closure.
703 1 : closeHook := h.addReference(v)
704 1 : iter.SetCloseHook(closeHook)
705 1 : return iter, nil
706 : }
707 :
708 1 : func (h *fileCacheHandle) addReference(v *fileCacheValue) (closeHook func()) {
709 1 : h.iterCount.Add(1)
710 1 : closeHook = v.closeHook
711 1 : if invariants.RaceEnabled {
712 0 : stack := debug.Stack()
713 0 : h.raceMu.Lock()
714 0 : refID := h.raceMu.nextRefID
715 0 : h.raceMu.openRefs[refID] = stack
716 0 : h.raceMu.nextRefID++
717 0 : h.raceMu.Unlock()
718 0 : // In race builds, this closeHook closure will force an allocation.
719 0 : // Race builds are already unperformant (and allocate a stack trace), so
720 0 : // we don't care.
721 0 : closeHook = func() {
722 0 : v.closeHook()
723 0 : h.raceMu.Lock()
724 0 : defer h.raceMu.Unlock()
725 0 : delete(h.raceMu.openRefs, refID)
726 0 : }
727 : }
728 1 : return closeHook
729 : }
730 :
731 : // SetupBlobReaderProvider creates a fileCachHandle blob.ReaderProvider for
732 : // reading blob files. The caller is responsible for calling the returned cleanup
733 : // function.
734 : //
735 : // NB: This function is intended for testing and tooling purposes only. It
736 : // provides blob file access outside of normal database operations and is not
737 : // used by databases opened through Open().
738 : func SetupBlobReaderProvider(
739 : fs vfs.FS, path string, opts *Options, readOpts sstable.ReaderOptions,
740 0 : ) (blob.ReaderProvider, func(), error) {
741 0 : var fc *FileCache
742 0 : var c *cache.Cache
743 0 : var ch *cache.Handle
744 0 : var objProvider objstorage.Provider
745 0 : var provider *fileCacheHandle
746 0 :
747 0 : // Helper to clean up resources in case of error.
748 0 : cleanup := func() {
749 0 : if provider != nil {
750 0 : _ = provider.Close()
751 0 : }
752 0 : if objProvider != nil {
753 0 : _ = objProvider.Close()
754 0 : }
755 0 : if ch != nil {
756 0 : ch.Close()
757 0 : }
758 0 : if c != nil {
759 0 : c.Unref()
760 0 : }
761 0 : if fc != nil {
762 0 : fc.Unref()
763 0 : }
764 : }
765 :
766 0 : fileCacheSize := FileCacheSize(opts.MaxOpenFiles)
767 0 : if opts.FileCache == nil {
768 0 : fc = NewFileCache(opts.Experimental.FileCacheShards, fileCacheSize)
769 0 : } else {
770 0 : fc = opts.FileCache
771 0 : fc.Ref()
772 0 : }
773 :
774 0 : if opts.Cache == nil {
775 0 : c = cache.New(opts.CacheSize)
776 0 : } else {
777 0 : c = opts.Cache
778 0 : c.Ref()
779 0 : }
780 0 : ch = c.NewHandle()
781 0 :
782 0 : var err error
783 0 : objProvider, err = objstorageprovider.Open(objstorageprovider.DefaultSettings(fs, path))
784 0 : if err != nil {
785 0 : cleanup()
786 0 : return nil, nil, err
787 0 : }
788 :
789 0 : provider = fc.newHandle(
790 0 : ch,
791 0 : objProvider,
792 0 : opts.LoggerAndTracer,
793 0 : readOpts,
794 0 : func(any, error) error { return nil },
795 : )
796 :
797 0 : return provider, cleanup, nil
798 : }
799 :
800 : // newRangeDelIter is an internal helper that constructs an iterator over a
801 : // sstable's range deletions. This function is for table-cache internal use
802 : // only, and callers should use newIters instead.
803 : func newRangeDelIter(
804 : ctx context.Context,
805 : file *manifest.TableMetadata,
806 : r *sstable.Reader,
807 : handle *fileCacheHandle,
808 : internalOpts internalIterOpts,
809 1 : ) (keyspan.FragmentIterator, error) {
810 1 : // NB: range-del iterator does not maintain a reference to the table, nor
811 1 : // does it need to read from it after creation.
812 1 : rangeDelIter, err := r.NewRawRangeDelIter(ctx, file.FragmentIterTransforms(), internalOpts.readEnv)
813 1 : if err != nil {
814 0 : return nil, err
815 0 : }
816 : // Assert expected bounds in tests.
817 1 : if invariants.Sometimes(50) && rangeDelIter != nil {
818 1 : cmp := base.DefaultComparer.Compare
819 1 : if handle.readerOpts.Comparer != nil {
820 1 : cmp = handle.readerOpts.Comparer.Compare
821 1 : }
822 1 : rangeDelIter = keyspan.AssertBounds(
823 1 : rangeDelIter, file.PointKeyBounds.Smallest(), file.PointKeyBounds.LargestUserKey(), cmp,
824 1 : )
825 : }
826 1 : return rangeDelIter, nil
827 : }
828 :
829 : // newRangeKeyIter is an internal helper that constructs an iterator over a
830 : // sstable's range keys. This function is for table-cache internal use only, and
831 : // callers should use newIters instead.
832 : func newRangeKeyIter(
833 : ctx context.Context,
834 : file *manifest.TableMetadata,
835 : r *sstable.Reader,
836 : opts keyspan.SpanIterOptions,
837 : internalOpts internalIterOpts,
838 1 : ) (keyspan.FragmentIterator, error) {
839 1 : transforms := file.FragmentIterTransforms()
840 1 : // Don't filter a table's range keys if the file contains RANGEKEYDELs.
841 1 : // The RANGEKEYDELs may delete range keys in other levels. Skipping the
842 1 : // file's range key blocks may surface deleted range keys below. This is
843 1 : // done here, rather than deferring to the block-property collector in order
844 1 : // to maintain parity with point keys and the treatment of RANGEDELs.
845 1 : if !r.Attributes.Has(sstable.AttributeRangeKeyDels) && len(opts.RangeKeyFilters) > 0 {
846 0 : ok, _, err := checkAndIntersectFilters(r, opts.RangeKeyFilters, nil, transforms.SyntheticSuffix())
847 0 : if err != nil {
848 0 : return nil, err
849 0 : } else if !ok {
850 0 : return nil, nil
851 0 : }
852 : }
853 : // TODO(radu): wrap in an AssertBounds.
854 1 : return r.NewRawRangeKeyIter(ctx, transforms, internalOpts.readEnv)
855 : }
856 :
857 : // tableCacheShardReaderProvider implements sstable.ReaderProvider for a
858 : // specific table.
859 : type tableCacheShardReaderProvider struct {
860 : c *genericcache.Cache[fileCacheKey, fileCacheValue]
861 : key fileCacheKey
862 :
863 : mu struct {
864 : sync.Mutex
865 : // r is the result of c.FindOrCreate(), only set iff refCount > 0.
866 : r genericcache.ValueRef[fileCacheKey, fileCacheValue]
867 : // refCount is the number of GetReader() calls that have not received a
868 : // corresponding Close().
869 : refCount int
870 : }
871 : }
872 :
873 : var _ valblk.ReaderProvider = &tableCacheShardReaderProvider{}
874 :
875 1 : func (rp *tableCacheShardReaderProvider) init(fc *FileCache, key fileCacheKey) {
876 1 : rp.c = &fc.c
877 1 : rp.key = key
878 1 : rp.mu.r = genericcache.ValueRef[fileCacheKey, fileCacheValue]{}
879 1 : rp.mu.refCount = 0
880 1 : }
881 :
882 : // GetReader implements sstable.ReaderProvider. Note that it is not the
883 : // responsibility of tableCacheShardReaderProvider to ensure that the file
884 : // continues to exist. The ReaderProvider is used in iterators where the
885 : // top-level iterator is pinning the read state and preventing the files from
886 : // being deleted.
887 : //
888 : // The caller must call tableCacheShardReaderProvider.Close.
889 : //
890 : // Note that currently the Reader returned here is only used to read value
891 : // blocks. This reader shouldn't be used for other purposes like reading keys
892 : // outside of virtual sstable bounds.
893 : //
894 : // TODO(bananabrick): We could return a wrapper over the Reader to ensure
895 : // that the reader isn't used for other purposes.
896 : func (rp *tableCacheShardReaderProvider) GetReader(
897 : ctx context.Context,
898 1 : ) (valblk.ExternalBlockReader, error) {
899 1 : rp.mu.Lock()
900 1 : defer rp.mu.Unlock()
901 1 :
902 1 : if rp.mu.refCount > 0 {
903 0 : // We already have a value.
904 0 : rp.mu.refCount++
905 0 : return rp.mu.r.Value().mustSSTableReader(), nil
906 0 : }
907 :
908 : // Calling FindOrCreate gives us the responsibility of Unref()ing r, which
909 : // will happen when rp.mu.refCount reaches 0. Note that if the table is no
910 : // longer in the cache, FindOrCreate will need to do IO (through initFn in
911 : // NewFileCache) to initialize a new Reader. We hold rp.mu during this time so
912 : // that concurrent GetReader calls block until the Reader is created.
913 1 : r, err := rp.c.FindOrCreate(ctx, rp.key)
914 1 : if err != nil {
915 0 : return nil, err
916 0 : }
917 1 : rp.mu.r = r
918 1 : rp.mu.refCount = 1
919 1 : return r.Value().mustSSTableReader(), nil
920 : }
921 :
922 : // Close implements sstable.ReaderProvider.
923 1 : func (rp *tableCacheShardReaderProvider) Close() {
924 1 : rp.mu.Lock()
925 1 : defer rp.mu.Unlock()
926 1 : rp.mu.refCount--
927 1 : if rp.mu.refCount <= 0 {
928 1 : if rp.mu.refCount < 0 {
929 0 : panic("pebble: sstable.ReaderProvider misuse")
930 : }
931 1 : rp.mu.r.Unref()
932 1 : rp.mu.r = genericcache.ValueRef[fileCacheKey, fileCacheValue]{}
933 : }
934 : }
935 :
936 : // getTableProperties returns sst table properties for the backing file.
937 : //
938 : // WARNING! If file is a virtual table, we return the properties of the physical
939 : // table.
940 : func (h *fileCacheHandle) getTableProperties(
941 : file *manifest.TableMetadata,
942 0 : ) (*sstable.Properties, error) {
943 0 : // Calling findOrCreateTable gives us the responsibility of decrementing v's
944 0 : // refCount here
945 0 : v, err := h.findOrCreateTable(context.TODO(), file)
946 0 : if err != nil {
947 0 : return nil, err
948 0 : }
949 0 : defer v.Unref()
950 0 :
951 0 : r := v.Value().mustSSTableReader()
952 0 : props, err := r.ReadPropertiesBlock(context.TODO(), nil /* buffer pool */)
953 0 : if err != nil {
954 0 : return nil, err
955 0 : }
956 0 : return &props, nil
957 : }
958 :
959 : type fileCacheValue struct {
960 : closeHook func()
961 : reader io.Closer // *sstable.Reader or *blob.FileReader
962 : isShared bool
963 :
964 : // readerProvider is embedded here so that we only allocate it once as long as
965 : // the table stays in the cache. Its state is not always logically tied to
966 : // this specific fileCacheShard - if a table goes out of the cache and then
967 : // comes back in, the readerProvider in a now-defunct fileCacheValue can
968 : // still be used and will internally refer to the new fileCacheValue.
969 : readerProvider tableCacheShardReaderProvider
970 : }
971 :
972 : // mustSSTable retrieves the value's *sstable.Reader. It panics if the cached
973 : // file is not a sstable (i.e., it is a blob file).
974 1 : func (v *fileCacheValue) mustSSTableReader() *sstable.Reader {
975 1 : return v.reader.(*sstable.Reader)
976 1 : }
977 :
978 : // mustBlob retrieves the value's *blob.FileReader. It panics if the cached file
979 : // is not a blob file.
980 1 : func (v *fileCacheValue) mustBlob() *blob.FileReader {
981 1 : return v.reader.(*blob.FileReader)
982 1 : }
983 :
984 : // iterSet holds a set of iterators of various key kinds, all constructed over
985 : // the same data structure (eg, an sstable). A subset of the fields may be
986 : // populated depending on the `iterKinds` passed to newIters.
987 : type iterSet struct {
988 : point internalIterator
989 : rangeDeletion keyspan.FragmentIterator
990 : rangeKey keyspan.FragmentIterator
991 : }
992 :
993 : // TODO(jackson): Consider adding methods for fast paths that check whether an
994 : // iterator of a particular kind is nil, so that these call sites don't need to
995 : // reach into the struct's fields directly.
996 :
997 : // Point returns the contained point iterator. If there is no point iterator,
998 : // Point returns a non-nil empty point iterator.
999 1 : func (s *iterSet) Point() internalIterator {
1000 1 : if s.point == nil {
1001 1 : return emptyIter
1002 1 : }
1003 1 : return s.point
1004 : }
1005 :
1006 : // RangeDeletion returns the contained range deletion iterator. If there is no
1007 : // range deletion iterator, RangeDeletion returns a non-nil empty keyspan
1008 : // iterator.
1009 1 : func (s *iterSet) RangeDeletion() keyspan.FragmentIterator {
1010 1 : if s.rangeDeletion == nil {
1011 1 : return emptyKeyspanIter
1012 1 : }
1013 1 : return s.rangeDeletion
1014 : }
1015 :
1016 : // RangeKey returns the contained range key iterator. If there is no range key
1017 : // iterator, RangeKey returns a non-nil empty keyspan iterator.
1018 1 : func (s *iterSet) RangeKey() keyspan.FragmentIterator {
1019 1 : if s.rangeKey == nil {
1020 0 : return emptyKeyspanIter
1021 0 : }
1022 1 : return s.rangeKey
1023 : }
1024 :
1025 : // CloseAll closes all of the held iterators. If CloseAll is called, then Close
1026 : // must be not be called on the constituent iterators.
1027 1 : func (s *iterSet) CloseAll() error {
1028 1 : var err error
1029 1 : if s.point != nil {
1030 1 : err = s.point.Close()
1031 1 : s.point = nil
1032 1 : }
1033 1 : if s.rangeDeletion != nil {
1034 1 : s.rangeDeletion.Close()
1035 1 : s.rangeDeletion = nil
1036 1 : }
1037 1 : if s.rangeKey != nil {
1038 1 : s.rangeKey.Close()
1039 1 : s.rangeKey = nil
1040 1 : }
1041 1 : return err
1042 : }
1043 :
// iterKinds is a bitmap describing a set of iterator kinds. Callers build a
// set by bitwise-ORing together any of iterPointKeys, iterRangeDeletions and
// iterRangeKeys.
type iterKinds uint8

// Each kind occupies its own bit, assigned in declaration order.
const (
	iterPointKeys iterKinds = 1 << iota
	iterRangeDeletions
	iterRangeKeys
)

// Point reports whether the set includes point key iterators.
func (t iterKinds) Point() bool { return t&iterPointKeys == iterPointKeys }

// RangeDeletion reports whether the set includes range deletion iterators.
func (t iterKinds) RangeDeletion() bool { return t&iterRangeDeletions == iterRangeDeletions }

// RangeKey reports whether the set includes range key iterators.
func (t iterKinds) RangeKey() bool { return t&iterRangeKeys == iterRangeKeys }
|