Line data Source code
1 : // Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "bytes"
9 : "context"
10 : "fmt"
11 : "io"
12 : "runtime/debug"
13 : "sync"
14 : "sync/atomic"
15 : "unsafe"
16 :
17 : "github.com/cockroachdb/errors"
18 : "github.com/cockroachdb/pebble/internal/base"
19 : "github.com/cockroachdb/pebble/internal/cache"
20 : "github.com/cockroachdb/pebble/internal/invariants"
21 : "github.com/cockroachdb/pebble/internal/keyspan"
22 : "github.com/cockroachdb/pebble/internal/keyspan/keyspanimpl"
23 : "github.com/cockroachdb/pebble/internal/manifest"
24 : "github.com/cockroachdb/pebble/internal/sstableinternal"
25 : "github.com/cockroachdb/pebble/objstorage"
26 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
27 : "github.com/cockroachdb/pebble/sstable"
28 : "github.com/cockroachdb/pebble/sstable/valblk"
29 : )
30 :
31 : var emptyIter = &errorIter{err: nil}
32 : var emptyKeyspanIter = &errorKeyspanIter{err: nil}
33 :
34 : // tableNewIters creates new iterators (point, range deletion and/or range key)
35 : // for the given file metadata. Which of the various iterator kinds the user is
36 : // requesting is specified with the iterKinds bitmap.
37 : //
38 : // On success, the requested subset of iters.{point,rangeDel,rangeKey} are
39 : // populated with iterators.
40 : //
41 : // If a point iterator is requested and the operation was successful,
42 : // iters.point is guaranteed to be non-nil and must be closed when the caller is
43 : // finished.
44 : //
45 : // If a range deletion or range key iterator is requested, the corresponding
46 : // iterator may be nil if the table does not contain any keys of the
47 : // corresponding kind. The returned iterSet type provides RangeDeletion() and
48 : // RangeKey() convenience methods that return non-nil empty iterators that may
49 : // be used if the caller requires a non-nil iterator.
50 : //
51 : // On error, all iterators are nil.
52 : //
53 : // The only (non-test) implementation of tableNewIters is
54 : // fileCacheContainer.newIters().
55 : type tableNewIters func(
56 : ctx context.Context,
57 : file *manifest.FileMetadata,
58 : opts *IterOptions,
59 : internalOpts internalIterOpts,
60 : kinds iterKinds,
61 : ) (iterSet, error)
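// A minimal usage sketch, assuming newIters holds a tableNewIters value and
// the caller wants point and range-deletion iterators (variable names are
// illustrative only):
//
//	iters, err := newIters(ctx, file, nil, internalIterOpts{}, iterPointKeys|iterRangeDeletions)
//	if err != nil {
//		return err
//	}
//	defer func() { _ = iters.CloseAll() }()
//	pointIter, rangeDelIter := iters.Point(), iters.RangeDeletion()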
62 :
63 : // tableNewRangeDelIter takes a tableNewIters and returns a TableNewSpanIter
64 : // for the rangedel iterator returned by tableNewIters.
65 2 : func tableNewRangeDelIter(newIters tableNewIters) keyspanimpl.TableNewSpanIter {
66 2 : return func(ctx context.Context, file *manifest.FileMetadata, iterOptions keyspan.SpanIterOptions) (keyspan.FragmentIterator, error) {
67 2 : iters, err := newIters(ctx, file, nil, internalIterOpts{}, iterRangeDeletions)
68 2 : if err != nil {
69 0 : return nil, err
70 0 : }
71 2 : return iters.RangeDeletion(), nil
72 : }
73 : }
74 :
75 : // tableNewRangeKeyIter takes a tableNewIters and returns a TableNewSpanIter
76 : // for the range key iterator returned by tableNewIters.
77 2 : func tableNewRangeKeyIter(newIters tableNewIters) keyspanimpl.TableNewSpanIter {
78 2 : return func(ctx context.Context, file *manifest.FileMetadata, iterOptions keyspan.SpanIterOptions) (keyspan.FragmentIterator, error) {
79 2 : iters, err := newIters(ctx, file, nil, internalIterOpts{}, iterRangeKeys)
80 2 : if err != nil {
81 1 : return nil, err
82 1 : }
83 2 : return iters.RangeKey(), nil
84 : }
85 : }
86 :
87 : // fileCacheOpts contains the db specific fields of a file cache. This is stored
88 : // in the fileCacheContainer along with the file cache.
89 : //
90 : // NB: It is important to make sure that the fields in this struct are
91 : // read-only. Since the fields here are shared by every single fileCacheShard,
92 : // if non-read-only fields are updated, we could cause unnecessary evictions of
93 : // those fields, and of the surrounding fields, from the CPU caches.
94 : type fileCacheOpts struct {
95 : // iterCount keeps track of how many iterators are open. It is used to keep
96 : // track of leaked iterators on a per-db level.
97 : iterCount *atomic.Int32
98 :
99 : loggerAndTracer LoggerAndTracer
100 : cache *cache.Cache
101 : cacheID cache.ID
102 : objProvider objstorage.Provider
103 : readerOpts sstable.ReaderOptions
104 : sstStatsCollector *sstable.CategoryStatsCollector
105 : }
106 :
107 : // fileCacheContainer contains the file cache and fields which are unique to the
108 : // DB.
109 : type fileCacheContainer struct {
110 : fileCache *FileCache
111 :
112 : // dbOpts contains fields relevant to the file cache which are unique to
113 : // each DB.
114 : dbOpts fileCacheOpts
115 : }
116 :
117 : // newFileCacheContainer will panic if the underlying block cache in the file
118 : // cache doesn't match Options.Cache.
119 : func newFileCacheContainer(
120 : fc *FileCache,
121 : cacheID cache.ID,
122 : objProvider objstorage.Provider,
123 : opts *Options,
124 : size int,
125 : sstStatsCollector *sstable.CategoryStatsCollector,
126 2 : ) *fileCacheContainer {
127 2 : // We will release a ref to the file cache acquired here when
128 2 : // fileCacheContainer.close is called.
129 2 : if fc != nil {
130 1 : if fc.cache != opts.Cache {
131 0 : panic("pebble: underlying cache for the file cache and db are different")
132 : }
133 1 : fc.Ref()
134 2 : } else {
135 2 : // NewFileCache should create a ref to fc which the container should
136 2 : // drop whenever it is closed.
137 2 : fc = NewFileCache(opts.Cache, opts.Experimental.FileCacheShards, size)
138 2 : }
139 :
140 2 : t := &fileCacheContainer{}
141 2 : t.fileCache = fc
142 2 : t.dbOpts.loggerAndTracer = opts.LoggerAndTracer
143 2 : t.dbOpts.cache = opts.Cache
144 2 : t.dbOpts.cacheID = cacheID
145 2 : t.dbOpts.objProvider = objProvider
146 2 : t.dbOpts.readerOpts = opts.MakeReaderOptions()
147 2 : t.dbOpts.readerOpts.FilterMetricsTracker = &sstable.FilterMetricsTracker{}
148 2 : t.dbOpts.iterCount = new(atomic.Int32)
149 2 : t.dbOpts.sstStatsCollector = sstStatsCollector
150 2 : return t
151 : }
152 :
153 : // Before calling close, make sure that there will be no further need
154 : // to access any of the files associated with the store.
155 2 : func (c *fileCacheContainer) close() error {
156 2 : // We want to do some cleanup work here. Check for leaked iterators
157 2 : // by the DB using this container. Note that we'll still perform cleanup
158 2 : // below in the case that there are leaked iterators.
159 2 : var err error
160 2 : if v := c.dbOpts.iterCount.Load(); v > 0 {
161 1 : err = errors.Errorf("leaked iterators: %d", errors.Safe(v))
162 1 : }
163 :
164 : // Release nodes here.
165 2 : for _, shard := range c.fileCache.shards {
166 2 : if shard != nil {
167 2 : shard.removeDB(&c.dbOpts)
168 2 : }
169 : }
170 2 : return firstError(err, c.fileCache.Unref())
171 : }
172 :
173 : func (c *fileCacheContainer) newIters(
174 : ctx context.Context,
175 : file *manifest.FileMetadata,
176 : opts *IterOptions,
177 : internalOpts internalIterOpts,
178 : kinds iterKinds,
179 2 : ) (iterSet, error) {
180 2 : return c.fileCache.getShard(file.FileBacking.DiskFileNum).newIters(ctx, file, opts, internalOpts, &c.dbOpts, kinds)
181 2 : }
182 :
183 : // getTableProperties returns the properties associated with the backing physical
184 : // table if the input metadata belongs to a virtual sstable.
185 1 : func (c *fileCacheContainer) getTableProperties(file *fileMetadata) (*sstable.Properties, error) {
186 1 : return c.fileCache.getShard(file.FileBacking.DiskFileNum).getTableProperties(file, &c.dbOpts)
187 1 : }
188 :
189 2 : func (c *fileCacheContainer) evict(fileNum base.DiskFileNum) {
190 2 : c.fileCache.getShard(fileNum).evict(fileNum, &c.dbOpts, false)
191 2 : }
192 :
193 2 : func (c *fileCacheContainer) metrics() (CacheMetrics, FilterMetrics) {
194 2 : var m CacheMetrics
195 2 : for i := range c.fileCache.shards {
196 2 : s := c.fileCache.shards[i]
197 2 : s.mu.RLock()
198 2 : m.Count += int64(len(s.mu.nodes))
199 2 : s.mu.RUnlock()
200 2 : m.Hits += s.hits.Load()
201 2 : m.Misses += s.misses.Load()
202 2 : }
203 2 : m.Size = m.Count * int64(unsafe.Sizeof(fileCacheNode{})+unsafe.Sizeof(fileCacheValue{})+unsafe.Sizeof(sstable.Reader{}))
204 2 : f := c.dbOpts.readerOpts.FilterMetricsTracker.Load()
205 2 : return m, f
206 : }
207 :
208 : func (c *fileCacheContainer) estimateSize(
209 : meta *fileMetadata, lower, upper []byte,
210 2 : ) (size uint64, err error) {
211 2 : c.withCommonReader(meta, func(cr sstable.CommonReader) error {
212 2 : size, err = cr.EstimateDiskUsage(lower, upper)
213 2 : return err
214 2 : })
215 2 : return size, err
216 : }
217 :
218 : // createCommonReader creates a Reader for this file.
219 2 : func createCommonReader(v *fileCacheValue, file *fileMetadata) sstable.CommonReader {
220 2 : // TODO(bananabrick): We suffer an allocation if file is a virtual sstable.
221 2 : var cr sstable.CommonReader = v.reader
222 2 : if file.Virtual {
223 2 : virtualReader := sstable.MakeVirtualReader(
224 2 : v.reader, file.VirtualMeta().VirtualReaderParams(v.isShared),
225 2 : )
226 2 : cr = &virtualReader
227 2 : }
228 2 : return cr
229 : }
230 :
231 : func (c *fileCacheContainer) withCommonReader(
232 : meta *fileMetadata, fn func(sstable.CommonReader) error,
233 2 : ) error {
234 2 : s := c.fileCache.getShard(meta.FileBacking.DiskFileNum)
235 2 : v := s.findNode(context.TODO(), meta.FileBacking, &c.dbOpts)
236 2 : defer s.unrefValue(v)
237 2 : if v.err != nil {
238 0 : return v.err
239 0 : }
240 2 : return fn(createCommonReader(v, meta))
241 : }
242 :
243 2 : func (c *fileCacheContainer) withReader(meta physicalMeta, fn func(*sstable.Reader) error) error {
244 2 : s := c.fileCache.getShard(meta.FileBacking.DiskFileNum)
245 2 : v := s.findNode(context.TODO(), meta.FileBacking, &c.dbOpts)
246 2 : defer s.unrefValue(v)
247 2 : if v.err != nil {
248 1 : return v.err
249 1 : }
250 2 : return fn(v.reader)
251 : }
252 :
253 : // withVirtualReader fetches a VirtualReader associated with a virtual sstable.
254 : func (c *fileCacheContainer) withVirtualReader(
255 : meta virtualMeta, fn func(sstable.VirtualReader) error,
256 2 : ) error {
257 2 : s := c.fileCache.getShard(meta.FileBacking.DiskFileNum)
258 2 : v := s.findNode(context.TODO(), meta.FileBacking, &c.dbOpts)
259 2 : defer s.unrefValue(v)
260 2 : if v.err != nil {
261 0 : return v.err
262 0 : }
263 2 : provider := c.dbOpts.objProvider
264 2 : objMeta, err := provider.Lookup(fileTypeTable, meta.FileBacking.DiskFileNum)
265 2 : if err != nil {
266 0 : return err
267 0 : }
268 2 : return fn(sstable.MakeVirtualReader(v.reader, meta.VirtualReaderParams(objMeta.IsShared())))
269 : }
270 :
271 2 : func (c *fileCacheContainer) iterCount() int64 {
272 2 : return int64(c.dbOpts.iterCount.Load())
273 2 : }
274 :
275 : // FileCache is a shareable cache for open files. Open files are exclusively
276 : // sstable files today.
277 : type FileCache struct {
278 : refs atomic.Int64
279 :
280 : cache *Cache
281 : shards []*fileCacheShard
282 : }
283 :
284 : // Ref adds a reference to the file cache. Once a file cache is constructed, the
285 : // cache only remains valid if there is at least one reference to it.
286 1 : func (c *FileCache) Ref() {
287 1 : v := c.refs.Add(1)
288 1 : // We don't want the reference count to ever go from 0 -> 1,
289 1 : // because a reference count of 0 implies that we've closed the cache.
290 1 : if v <= 1 {
291 0 : panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
292 : }
293 : }
294 :
295 : // Unref removes a reference to the file cache.
296 2 : func (c *FileCache) Unref() error {
297 2 : v := c.refs.Add(-1)
298 2 : switch {
299 1 : case v < 0:
300 1 : panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
301 2 : case v == 0:
302 2 : var err error
303 2 : for i := range c.shards {
304 2 : // The cache shard is not allocated yet, nothing to close.
305 2 : if c.shards[i] == nil {
306 0 : continue
307 : }
308 2 : err = firstError(err, c.shards[i].Close())
309 : }
310 :
311 : // Unref the cache which we create a reference to when the file cache is
312 : // first instantiated.
313 2 : c.cache.Unref()
314 2 : return err
315 : }
316 1 : return nil
317 : }
318 :
319 : // NewFileCache will create a new file cache with one outstanding reference. It
320 : // is the caller's responsibility to call Unref once they no longer hold a
321 : // reference to the file cache.
322 2 : func NewFileCache(cache *Cache, numShards int, size int) *FileCache {
323 2 : if size == 0 {
324 0 : panic("pebble: cannot create a file cache of size 0")
325 2 : } else if numShards == 0 {
326 0 : panic("pebble: cannot create a file cache with 0 shards")
327 : }
328 :
329 2 : c := &FileCache{}
330 2 : c.cache = cache
331 2 : c.cache.Ref()
332 2 :
333 2 : c.shards = make([]*fileCacheShard, numShards)
334 2 : for i := range c.shards {
335 2 : c.shards[i] = &fileCacheShard{}
336 2 : c.shards[i].init(size / len(c.shards))
337 2 : }
338 :
339 : // Hold a ref to the cache here.
340 2 : c.refs.Store(1)
341 2 :
342 2 : return c
343 : }
344 :
345 2 : func (c *FileCache) getShard(fileNum base.DiskFileNum) *fileCacheShard {
346 2 : return c.shards[uint64(fileNum)%uint64(len(c.shards))]
347 2 : }
348 :
349 : type fileCacheKey struct {
350 : cacheID cache.ID
351 : fileNum base.DiskFileNum
352 : }
353 :
354 : type fileCacheShard struct {
355 : hits atomic.Int64
356 : misses atomic.Int64
357 : iterCount atomic.Int32
358 :
359 : size int
360 :
361 : mu struct {
362 : sync.RWMutex
363 : nodes map[fileCacheKey]*fileCacheNode
364 : // The iters map is only created and populated in race builds.
365 : iters map[io.Closer][]byte
366 :
367 : handHot *fileCacheNode
368 : handCold *fileCacheNode
369 : handTest *fileCacheNode
370 :
371 : coldTarget int
372 : sizeHot int
373 : sizeCold int
374 : sizeTest int
375 : }
376 : releasing sync.WaitGroup
377 : releasingCh chan *fileCacheValue
378 : releaseLoopExit sync.WaitGroup
379 : }
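// The hot/cold/test node types and the three clock hands above appear to
// implement a CLOCK-Pro style replacement policy: handCold evicts or promotes
// cold entries, handHot demotes unreferenced hot entries, handTest expires
// "test" (ghost) entries whose value has already been cleared, and coldTarget
// adapts how much of the shard is reserved for cold entries when a test entry
// is hit again. See evictNodes, runHandCold, runHandHot and runHandTest below.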
380 :
381 2 : func (c *fileCacheShard) init(size int) {
382 2 : c.size = size
383 2 :
384 2 : c.mu.nodes = make(map[fileCacheKey]*fileCacheNode)
385 2 : c.mu.coldTarget = size
386 2 : c.releasingCh = make(chan *fileCacheValue, 100)
387 2 : c.releaseLoopExit.Add(1)
388 2 : go c.releaseLoop()
389 2 :
390 2 : if invariants.RaceEnabled {
391 0 : c.mu.iters = make(map[io.Closer][]byte)
392 0 : }
393 : }
394 :
395 2 : func (c *fileCacheShard) releaseLoop() {
396 2 : defer c.releaseLoopExit.Done()
397 2 : for v := range c.releasingCh {
398 2 : v.release(c)
399 2 : }
400 : }
401 :
402 : // checkAndIntersectFilters checks the specific table and block property filters
403 : // for intersection with any available table and block-level properties. Returns
404 : // true for ok if this table should be read by this iterator.
405 : func (c *fileCacheShard) checkAndIntersectFilters(
406 : v *fileCacheValue,
407 : blockPropertyFilters []BlockPropertyFilter,
408 : boundLimitedFilter sstable.BoundLimitedBlockPropertyFilter,
409 : syntheticSuffix sstable.SyntheticSuffix,
410 2 : ) (ok bool, filterer *sstable.BlockPropertiesFilterer, err error) {
411 2 : if boundLimitedFilter != nil || len(blockPropertyFilters) > 0 {
412 2 : filterer, err = sstable.IntersectsTable(
413 2 : blockPropertyFilters,
414 2 : boundLimitedFilter,
415 2 : v.reader.Properties.UserProperties,
416 2 : syntheticSuffix,
417 2 : )
418 2 : // NB: IntersectsTable will return a nil filterer if the table-level
419 2 : // properties indicate there's no intersection with the provided filters.
420 2 : if filterer == nil || err != nil {
421 2 : return false, nil, err
422 2 : }
423 : }
424 2 : return true, filterer, nil
425 : }
426 :
427 : func (c *fileCacheShard) newIters(
428 : ctx context.Context,
429 : file *manifest.FileMetadata,
430 : opts *IterOptions,
431 : internalOpts internalIterOpts,
432 : dbOpts *fileCacheOpts,
433 : kinds iterKinds,
434 2 : ) (iterSet, error) {
435 2 : // TODO(sumeer): constructing the Reader should also use a plumbed context,
436 2 : // since parts of the sstable are read during the construction. The Reader
437 2 : // should not remember that context since the Reader can be long-lived.
438 2 :
439 2 : // Calling findNode gives us the responsibility of decrementing v's
440 2 : // refCount. If opening the underlying table resulted in error, then we
441 2 : // decrement this straight away. Otherwise, we pass that responsibility to
442 2 : // the sstable iterator, which decrements when it is closed.
443 2 : v := c.findNode(ctx, file.FileBacking, dbOpts)
444 2 : if v.err != nil {
445 1 : defer c.unrefValue(v)
446 1 : return iterSet{}, v.err
447 1 : }
448 :
449 : // Note: This suffers an allocation for virtual sstables.
450 2 : cr := createCommonReader(v, file)
451 2 : var iters iterSet
452 2 : var err error
453 2 : if kinds.RangeKey() && file.HasRangeKeys {
454 2 : iters.rangeKey, err = c.newRangeKeyIter(ctx, v, file, cr, opts.SpanIterOptions())
455 2 : }
456 2 : if kinds.RangeDeletion() && file.HasPointKeys && err == nil {
457 2 : iters.rangeDeletion, err = c.newRangeDelIter(ctx, file, cr, dbOpts)
458 2 : }
459 2 : if kinds.Point() && err == nil {
460 2 : iters.point, err = c.newPointIter(ctx, v, file, cr, opts, internalOpts, dbOpts)
461 2 : }
462 2 : if err != nil {
463 1 : // NB: There's a subtlety here: Because the point iterator is the last
464 1 : // iterator we attempt to create, it's not possible for:
465 1 : // err != nil && iters.point != nil
466 1 : // If it were possible, we'd need to account for it to avoid double
467 1 : // unref-ing here, once during CloseAll and once during `unrefValue`.
468 1 : iters.CloseAll()
469 1 : c.unrefValue(v)
470 1 : return iterSet{}, err
471 1 : }
472 : // Only point iterators ever require the reader stay pinned in the cache. If
473 : // we're not returning a point iterator to the caller, we need to unref v.
474 2 : if iters.point == nil {
475 2 : c.unrefValue(v)
476 2 : }
477 2 : return iters, nil
478 : }
479 :
480 : // For flushable ingests, we decide whether to use the bloom filter based on
481 : // size.
482 : const filterBlockSizeLimitForFlushableIngests = 64 * 1024
483 :
484 : // newPointIter is an internal helper that constructs a point iterator over a
485 : // sstable. This function is for internal use only, and callers should use
486 : // newIters instead.
487 : func (c *fileCacheShard) newPointIter(
488 : ctx context.Context,
489 : v *fileCacheValue,
490 : file *manifest.FileMetadata,
491 : cr sstable.CommonReader,
492 : opts *IterOptions,
493 : internalOpts internalIterOpts,
494 : dbOpts *fileCacheOpts,
495 2 : ) (internalIterator, error) {
496 2 : var (
497 2 : hideObsoletePoints bool = false
498 2 : pointKeyFilters []BlockPropertyFilter
499 2 : filterer *sstable.BlockPropertiesFilterer
500 2 : )
501 2 : if opts != nil {
502 2 : // This code is appending (at most one filter) in-place to
503 2 : // opts.PointKeyFilters even though the slice is shared for iterators in
504 2 : // the same iterator tree. This is acceptable since all the following
505 2 : // properties are true:
506 2 : // - The iterator tree is single threaded, so the shared backing for the
507 2 : // slice is being mutated in a single threaded manner.
508 2 : // - Each shallow copy of the slice has its own notion of length.
509 2 : // - The appended element is always the obsoleteKeyBlockPropertyFilter
510 2 : // struct, which is stateless, so overwriting that struct when creating
511 2 : // one sstable iterator is harmless to other sstable iterators that are
512 2 : // relying on that struct.
513 2 : //
514 2 : // An alternative would be to have different slices for different sstable
515 2 : // iterators, but that requires more work to avoid allocations.
516 2 : //
517 2 : // TODO(bilal): for compaction reads of foreign sstables, we do hide
518 2 : // obsolete points (see sstable.Reader.newCompactionIter) but we don't
519 2 : // apply the obsolete block property filter. We could optimize this by
520 2 : // applying the filter.
521 2 : hideObsoletePoints, pointKeyFilters =
522 2 : v.reader.TryAddBlockPropertyFilterForHideObsoletePoints(
523 2 : opts.snapshotForHideObsoletePoints, file.LargestSeqNum, opts.PointKeyFilters)
524 2 :
525 2 : var ok bool
526 2 : var err error
527 2 : ok, filterer, err = c.checkAndIntersectFilters(v, pointKeyFilters,
528 2 : internalOpts.boundLimitedFilter, file.SyntheticPrefixAndSuffix.Suffix())
529 2 : if err != nil {
530 0 : return nil, err
531 2 : } else if !ok {
532 2 : // No point keys within the table match the filters.
533 2 : return nil, nil
534 2 : }
535 : }
536 :
537 2 : var iter sstable.Iterator
538 2 : filterBlockSizeLimit := sstable.AlwaysUseFilterBlock
539 2 : if opts != nil {
540 2 : // By default, we don't use block filters for L6 and restrict the size for
541 2 : // flushable ingests, as these blocks can be very big.
542 2 : if !opts.UseL6Filters {
543 2 : if opts.layer == manifest.Level(6) {
544 2 : filterBlockSizeLimit = sstable.NeverUseFilterBlock
545 2 : } else if opts.layer.IsFlushableIngests() {
546 2 : filterBlockSizeLimit = filterBlockSizeLimitForFlushableIngests
547 2 : }
548 : }
549 2 : if opts.layer.IsSet() && !opts.layer.IsFlushableIngests() {
550 2 : ctx = objiotracing.WithLevel(ctx, opts.layer.Level())
551 2 : }
552 : }
553 2 : tableFormat, err := v.reader.TableFormat()
554 2 : if err != nil {
555 0 : return nil, err
556 0 : }
557 :
558 2 : if v.isShared && file.SyntheticSeqNum() != 0 {
559 2 : if tableFormat < sstable.TableFormatPebblev4 {
560 0 : return nil, errors.New("pebble: shared ingested sstable has a lower table format than expected")
561 0 : }
562 : // The table is shared and ingested.
563 2 : hideObsoletePoints = true
564 : }
565 2 : transforms := file.IterTransforms()
566 2 : transforms.HideObsoletePoints = hideObsoletePoints
567 2 : iterStatsAccum := internalOpts.iterStatsAccumulator
568 2 : if iterStatsAccum == nil && opts != nil && dbOpts.sstStatsCollector != nil {
569 2 : iterStatsAccum = dbOpts.sstStatsCollector.Accumulator(
570 2 : uint64(uintptr(unsafe.Pointer(v.reader))), opts.Category)
571 2 : }
572 2 : if internalOpts.compaction {
573 2 : iter, err = cr.NewCompactionIter(transforms, iterStatsAccum, &v.readerProvider, internalOpts.bufferPool)
574 2 : } else {
575 2 : iter, err = cr.NewPointIter(
576 2 : ctx, transforms, opts.GetLowerBound(), opts.GetUpperBound(), filterer, filterBlockSizeLimit,
577 2 : internalOpts.stats, iterStatsAccum, &v.readerProvider)
578 2 : }
579 2 : if err != nil {
580 1 : return nil, err
581 1 : }
582 : // NB: v.closeHook takes responsibility for calling unrefValue(v) here. Take
583 : // care to avoid introducing an allocation here by adding a closure.
584 2 : iter.SetCloseHook(v.closeHook)
585 2 : c.iterCount.Add(1)
586 2 : dbOpts.iterCount.Add(1)
587 2 : if invariants.RaceEnabled {
588 0 : c.mu.Lock()
589 0 : c.mu.iters[iter] = debug.Stack()
590 0 : c.mu.Unlock()
591 0 : }
592 2 : return iter, nil
593 : }
594 :
595 : // newRangeDelIter is an internal helper that constructs an iterator over an
596 : // sstable's range deletions. This function is for table-cache internal use
597 : // only, and callers should use newIters instead.
598 : func (c *fileCacheShard) newRangeDelIter(
599 : ctx context.Context, file *manifest.FileMetadata, cr sstable.CommonReader, dbOpts *fileCacheOpts,
600 2 : ) (keyspan.FragmentIterator, error) {
601 2 : // NB: range-del iterator does not maintain a reference to the table, nor
602 2 : // does it need to read from it after creation.
603 2 : rangeDelIter, err := cr.NewRawRangeDelIter(ctx, file.FragmentIterTransforms())
604 2 : if err != nil {
605 1 : return nil, err
606 1 : }
607 : // Assert expected bounds in tests.
608 2 : if invariants.Sometimes(50) && rangeDelIter != nil {
609 2 : cmp := base.DefaultComparer.Compare
610 2 : if dbOpts.readerOpts.Comparer != nil {
611 2 : cmp = dbOpts.readerOpts.Comparer.Compare
612 2 : }
613 2 : rangeDelIter = keyspan.AssertBounds(
614 2 : rangeDelIter, file.SmallestPointKey, file.LargestPointKey.UserKey, cmp,
615 2 : )
616 : }
617 2 : return rangeDelIter, nil
618 : }
619 :
620 : // newRangeKeyIter is an internal helper that constructs an iterator over an
621 : // sstable's range keys. This function is for table-cache internal use only, and
622 : // callers should use newIters instead.
623 : func (c *fileCacheShard) newRangeKeyIter(
624 : ctx context.Context,
625 : v *fileCacheValue,
626 : file *fileMetadata,
627 : cr sstable.CommonReader,
628 : opts keyspan.SpanIterOptions,
629 2 : ) (keyspan.FragmentIterator, error) {
630 2 : transforms := file.FragmentIterTransforms()
631 2 : // Don't filter a table's range keys if the file contains RANGEKEYDELs.
632 2 : // The RANGEKEYDELs may delete range keys in other levels. Skipping the
633 2 : // file's range key blocks may surface deleted range keys below. This is
634 2 : // done here, rather than deferring to the block-property collector in order
635 2 : // to maintain parity with point keys and the treatment of RANGEDELs.
636 2 : if v.reader.Properties.NumRangeKeyDels == 0 && len(opts.RangeKeyFilters) > 0 {
637 0 : ok, _, err := c.checkAndIntersectFilters(v, opts.RangeKeyFilters, nil, transforms.SyntheticSuffix())
638 0 : if err != nil {
639 0 : return nil, err
640 0 : } else if !ok {
641 0 : return nil, nil
642 0 : }
643 : }
644 : // TODO(radu): wrap in an AssertBounds.
645 2 : return cr.NewRawRangeKeyIter(ctx, transforms)
646 : }
647 :
648 : // tableCacheShardReaderProvider implements sstable.ReaderProvider for a
649 : // specific table.
650 : type tableCacheShardReaderProvider struct {
651 : c *fileCacheShard
652 : dbOpts *fileCacheOpts
653 : backingFileNum base.DiskFileNum
654 :
655 : mu struct {
656 : sync.Mutex
657 : // v is the result of findNode. Whenever it is not null, we hold a refcount
658 : // on the fileCacheValue.
659 : v *fileCacheValue
660 : // refCount is the number of GetReader() calls that have not received a
661 : // corresponding Close().
662 : refCount int
663 : }
664 : }
665 :
666 : var _ valblk.ReaderProvider = &tableCacheShardReaderProvider{}
667 :
668 : func (rp *tableCacheShardReaderProvider) init(
669 : c *fileCacheShard, dbOpts *fileCacheOpts, backingFileNum base.DiskFileNum,
670 2 : ) {
671 2 : rp.c = c
672 2 : rp.dbOpts = dbOpts
673 2 : rp.backingFileNum = backingFileNum
674 2 : rp.mu.v = nil
675 2 : rp.mu.refCount = 0
676 2 : }
677 :
678 : // GetReader implements sstable.ReaderProvider. Note that it is not the
679 : // responsibility of tableCacheShardReaderProvider to ensure that the file
680 : // continues to exist. The ReaderProvider is used in iterators where the
681 : // top-level iterator is pinning the read state and preventing the files from
682 : // being deleted.
683 : //
684 : // The caller must call tableCacheShardReaderProvider.Close.
685 : //
686 : // Note that currently the Reader returned here is only used to read value
687 : // blocks. This reader shouldn't be used for other purposes like reading keys
688 : // outside of virtual sstable bounds.
689 : //
690 : // TODO(bananabrick): We could return a wrapper over the Reader to ensure
691 : // that the reader isn't used for other purposes.
692 : func (rp *tableCacheShardReaderProvider) GetReader(
693 : ctx context.Context,
694 2 : ) (valblk.ExternalBlockReader, error) {
695 2 : rp.mu.Lock()
696 2 : defer rp.mu.Unlock()
697 2 :
698 2 : if rp.mu.v != nil {
699 0 : rp.mu.refCount++
700 0 : return rp.mu.v.reader, nil
701 0 : }
702 :
703 : // Calling findNodeInternal gives us the responsibility of decrementing v's
704 : // refCount. Note that if the table is no longer in the cache,
705 : // findNodeInternal will need to do IO to initialize a new Reader. We hold
706 : // rp.mu during this time so that concurrent GetReader calls block until the
707 : // Reader is created.
708 2 : v := rp.c.findNodeInternal(ctx, rp.backingFileNum, rp.dbOpts)
709 2 : if v.err != nil {
710 0 : defer rp.c.unrefValue(v)
711 0 : return nil, v.err
712 0 : }
713 2 : rp.mu.v = v
714 2 : rp.mu.refCount = 1
715 2 : return v.reader, nil
716 : }
717 :
718 : // Close implements sstable.ReaderProvider.
719 2 : func (rp *tableCacheShardReaderProvider) Close() {
720 2 : rp.mu.Lock()
721 2 : defer rp.mu.Unlock()
722 2 : rp.mu.refCount--
723 2 : if rp.mu.refCount <= 0 {
724 2 : if rp.mu.refCount < 0 {
725 0 : panic("pebble: sstable.ReaderProvider misuse")
726 : }
727 2 : rp.c.unrefValue(rp.mu.v)
728 2 : rp.mu.v = nil
729 : }
730 : }
731 :
732 : // getTableProperties returns the sstable properties for the target file.
733 : func (c *fileCacheShard) getTableProperties(
734 : file *fileMetadata, dbOpts *fileCacheOpts,
735 1 : ) (*sstable.Properties, error) {
736 1 : // Calling findNode gives us the responsibility of decrementing v's refCount here
737 1 : v := c.findNode(context.TODO(), file.FileBacking, dbOpts)
738 1 : defer c.unrefValue(v)
739 1 :
740 1 : if v.err != nil {
741 0 : return nil, v.err
742 0 : }
743 1 : return &v.reader.Properties, nil
744 : }
745 :
746 : // releaseNode releases a node from the fileCacheShard.
747 : //
748 : // c.mu must be held when calling this.
749 1 : func (c *fileCacheShard) releaseNode(n *fileCacheNode) {
750 1 : c.unlinkNode(n)
751 1 : c.clearNode(n)
752 1 : }
753 :
754 : // unlinkNode removes a node from the fileCacheShard, leaving the shard
755 : // reference in place.
756 : //
757 : // c.mu must be held when calling this.
758 2 : func (c *fileCacheShard) unlinkNode(n *fileCacheNode) {
759 2 : key := fileCacheKey{n.cacheID, n.fileNum}
760 2 : delete(c.mu.nodes, key)
761 2 :
762 2 : switch n.ptype {
763 2 : case fileCacheNodeHot:
764 2 : c.mu.sizeHot--
765 2 : case fileCacheNodeCold:
766 2 : c.mu.sizeCold--
767 2 : case fileCacheNodeTest:
768 2 : c.mu.sizeTest--
769 : }
770 :
771 2 : if n == c.mu.handHot {
772 2 : c.mu.handHot = c.mu.handHot.prev()
773 2 : }
774 2 : if n == c.mu.handCold {
775 2 : c.mu.handCold = c.mu.handCold.prev()
776 2 : }
777 2 : if n == c.mu.handTest {
778 2 : c.mu.handTest = c.mu.handTest.prev()
779 2 : }
780 :
781 2 : if n.unlink() == n {
782 2 : // This was the last entry in the cache.
783 2 : c.mu.handHot = nil
784 2 : c.mu.handCold = nil
785 2 : c.mu.handTest = nil
786 2 : }
787 :
788 2 : n.links.prev = nil
789 2 : n.links.next = nil
790 : }
791 :
792 2 : func (c *fileCacheShard) clearNode(n *fileCacheNode) {
793 2 : if v := n.value; v != nil {
794 2 : n.value = nil
795 2 : c.unrefValue(v)
796 2 : }
797 : }
798 :
799 : // unrefValue decrements the reference count for the specified value, releasing
800 : // it if the reference count fell to 0. Note that the value has a reference if
801 : // it is present in fileCacheShard.mu.nodes, so a reference count of 0 means the
802 : // node has already been removed from that map.
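// When the count does reach zero, the value is handed to the releaseLoop
// goroutine via releasingCh, so the underlying sstable.Reader is closed
// asynchronously; evict, by contrast, performs the release synchronously.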
803 2 : func (c *fileCacheShard) unrefValue(v *fileCacheValue) {
804 2 : if v.refCount.Add(-1) == 0 {
805 2 : c.releasing.Add(1)
806 2 : c.releasingCh <- v
807 2 : }
808 : }
809 :
810 : // findNode returns the node for the table with the given file number, creating
811 : // that node if it didn't already exist. The caller is responsible for
812 : // decrementing the returned node's refCount.
813 : func (c *fileCacheShard) findNode(
814 : ctx context.Context, b *fileBacking, dbOpts *fileCacheOpts,
815 2 : ) *fileCacheValue {
816 2 : // The backing must have a positive refcount (otherwise it could be deleted at any time).
817 2 : b.MustHaveRefs()
818 2 : // Caution! Here fileMetadata can be a physical or virtual table. File cache
819 2 : // sstable readers are associated with the physical backings. All virtual
820 2 : // tables with the same backing will use the same reader from the cache; so
821 2 : // no information that can differ among these virtual tables can be passed
822 2 : // to findNodeInternal.
823 2 : backingFileNum := b.DiskFileNum
824 2 :
825 2 : return c.findNodeInternal(ctx, backingFileNum, dbOpts)
826 2 : }
827 :
828 : func (c *fileCacheShard) findNodeInternal(
829 : ctx context.Context, backingFileNum base.DiskFileNum, dbOpts *fileCacheOpts,
830 2 : ) *fileCacheValue {
831 2 : // Fast-path for a hit in the cache.
832 2 : c.mu.RLock()
833 2 : key := fileCacheKey{dbOpts.cacheID, backingFileNum}
834 2 : if n := c.mu.nodes[key]; n != nil && n.value != nil {
835 2 : // Fast-path hit.
836 2 : //
837 2 : // The caller is responsible for decrementing the refCount.
838 2 : v := n.value
839 2 : v.refCount.Add(1)
840 2 : c.mu.RUnlock()
841 2 : n.referenced.Store(true)
842 2 : c.hits.Add(1)
843 2 : <-v.loaded
844 2 : return v
845 2 : }
846 2 : c.mu.RUnlock()
847 2 :
848 2 : c.mu.Lock()
849 2 :
850 2 : n := c.mu.nodes[key]
851 2 : switch {
852 2 : case n == nil:
853 2 : // Slow-path miss of a non-existent node.
854 2 : n = &fileCacheNode{
855 2 : fileNum: backingFileNum,
856 2 : ptype: fileCacheNodeCold,
857 2 : }
858 2 : c.addNode(n, dbOpts)
859 2 : c.mu.sizeCold++
860 :
861 2 : case n.value != nil:
862 2 : // Slow-path hit of a hot or cold node.
863 2 : //
864 2 : // The caller is responsible for decrementing the refCount.
865 2 : v := n.value
866 2 : v.refCount.Add(1)
867 2 : n.referenced.Store(true)
868 2 : c.hits.Add(1)
869 2 : c.mu.Unlock()
870 2 : <-v.loaded
871 2 : return v
872 :
873 2 : default:
874 2 : // Slow-path miss of a test node.
875 2 : c.unlinkNode(n)
876 2 : c.mu.coldTarget++
877 2 : if c.mu.coldTarget > c.size {
878 2 : c.mu.coldTarget = c.size
879 2 : }
880 :
881 2 : n.referenced.Store(false)
882 2 : n.ptype = fileCacheNodeHot
883 2 : c.addNode(n, dbOpts)
884 2 : c.mu.sizeHot++
885 : }
886 :
887 2 : c.misses.Add(1)
888 2 :
889 2 : v := &fileCacheValue{
890 2 : loaded: make(chan struct{}),
891 2 : }
892 2 : v.readerProvider.init(c, dbOpts, backingFileNum)
893 2 : v.refCount.Store(2)
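// The initial reference count of 2 covers the reference held by the cache
// node itself (n.value, dropped via clearNode when the entry is evicted) and
// the reference handed to the caller of findNode/findNodeInternal.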
894 2 : // Cache the closure invoked when an iterator is closed. This avoids an
895 2 : // allocation on every call to newIters.
896 2 : v.closeHook = func(i sstable.Iterator) error {
897 2 : if invariants.RaceEnabled {
898 0 : c.mu.Lock()
899 0 : delete(c.mu.iters, i)
900 0 : c.mu.Unlock()
901 0 : }
902 2 : c.unrefValue(v)
903 2 : c.iterCount.Add(-1)
904 2 : dbOpts.iterCount.Add(-1)
905 2 : return nil
906 : }
907 2 : n.value = v
908 2 :
909 2 : c.mu.Unlock()
910 2 :
911 2 : // Note adding to the cache lists must complete before we begin loading the
912 2 : // table as a failure during load will result in the node being unlinked.
913 2 : v.load(ctx, backingFileNum, c, dbOpts)
914 2 : return v
915 : }
916 :
917 2 : func (c *fileCacheShard) addNode(n *fileCacheNode, dbOpts *fileCacheOpts) {
918 2 : c.evictNodes()
919 2 : n.cacheID = dbOpts.cacheID
920 2 : key := fileCacheKey{n.cacheID, n.fileNum}
921 2 : c.mu.nodes[key] = n
922 2 :
923 2 : n.links.next = n
924 2 : n.links.prev = n
925 2 : if c.mu.handHot == nil {
926 2 : // First element.
927 2 : c.mu.handHot = n
928 2 : c.mu.handCold = n
929 2 : c.mu.handTest = n
930 2 : } else {
931 2 : c.mu.handHot.link(n)
932 2 : }
933 :
934 2 : if c.mu.handCold == c.mu.handHot {
935 2 : c.mu.handCold = c.mu.handCold.prev()
936 2 : }
937 : }
938 :
939 2 : func (c *fileCacheShard) evictNodes() {
940 2 : for c.size <= c.mu.sizeHot+c.mu.sizeCold && c.mu.handCold != nil {
941 2 : c.runHandCold()
942 2 : }
943 : }
944 :
945 2 : func (c *fileCacheShard) runHandCold() {
946 2 : n := c.mu.handCold
947 2 : if n.ptype == fileCacheNodeCold {
948 2 : if n.referenced.Load() {
949 2 : n.referenced.Store(false)
950 2 : n.ptype = fileCacheNodeHot
951 2 : c.mu.sizeCold--
952 2 : c.mu.sizeHot++
953 2 : } else {
954 2 : c.clearNode(n)
955 2 : n.ptype = fileCacheNodeTest
956 2 : c.mu.sizeCold--
957 2 : c.mu.sizeTest++
958 2 : for c.size < c.mu.sizeTest && c.mu.handTest != nil {
959 2 : c.runHandTest()
960 2 : }
961 : }
962 : }
963 :
964 2 : c.mu.handCold = c.mu.handCold.next()
965 2 :
966 2 : for c.size-c.mu.coldTarget <= c.mu.sizeHot && c.mu.handHot != nil {
967 2 : c.runHandHot()
968 2 : }
969 : }
970 :
971 2 : func (c *fileCacheShard) runHandHot() {
972 2 : if c.mu.handHot == c.mu.handTest && c.mu.handTest != nil {
973 2 : c.runHandTest()
974 2 : if c.mu.handHot == nil {
975 0 : return
976 0 : }
977 : }
978 :
979 2 : n := c.mu.handHot
980 2 : if n.ptype == fileCacheNodeHot {
981 2 : if n.referenced.Load() {
982 2 : n.referenced.Store(false)
983 2 : } else {
984 2 : n.ptype = fileCacheNodeCold
985 2 : c.mu.sizeHot--
986 2 : c.mu.sizeCold++
987 2 : }
988 : }
989 :
990 2 : c.mu.handHot = c.mu.handHot.next()
991 : }
992 :
993 2 : func (c *fileCacheShard) runHandTest() {
994 2 : if c.mu.sizeCold > 0 && c.mu.handTest == c.mu.handCold && c.mu.handCold != nil {
995 2 : c.runHandCold()
996 2 : if c.mu.handTest == nil {
997 0 : return
998 0 : }
999 : }
1000 :
1001 2 : n := c.mu.handTest
1002 2 : if n.ptype == fileCacheNodeTest {
1003 2 : c.mu.coldTarget--
1004 2 : if c.mu.coldTarget < 0 {
1005 2 : c.mu.coldTarget = 0
1006 2 : }
1007 2 : c.unlinkNode(n)
1008 2 : c.clearNode(n)
1009 : }
1010 :
1011 2 : c.mu.handTest = c.mu.handTest.next()
1012 : }
1013 :
1014 2 : func (c *fileCacheShard) evict(fileNum base.DiskFileNum, dbOpts *fileCacheOpts, allowLeak bool) {
1015 2 : c.mu.Lock()
1016 2 : key := fileCacheKey{dbOpts.cacheID, fileNum}
1017 2 : n := c.mu.nodes[key]
1018 2 : var v *fileCacheValue
1019 2 : if n != nil {
1020 2 : // NB: This is equivalent to fileCacheShard.releaseNode(), but we
1021 2 : // perform the fileCacheValue.release() call synchronously below to
1022 2 : // ensure the sstable file descriptor is closed before returning. Note
1023 2 : // that fileCacheShard.releasing needs to be incremented while holding
1024 2 : // fileCacheShard.mu in order to avoid a race with Close().
1025 2 : c.unlinkNode(n)
1026 2 : v = n.value
1027 2 : if v != nil {
1028 2 : if !allowLeak {
1029 2 : if t := v.refCount.Add(-1); t != 0 {
1030 0 : dbOpts.loggerAndTracer.Fatalf("sstable %s: refcount is not zero: %d\n%s", fileNum, t, debug.Stack())
1031 0 : }
1032 : }
1033 2 : c.releasing.Add(1)
1034 : }
1035 : }
1036 :
1037 2 : c.mu.Unlock()
1038 2 :
1039 2 : if v != nil {
1040 2 : v.release(c)
1041 2 : }
1042 :
1043 2 : dbOpts.cache.EvictFile(dbOpts.cacheID, fileNum)
1044 : }
1045 :
1046 : // removeDB evicts any nodes which have a reference to the DB
1047 : // associated with dbOpts.cacheID. Make sure that there will
1048 : // be no more accesses to the files associated with the DB.
1049 2 : func (c *fileCacheShard) removeDB(dbOpts *fileCacheOpts) {
1050 2 : var fileNums []base.DiskFileNum
1051 2 :
1052 2 : c.mu.RLock()
1053 2 : // Collect the fileNums which need to be cleaned.
1054 2 : var firstNode *fileCacheNode
1055 2 : node := c.mu.handHot
1056 2 : for node != firstNode {
1057 2 : if firstNode == nil {
1058 2 : firstNode = node
1059 2 : }
1060 :
1061 2 : if node.cacheID == dbOpts.cacheID {
1062 2 : fileNums = append(fileNums, node.fileNum)
1063 2 : }
1064 2 : node = node.next()
1065 : }
1066 2 : c.mu.RUnlock()
1067 2 :
1068 2 : // Evict all the nodes associated with the DB.
1069 2 : // This should synchronously close all the files
1070 2 : // associated with the DB.
1071 2 : for _, fileNum := range fileNums {
1072 2 : c.evict(fileNum, dbOpts, true)
1073 2 : }
1074 : }
1075 :
1076 2 : func (c *fileCacheShard) Close() error {
1077 2 : c.mu.Lock()
1078 2 : defer c.mu.Unlock()
1079 2 :
1080 2 : // Check for leaked iterators. Note that we'll still perform cleanup below in
1081 2 : // the case that there are leaked iterators.
1082 2 : var err error
1083 2 : if v := c.iterCount.Load(); v > 0 {
1084 1 : if !invariants.RaceEnabled {
1085 1 : err = errors.Errorf("leaked iterators: %d", errors.Safe(v))
1086 1 : } else {
1087 0 : var buf bytes.Buffer
1088 0 : for _, stack := range c.mu.iters {
1089 0 : fmt.Fprintf(&buf, "%s\n", stack)
1090 0 : }
1091 0 : err = errors.Errorf("leaked iterators: %d\n%s", errors.Safe(v), buf.String())
1092 : }
1093 : }
1094 :
1095 2 : for c.mu.handHot != nil {
1096 0 : n := c.mu.handHot
1097 0 : if n.value != nil {
1098 0 : if n.value.refCount.Add(-1) == 0 {
1099 0 : c.releasing.Add(1)
1100 0 : c.releasingCh <- n.value
1101 0 : }
1102 : }
1103 0 : c.unlinkNode(n)
1104 : }
1105 2 : c.mu.nodes = nil
1106 2 : c.mu.handHot = nil
1107 2 : c.mu.handCold = nil
1108 2 : c.mu.handTest = nil
1109 2 :
1110 2 : // Only shut down the releasing goroutine if there were no leaked
1111 2 : // iterators. If there were leaked iterators, we leave the goroutine running
1112 2 : // and the releasingCh open so that a subsequent iterator close can
1113 2 : // complete. This behavior is used by iterator leak tests. Leaking the
1114 2 : // goroutine for these tests is less bad than not closing the iterator, which
1115 2 : // triggers other warnings about block cache handles not being released.
1116 2 : if err != nil {
1117 1 : c.releasing.Wait()
1118 1 : return err
1119 1 : }
1120 :
1121 2 : close(c.releasingCh)
1122 2 : c.releasing.Wait()
1123 2 : c.releaseLoopExit.Wait()
1124 2 : return err
1125 : }
1126 :
1127 : type fileCacheValue struct {
1128 : closeHook func(i sstable.Iterator) error
1129 : reader *sstable.Reader
1130 : err error
1131 : loaded chan struct{}
1132 : // Reference count for the value. The reader is closed when the reference
1133 : // count drops to zero.
1134 : refCount atomic.Int32
1135 : isShared bool
1136 :
1137 : // readerProvider is embedded here so that we only allocate it once as long as
1138 : // the table stays in the cache. Its state is not always logically tied to
1139 : // this specific fileCacheValue - if a table goes out of the cache and then
1140 : // comes back in, the readerProvider in a now-defunct fileCacheValue can
1141 : // still be used and will internally refer to the new fileCacheValue.
1142 : readerProvider tableCacheShardReaderProvider
1143 : }
1144 :
1145 : func (v *fileCacheValue) load(
1146 : ctx context.Context, backingFileNum base.DiskFileNum, c *fileCacheShard, dbOpts *fileCacheOpts,
1147 2 : ) {
1148 2 : // Try opening the file first.
1149 2 : var f objstorage.Readable
1150 2 : var err error
1151 2 : f, err = dbOpts.objProvider.OpenForReading(
1152 2 : ctx, fileTypeTable, backingFileNum, objstorage.OpenOptions{MustExist: true},
1153 2 : )
1154 2 : if err == nil {
1155 2 : o := dbOpts.readerOpts
1156 2 : o.SetInternalCacheOpts(sstableinternal.CacheOptions{
1157 2 : Cache: dbOpts.cache,
1158 2 : CacheID: dbOpts.cacheID,
1159 2 : FileNum: backingFileNum,
1160 2 : })
1161 2 : v.reader, err = sstable.NewReader(ctx, f, o)
1162 2 : }
1163 2 : if err == nil {
1164 2 : var objMeta objstorage.ObjectMetadata
1165 2 : objMeta, err = dbOpts.objProvider.Lookup(fileTypeTable, backingFileNum)
1166 2 : v.isShared = objMeta.IsShared()
1167 2 : }
1168 2 : if err != nil {
1169 1 : v.err = errors.Wrapf(
1170 1 : err, "pebble: backing file %s error", backingFileNum)
1171 1 : }
1172 2 : if v.err != nil {
1173 1 : c.mu.Lock()
1174 1 : defer c.mu.Unlock()
1175 1 : // Lookup the node in the cache again as it might have already been
1176 1 : // removed.
1177 1 : key := fileCacheKey{dbOpts.cacheID, backingFileNum}
1178 1 : n := c.mu.nodes[key]
1179 1 : if n != nil && n.value == v {
1180 1 : c.releaseNode(n)
1181 1 : }
1182 : }
1183 2 : close(v.loaded)
1184 : }
1185 :
1186 2 : func (v *fileCacheValue) release(c *fileCacheShard) {
1187 2 : <-v.loaded
1188 2 : // Nothing to be done about an error at this point. Close the reader if it is
1189 2 : // open.
1190 2 : if v.reader != nil {
1191 2 : _ = v.reader.Close()
1192 2 : }
1193 2 : c.releasing.Done()
1194 : }
1195 :
1196 : type fileCacheNodeType int8
1197 :
1198 : const (
1199 : fileCacheNodeTest fileCacheNodeType = iota
1200 : fileCacheNodeCold
1201 : fileCacheNodeHot
1202 : )
1203 :
1204 0 : func (p fileCacheNodeType) String() string {
1205 0 : switch p {
1206 0 : case fileCacheNodeTest:
1207 0 : return "test"
1208 0 : case fileCacheNodeCold:
1209 0 : return "cold"
1210 0 : case fileCacheNodeHot:
1211 0 : return "hot"
1212 : }
1213 0 : return "unknown"
1214 : }
1215 :
1216 : type fileCacheNode struct {
1217 : fileNum base.DiskFileNum
1218 : value *fileCacheValue
1219 :
1220 : links struct {
1221 : next *fileCacheNode
1222 : prev *fileCacheNode
1223 : }
1224 : ptype fileCacheNodeType
1225 : // referenced is atomically set to indicate that this entry has been accessed
1226 : // since the last time one of the clock hands swept it.
1227 : referenced atomic.Bool
1228 :
1229 : // Storing the cache id associated with the DB instance here
1230 : // avoids the need to thread the dbOpts struct through many functions.
1231 : cacheID cache.ID
1232 : }
1233 :
1234 2 : func (n *fileCacheNode) next() *fileCacheNode {
1235 2 : if n == nil {
1236 0 : return nil
1237 0 : }
1238 2 : return n.links.next
1239 : }
1240 :
1241 2 : func (n *fileCacheNode) prev() *fileCacheNode {
1242 2 : if n == nil {
1243 0 : return nil
1244 0 : }
1245 2 : return n.links.prev
1246 : }
1247 :
1248 2 : func (n *fileCacheNode) link(s *fileCacheNode) {
1249 2 : s.links.prev = n.links.prev
1250 2 : s.links.prev.links.next = s
1251 2 : s.links.next = n
1252 2 : s.links.next.links.prev = s
1253 2 : }
1254 :
1255 2 : func (n *fileCacheNode) unlink() *fileCacheNode {
1256 2 : next := n.links.next
1257 2 : n.links.prev.links.next = n.links.next
1258 2 : n.links.next.links.prev = n.links.prev
1259 2 : n.links.prev = n
1260 2 : n.links.next = n
1261 2 : return next
1262 2 : }
1263 :
1264 : // iterSet holds a set of iterators of various key kinds, all constructed over
1265 : // the same data structure (e.g., an sstable). A subset of the fields may be
1266 : // populated depending on the `iterKinds` passed to newIters.
1267 : type iterSet struct {
1268 : point internalIterator
1269 : rangeDeletion keyspan.FragmentIterator
1270 : rangeKey keyspan.FragmentIterator
1271 : }
1272 :
1273 : // TODO(jackson): Consider adding methods for fast paths that check whether an
1274 : // iterator of a particular kind is nil, so that these call sites don't need to
1275 : // reach into the struct's fields directly.
1276 :
1277 : // Point returns the contained point iterator. If there is no point iterator,
1278 : // Point returns a non-nil empty point iterator.
1279 2 : func (s *iterSet) Point() internalIterator {
1280 2 : if s.point == nil {
1281 2 : return emptyIter
1282 2 : }
1283 2 : return s.point
1284 : }
1285 :
1286 : // RangeDeletion returns the contained range deletion iterator. If there is no
1287 : // range deletion iterator, RangeDeletion returns a non-nil empty keyspan
1288 : // iterator.
1289 2 : func (s *iterSet) RangeDeletion() keyspan.FragmentIterator {
1290 2 : if s.rangeDeletion == nil {
1291 2 : return emptyKeyspanIter
1292 2 : }
1293 2 : return s.rangeDeletion
1294 : }
1295 :
1296 : // RangeKey returns the contained range key iterator. If there is no range key
1297 : // iterator, RangeKey returns a non-nil empty keyspan iterator.
1298 2 : func (s *iterSet) RangeKey() keyspan.FragmentIterator {
1299 2 : if s.rangeKey == nil {
1300 0 : return emptyKeyspanIter
1301 0 : }
1302 2 : return s.rangeKey
1303 : }
1304 :
1305 : // CloseAll closes all of the held iterators. If CloseAll is called, then Close
1306 : // must not be called on the constituent iterators.
1307 2 : func (s *iterSet) CloseAll() error {
1308 2 : var err error
1309 2 : if s.point != nil {
1310 2 : err = s.point.Close()
1311 2 : s.point = nil
1312 2 : }
1313 2 : if s.rangeDeletion != nil {
1314 2 : s.rangeDeletion.Close()
1315 2 : s.rangeDeletion = nil
1316 2 : }
1317 2 : if s.rangeKey != nil {
1318 2 : s.rangeKey.Close()
1319 2 : s.rangeKey = nil
1320 2 : }
1321 2 : return err
1322 : }
1323 :
1324 : // iterKinds is a bitmap indicating a set of kinds of iterators. Callers may
1325 : // bitwise-OR iterPointKeys, iterRangeDeletions and/or iterRangeKeys together to
1326 : // represent a set of desired iterator kinds.
1327 : type iterKinds uint8
1328 :
1329 2 : func (t iterKinds) Point() bool { return (t & iterPointKeys) != 0 }
1330 2 : func (t iterKinds) RangeDeletion() bool { return (t & iterRangeDeletions) != 0 }
1331 2 : func (t iterKinds) RangeKey() bool { return (t & iterRangeKeys) != 0 }
1332 :
1333 : const (
1334 : iterPointKeys iterKinds = 1 << iota
1335 : iterRangeDeletions
1336 : iterRangeKeys
1337 : )
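// For example, a caller needing both point and range-key iterators passes
// iterPointKeys|iterRangeKeys as the kinds argument to a tableNewIters func.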