Line data Source code
1 : // Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "bytes"
9 : "context"
10 : "fmt"
11 : "io"
12 : "runtime/debug"
13 : "runtime/pprof"
14 : "sync"
15 : "sync/atomic"
16 : "unsafe"
17 :
18 : "github.com/cockroachdb/errors"
19 : "github.com/cockroachdb/pebble/internal/base"
20 : "github.com/cockroachdb/pebble/internal/cache"
21 : "github.com/cockroachdb/pebble/internal/invariants"
22 : "github.com/cockroachdb/pebble/internal/keyspan"
23 : "github.com/cockroachdb/pebble/internal/keyspan/keyspanimpl"
24 : "github.com/cockroachdb/pebble/internal/manifest"
25 : "github.com/cockroachdb/pebble/internal/sstableinternal"
26 : "github.com/cockroachdb/pebble/objstorage"
27 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
28 : "github.com/cockroachdb/pebble/sstable"
29 : )
30 :
31 : var emptyIter = &errorIter{err: nil}
32 : var emptyKeyspanIter = &errorKeyspanIter{err: nil}
33 :
34 : // tableNewIters creates new iterators (point, range deletion and/or range key)
35 : // for the given file metadata. Which of the various iterator kinds the user is
36 : // requesting is specified with the iterKinds bitmap.
37 : //
38 : // On success, the requested subset of iters.{point,rangeDel,rangeKey} are
39 : // populated with iterators.
40 : //
41 : // If a point iterator is requested and the operation was successful,
42 : // iters.point is guaranteed to be non-nil and must be closed when the caller is
43 : // finished.
44 : //
45 : // If a range deletion or range key iterator is requested, the corresponding
46 : // iterator may be nil if the table does not contain any keys of the
47 : // corresponding kind. The returned iterSet type provides RangeDeletion() and
48 : // RangeKey() convenience methods that return non-nil empty iterators that may
49 : // be used if the caller requires a non-nil iterator.
50 : //
51 : // On error, all iterators are nil.
52 : //
53 : // The only (non-test) implementation of tableNewIters is
54 : // tableCacheContainer.newIters().
55 : type tableNewIters func(
56 : ctx context.Context,
57 : file *manifest.FileMetadata,
58 : opts *IterOptions,
59 : internalOpts internalIterOpts,
60 : kinds iterKinds,
61 : ) (iterSet, error)
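//
// Illustrative sketch (not part of the original source): a caller that only
// needs a range deletion iterator might invoke a tableNewIters value as shown
// below, relying on iterSet.RangeDeletion() to substitute a non-nil empty
// iterator when the table contains none. The names `newIters`, `ctx` and
// `file` are assumptions for the example.
//
//	iters, err := newIters(ctx, file, nil, internalIterOpts{}, iterRangeDeletions)
//	if err != nil {
//		return err
//	}
//	defer iters.CloseAll()
//	rangeDelIter := iters.RangeDeletion() // non-nil, possibly empty
//	// ... use rangeDelIter ...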
62 :
63 : // tableNewRangeDelIter takes a tableNewIters and returns a TableNewSpanIter
64 : // for the rangedel iterator returned by tableNewIters.
65 1 : func tableNewRangeDelIter(newIters tableNewIters) keyspanimpl.TableNewSpanIter {
66 1 : return func(ctx context.Context, file *manifest.FileMetadata, iterOptions keyspan.SpanIterOptions) (keyspan.FragmentIterator, error) {
67 1 : iters, err := newIters(ctx, file, nil, internalIterOpts{}, iterRangeDeletions)
68 1 : if err != nil {
69 0 : return nil, err
70 0 : }
71 1 : return iters.RangeDeletion(), nil
72 : }
73 : }
74 :
75 : // tableNewRangeKeyIter takes a tableNewIters and returns a TableNewSpanIter
76 : // for the range key iterator returned by tableNewIters.
77 1 : func tableNewRangeKeyIter(newIters tableNewIters) keyspanimpl.TableNewSpanIter {
78 1 : return func(ctx context.Context, file *manifest.FileMetadata, iterOptions keyspan.SpanIterOptions) (keyspan.FragmentIterator, error) {
79 1 : iters, err := newIters(ctx, file, nil, internalIterOpts{}, iterRangeKeys)
80 1 : if err != nil {
81 1 : return nil, err
82 1 : }
83 1 : return iters.RangeKey(), nil
84 : }
85 : }
86 :
87 : var tableCacheLabels = pprof.Labels("pebble", "table-cache")
88 :
89 : // tableCacheOpts contains the DB-specific fields
90 : // of a table cache. This is stored in the tableCacheContainer
91 : // along with the table cache.
92 : // NB: It is important to make sure that the fields in this
93 : // struct are read-only. Since the fields here are shared
94 : // by every single tableCacheShard, updating non-read-only
95 : // fields could cause unnecessary evictions of those fields,
96 : // and of the surrounding fields, from the CPU caches.
97 : type tableCacheOpts struct {
98 : // iterCount keeps track of how many iterators are open. It is used to keep
99 : // track of leaked iterators on a per-db level.
100 : iterCount *atomic.Int32
101 :
102 : loggerAndTracer LoggerAndTracer
103 : cache *cache.Cache
104 : cacheID uint64
105 : objProvider objstorage.Provider
106 : readerOpts sstable.ReaderOptions
107 : sstStatsCollector *sstable.CategoryStatsCollector
108 : }
109 :
110 : // tableCacheContainer contains the table cache and
111 : // fields which are unique to the DB.
112 : type tableCacheContainer struct {
113 : tableCache *TableCache
114 :
115 : // dbOpts contains fields relevant to the table cache
116 : // which are unique to each DB.
117 : dbOpts tableCacheOpts
118 : }
119 :
120 : // newTableCacheContainer will panic if the underlying cache in the table cache
121 : // doesn't match Options.Cache.
122 : func newTableCacheContainer(
123 : tc *TableCache,
124 : cacheID uint64,
125 : objProvider objstorage.Provider,
126 : opts *Options,
127 : size int,
128 : sstStatsCollector *sstable.CategoryStatsCollector,
129 1 : ) *tableCacheContainer {
130 1 : // We will release a ref to table cache acquired here when tableCacheContainer.close is called.
131 1 : if tc != nil {
132 1 : if tc.cache != opts.Cache {
133 0 : panic("pebble: underlying cache for the table cache and db are different")
134 : }
135 1 : tc.Ref()
136 1 : } else {
137 1 : // NewTableCache should create a ref to tc which the container should
138 1 : // drop whenever it is closed.
139 1 : tc = NewTableCache(opts.Cache, opts.Experimental.TableCacheShards, size)
140 1 : }
141 :
142 1 : t := &tableCacheContainer{}
143 1 : t.tableCache = tc
144 1 : t.dbOpts.loggerAndTracer = opts.LoggerAndTracer
145 1 : t.dbOpts.cache = opts.Cache
146 1 : t.dbOpts.cacheID = cacheID
147 1 : t.dbOpts.objProvider = objProvider
148 1 : t.dbOpts.readerOpts = opts.MakeReaderOptions()
149 1 : t.dbOpts.readerOpts.FilterMetricsTracker = &sstable.FilterMetricsTracker{}
150 1 : t.dbOpts.iterCount = new(atomic.Int32)
151 1 : t.dbOpts.sstStatsCollector = sstStatsCollector
152 1 : return t
153 : }
154 :
155 : // Before calling close, make sure that there will be no further need
156 : // to access any of the files associated with the store.
157 1 : func (c *tableCacheContainer) close() error {
158 1 : // We want to do some cleanup work here. Check for leaked iterators
159 1 : // by the DB using this container. Note that we'll still perform cleanup
160 1 : // below in the case that there are leaked iterators.
161 1 : var err error
162 1 : if v := c.dbOpts.iterCount.Load(); v > 0 {
163 1 : err = errors.Errorf("leaked iterators: %d", errors.Safe(v))
164 1 : }
165 :
166 : // Release nodes here.
167 1 : for _, shard := range c.tableCache.shards {
168 1 : if shard != nil {
169 1 : shard.removeDB(&c.dbOpts)
170 1 : }
171 : }
172 1 : return firstError(err, c.tableCache.Unref())
173 : }
174 :
175 : func (c *tableCacheContainer) newIters(
176 : ctx context.Context,
177 : file *manifest.FileMetadata,
178 : opts *IterOptions,
179 : internalOpts internalIterOpts,
180 : kinds iterKinds,
181 1 : ) (iterSet, error) {
182 1 : return c.tableCache.getShard(file.FileBacking.DiskFileNum).newIters(ctx, file, opts, internalOpts, &c.dbOpts, kinds)
183 1 : }
184 :
185 : // getTableProperties returns the properties associated with the backing physical
186 : // table if the input metadata belongs to a virtual sstable.
187 1 : func (c *tableCacheContainer) getTableProperties(file *fileMetadata) (*sstable.Properties, error) {
188 1 : return c.tableCache.getShard(file.FileBacking.DiskFileNum).getTableProperties(file, &c.dbOpts)
189 1 : }
190 :
191 1 : func (c *tableCacheContainer) evict(fileNum base.DiskFileNum) {
192 1 : c.tableCache.getShard(fileNum).evict(fileNum, &c.dbOpts, false)
193 1 : }
194 :
195 1 : func (c *tableCacheContainer) metrics() (CacheMetrics, FilterMetrics) {
196 1 : var m CacheMetrics
197 1 : for i := range c.tableCache.shards {
198 1 : s := c.tableCache.shards[i]
199 1 : s.mu.RLock()
200 1 : m.Count += int64(len(s.mu.nodes))
201 1 : s.mu.RUnlock()
202 1 : m.Hits += s.hits.Load()
203 1 : m.Misses += s.misses.Load()
204 1 : }
205 1 : m.Size = m.Count * int64(unsafe.Sizeof(sstable.Reader{}))
206 1 : f := c.dbOpts.readerOpts.FilterMetricsTracker.Load()
207 1 : return m, f
208 : }
209 :
210 : func (c *tableCacheContainer) estimateSize(
211 : meta *fileMetadata, lower, upper []byte,
212 1 : ) (size uint64, err error) {
213 1 : if meta.Virtual {
214 1 : err = c.withVirtualReader(
215 1 : meta.VirtualMeta(),
216 1 : func(r sstable.VirtualReader) (err error) {
217 1 : size, err = r.EstimateDiskUsage(lower, upper)
218 1 : return err
219 1 : },
220 : )
221 1 : } else {
222 1 : err = c.withReader(
223 1 : meta.PhysicalMeta(),
224 1 : func(r *sstable.Reader) (err error) {
225 1 : size, err = r.EstimateDiskUsage(lower, upper)
226 1 : return err
227 1 : },
228 : )
229 : }
230 1 : if err != nil {
231 0 : return 0, err
232 0 : }
233 1 : return size, nil
234 : }
235 :
236 : // createCommonReader creates a CommonReader for this file.
237 1 : func createCommonReader(v *tableCacheValue, file *fileMetadata) sstable.CommonReader {
238 1 : // TODO(bananabrick): We suffer an allocation if file is a virtual sstable.
239 1 : var cr sstable.CommonReader = v.reader
240 1 : if file.Virtual {
241 1 : virtualReader := sstable.MakeVirtualReader(
242 1 : v.reader, file.VirtualMeta().VirtualReaderParams(v.isShared),
243 1 : )
244 1 : cr = &virtualReader
245 1 : }
246 1 : return cr
247 : }
248 :
249 : func (c *tableCacheContainer) withCommonReader(
250 : meta *fileMetadata, fn func(sstable.CommonReader) error,
251 1 : ) error {
252 1 : s := c.tableCache.getShard(meta.FileBacking.DiskFileNum)
253 1 : v := s.findNode(context.TODO(), meta.FileBacking, &c.dbOpts)
254 1 : defer s.unrefValue(v)
255 1 : if v.err != nil {
256 0 : return v.err
257 0 : }
258 1 : return fn(createCommonReader(v, meta))
259 : }
260 :
261 1 : func (c *tableCacheContainer) withReader(meta physicalMeta, fn func(*sstable.Reader) error) error {
262 1 : s := c.tableCache.getShard(meta.FileBacking.DiskFileNum)
263 1 : v := s.findNode(context.TODO(), meta.FileBacking, &c.dbOpts)
264 1 : defer s.unrefValue(v)
265 1 : if v.err != nil {
266 1 : return v.err
267 1 : }
268 1 : return fn(v.reader)
269 : }
270 :
271 : // withVirtualReader fetches a VirtualReader associated with a virtual sstable.
272 : func (c *tableCacheContainer) withVirtualReader(
273 : meta virtualMeta, fn func(sstable.VirtualReader) error,
274 1 : ) error {
275 1 : s := c.tableCache.getShard(meta.FileBacking.DiskFileNum)
276 1 : v := s.findNode(context.TODO(), meta.FileBacking, &c.dbOpts)
277 1 : defer s.unrefValue(v)
278 1 : if v.err != nil {
279 0 : return v.err
280 0 : }
281 1 : provider := c.dbOpts.objProvider
282 1 : objMeta, err := provider.Lookup(fileTypeTable, meta.FileBacking.DiskFileNum)
283 1 : if err != nil {
284 0 : return err
285 0 : }
286 1 : return fn(sstable.MakeVirtualReader(v.reader, meta.VirtualReaderParams(objMeta.IsShared())))
287 : }
288 :
289 1 : func (c *tableCacheContainer) iterCount() int64 {
290 1 : return int64(c.dbOpts.iterCount.Load())
291 1 : }
292 :
293 : // TableCache is a shareable cache for open sstables.
294 : type TableCache struct {
295 : refs atomic.Int64
296 :
297 : cache *Cache
298 : shards []*tableCacheShard
299 : }
300 :
301 : // Ref adds a reference to the table cache. Once tableCache.init returns,
302 : // the table cache only remains valid if there is at least one reference
303 : // to it.
304 1 : func (c *TableCache) Ref() {
305 1 : v := c.refs.Add(1)
306 1 : // We don't want the reference count to ever go from 0 -> 1,
307 1 : // because a reference count of 0 implies that we've closed the cache.
308 1 : if v <= 1 {
309 0 : panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
310 : }
311 : }
312 :
313 : // Unref removes a reference to the table cache.
314 1 : func (c *TableCache) Unref() error {
315 1 : v := c.refs.Add(-1)
316 1 : switch {
317 1 : case v < 0:
318 1 : panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
319 1 : case v == 0:
320 1 : var err error
321 1 : for i := range c.shards {
322 1 : // The cache shard is not allocated yet, nothing to close
323 1 : if c.shards[i] == nil {
324 0 : continue
325 : }
326 1 : err = firstError(err, c.shards[i].Close())
327 : }
328 :
329 : // Unref the cache which we create a reference to when the tableCache
330 : // is first instantiated.
331 1 : c.cache.Unref()
332 1 : return err
333 : }
334 1 : return nil
335 : }
336 :
337 : // NewTableCache will create a reference to the table cache. It is the caller's responsibility
338 : // to call tableCache.Unref when it no longer holds a reference to the table cache.
339 1 : func NewTableCache(cache *Cache, numShards int, size int) *TableCache {
340 1 : if size == 0 {
341 0 : panic("pebble: cannot create a table cache of size 0")
342 1 : } else if numShards == 0 {
343 0 : panic("pebble: cannot create a table cache with 0 shards")
344 : }
345 :
346 1 : c := &TableCache{}
347 1 : c.cache = cache
348 1 : c.cache.Ref()
349 1 :
350 1 : c.shards = make([]*tableCacheShard, numShards)
351 1 : for i := range c.shards {
352 1 : c.shards[i] = &tableCacheShard{}
353 1 : c.shards[i].init(size / len(c.shards))
354 1 : }
355 :
356 : // Hold a ref to the cache here.
357 1 : c.refs.Store(1)
358 1 :
359 1 : return c
360 : }
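//
// Illustrative sketch (assumptions: Options.TableCache is the field used to
// share a table cache across DBs, and vfs refers to
// github.com/cockroachdb/pebble/vfs). The caller creates the block cache and
// the table cache, hands both to each DB via Options, and drops its own
// references when done:
//
//	cache := pebble.NewCache(128 << 20)
//	defer cache.Unref()
//	tc := pebble.NewTableCache(cache, 16 /* shards */, 1000 /* tables */)
//	defer tc.Unref()
//	db, err := pebble.Open("", &pebble.Options{Cache: cache, TableCache: tc, FS: vfs.NewMem()})
//	if err != nil {
//		return err
//	}
//	defer db.Close() // runs before the Unrefs above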
361 :
362 1 : func (c *TableCache) getShard(fileNum base.DiskFileNum) *tableCacheShard {
363 1 : return c.shards[uint64(fileNum)%uint64(len(c.shards))]
364 1 : }
365 :
366 : type tableCacheKey struct {
367 : cacheID uint64
368 : fileNum base.DiskFileNum
369 : }
370 :
371 : type tableCacheShard struct {
372 : hits atomic.Int64
373 : misses atomic.Int64
374 : iterCount atomic.Int32
375 :
376 : size int
377 :
378 : mu struct {
379 : sync.RWMutex
380 : nodes map[tableCacheKey]*tableCacheNode
381 : // The iters map is only created and populated in race builds.
382 : iters map[io.Closer][]byte
383 :
384 : handHot *tableCacheNode
385 : handCold *tableCacheNode
386 : handTest *tableCacheNode
387 :
388 : coldTarget int
389 : sizeHot int
390 : sizeCold int
391 : sizeTest int
392 : }
393 : releasing sync.WaitGroup
394 : releasingCh chan *tableCacheValue
395 : releaseLoopExit sync.WaitGroup
396 : }
397 :
398 1 : func (c *tableCacheShard) init(size int) {
399 1 : c.size = size
400 1 :
401 1 : c.mu.nodes = make(map[tableCacheKey]*tableCacheNode)
402 1 : c.mu.coldTarget = size
403 1 : c.releasingCh = make(chan *tableCacheValue, 100)
404 1 : c.releaseLoopExit.Add(1)
405 1 : go c.releaseLoop()
406 1 :
407 1 : if invariants.RaceEnabled {
408 0 : c.mu.iters = make(map[io.Closer][]byte)
409 0 : }
410 : }
411 :
412 1 : func (c *tableCacheShard) releaseLoop() {
413 1 : pprof.Do(context.Background(), tableCacheLabels, func(context.Context) {
414 1 : defer c.releaseLoopExit.Done()
415 1 : for v := range c.releasingCh {
416 1 : v.release(c)
417 1 : }
418 : })
419 : }
420 :
421 : // checkAndIntersectFilters checks the specific table and block property filters
422 : // for intersection with any available table and block-level properties. Returns
423 : // true for ok if this table should be read by this iterator.
424 : func (c *tableCacheShard) checkAndIntersectFilters(
425 : v *tableCacheValue,
426 : blockPropertyFilters []BlockPropertyFilter,
427 : boundLimitedFilter sstable.BoundLimitedBlockPropertyFilter,
428 : syntheticSuffix sstable.SyntheticSuffix,
429 1 : ) (ok bool, filterer *sstable.BlockPropertiesFilterer, err error) {
430 1 : if boundLimitedFilter != nil || len(blockPropertyFilters) > 0 {
431 1 : filterer, err = sstable.IntersectsTable(
432 1 : blockPropertyFilters,
433 1 : boundLimitedFilter,
434 1 : v.reader.Properties.UserProperties,
435 1 : syntheticSuffix,
436 1 : )
437 1 : // NB: IntersectsTable will return a nil filterer if the table-level
438 1 : // properties indicate there's no intersection with the provided filters.
439 1 : if filterer == nil || err != nil {
440 1 : return false, nil, err
441 1 : }
442 : }
443 1 : return true, filterer, nil
444 : }
445 :
446 : func (c *tableCacheShard) newIters(
447 : ctx context.Context,
448 : file *manifest.FileMetadata,
449 : opts *IterOptions,
450 : internalOpts internalIterOpts,
451 : dbOpts *tableCacheOpts,
452 : kinds iterKinds,
453 1 : ) (iterSet, error) {
454 1 : // TODO(sumeer): constructing the Reader should also use a plumbed context,
455 1 : // since parts of the sstable are read during the construction. The Reader
456 1 : // should not remember that context since the Reader can be long-lived.
457 1 :
458 1 : // Calling findNode gives us the responsibility of decrementing v's
459 1 : // refCount. If opening the underlying table resulted in error, then we
460 1 : // decrement this straight away. Otherwise, we pass that responsibility to
461 1 : // the sstable iterator, which decrements when it is closed.
462 1 : v := c.findNode(ctx, file.FileBacking, dbOpts)
463 1 : if v.err != nil {
464 1 : defer c.unrefValue(v)
465 1 : return iterSet{}, v.err
466 1 : }
467 :
468 : // Note: This suffers an allocation for virtual sstables.
469 1 : cr := createCommonReader(v, file)
470 1 : var iters iterSet
471 1 : var err error
472 1 : if kinds.RangeKey() && file.HasRangeKeys {
473 1 : iters.rangeKey, err = c.newRangeKeyIter(ctx, v, file, cr, opts.SpanIterOptions())
474 1 : }
475 1 : if kinds.RangeDeletion() && file.HasPointKeys && err == nil {
476 1 : iters.rangeDeletion, err = c.newRangeDelIter(ctx, file, cr, dbOpts)
477 1 : }
478 1 : if kinds.Point() && err == nil {
479 1 : iters.point, err = c.newPointIter(ctx, v, file, cr, opts, internalOpts, dbOpts)
480 1 : }
481 1 : if err != nil {
482 1 : // NB: There's a subtlety here: Because the point iterator is the last
483 1 : // iterator we attempt to create, it's not possible for:
484 1 : // err != nil && iters.point != nil
485 1 : // If it were possible, we'd need to account for it to avoid double
486 1 : // unref-ing here, once during CloseAll and once during `unrefValue`.
487 1 : iters.CloseAll()
488 1 : c.unrefValue(v)
489 1 : return iterSet{}, err
490 1 : }
491 : // Only point iterators ever require the reader stay pinned in the cache. If
492 : // we're not returning a point iterator to the caller, we need to unref v.
493 1 : if iters.point == nil {
494 1 : c.unrefValue(v)
495 1 : }
496 1 : return iters, nil
497 : }
498 :
499 : // For flushable ingests, we decide whether to use the bloom filter based on
500 : // size.
501 : const filterBlockSizeLimitForFlushableIngests = 64 * 1024
502 :
503 : // newPointIter is an internal helper that constructs a point iterator over an
504 : // sstable. This function is for internal use only, and callers should use
505 : // newIters instead.
506 : func (c *tableCacheShard) newPointIter(
507 : ctx context.Context,
508 : v *tableCacheValue,
509 : file *manifest.FileMetadata,
510 : cr sstable.CommonReader,
511 : opts *IterOptions,
512 : internalOpts internalIterOpts,
513 : dbOpts *tableCacheOpts,
514 1 : ) (internalIterator, error) {
515 1 : var (
516 1 : hideObsoletePoints bool = false
517 1 : pointKeyFilters []BlockPropertyFilter
518 1 : filterer *sstable.BlockPropertiesFilterer
519 1 : )
520 1 : if opts != nil {
521 1 : // This code is appending (at most one filter) in-place to
522 1 : // opts.PointKeyFilters even though the slice is shared for iterators in
523 1 : // the same iterator tree. This is acceptable since all the following
524 1 : // properties are true:
525 1 : // - The iterator tree is single threaded, so the shared backing for the
526 1 : // slice is being mutated in a single threaded manner.
527 1 : // - Each shallow copy of the slice has its own notion of length.
528 1 : // - The appended element is always the obsoleteKeyBlockPropertyFilter
529 1 : // struct, which is stateless, so overwriting that struct when creating
530 1 : // one sstable iterator is harmless to other sstable iterators that are
531 1 : // relying on that struct.
532 1 : //
533 1 : // An alternative would be to have different slices for different sstable
534 1 : // iterators, but that requires more work to avoid allocations.
535 1 : //
536 1 : // TODO(bilal): for compaction reads of foreign sstables, we do hide
537 1 : // obsolete points (see sstable.Reader.newCompactionIter) but we don't
538 1 : // apply the obsolete block property filter. We could optimize this by
539 1 : // applying the filter.
540 1 : hideObsoletePoints, pointKeyFilters =
541 1 : v.reader.TryAddBlockPropertyFilterForHideObsoletePoints(
542 1 : opts.snapshotForHideObsoletePoints, file.LargestSeqNum, opts.PointKeyFilters)
543 1 :
544 1 : var ok bool
545 1 : var err error
546 1 : ok, filterer, err = c.checkAndIntersectFilters(v, pointKeyFilters,
547 1 : internalOpts.boundLimitedFilter, file.SyntheticSuffix)
548 1 : if err != nil {
549 0 : return nil, err
550 1 : } else if !ok {
551 1 : // No point keys within the table match the filters.
552 1 : return nil, nil
553 1 : }
554 : }
555 :
556 1 : var iter sstable.Iterator
557 1 : filterBlockSizeLimit := sstable.AlwaysUseFilterBlock
558 1 : if opts != nil {
559 1 : // By default, we don't use block filters for L6 and restrict the size for
560 1 : // flushable ingests, as these blocks can be very big.
561 1 : if !opts.UseL6Filters {
562 1 : if opts.layer == manifest.Level(6) {
563 1 : filterBlockSizeLimit = sstable.NeverUseFilterBlock
564 1 : } else if opts.layer.IsFlushableIngests() {
565 1 : filterBlockSizeLimit = filterBlockSizeLimitForFlushableIngests
566 1 : }
567 : }
568 1 : if opts.layer.IsSet() && !opts.layer.IsFlushableIngests() {
569 1 : ctx = objiotracing.WithLevel(ctx, opts.layer.Level())
570 1 : }
571 : }
572 1 : tableFormat, err := v.reader.TableFormat()
573 1 : if err != nil {
574 0 : return nil, err
575 0 : }
576 1 : var rp sstable.ReaderProvider
577 1 : if tableFormat >= sstable.TableFormatPebblev3 && v.reader.Properties.NumValueBlocks > 0 {
578 1 : rp = &tableCacheShardReaderProvider{c: c, file: file, dbOpts: dbOpts}
579 1 : }
580 :
581 1 : if v.isShared && file.SyntheticSeqNum() != 0 {
582 1 : if tableFormat < sstable.TableFormatPebblev4 {
583 0 : return nil, errors.New("pebble: shared ingested sstable has a lower table format than expected")
584 0 : }
585 : // The table is shared and ingested.
586 1 : hideObsoletePoints = true
587 : }
588 1 : transforms := file.IterTransforms()
589 1 : transforms.HideObsoletePoints = hideObsoletePoints
590 1 : var categoryAndQoS sstable.CategoryAndQoS
591 1 : if opts != nil {
592 1 : categoryAndQoS = opts.CategoryAndQoS
593 1 : }
594 1 : if internalOpts.compaction {
595 1 : iter, err = cr.NewCompactionIter(
596 1 : transforms, categoryAndQoS, dbOpts.sstStatsCollector, rp,
597 1 : internalOpts.bufferPool)
598 1 : } else {
599 1 : iter, err = cr.NewPointIter(
600 1 : ctx, transforms, opts.GetLowerBound(), opts.GetUpperBound(), filterer, filterBlockSizeLimit,
601 1 : internalOpts.stats, categoryAndQoS, dbOpts.sstStatsCollector, rp)
602 1 : }
603 1 : if err != nil {
604 1 : return nil, err
605 1 : }
606 : // NB: v.closeHook takes responsibility for calling unrefValue(v) here. Take
607 : // care not to introduce an allocation here by adding a closure.
608 1 : iter.SetCloseHook(v.closeHook)
609 1 : c.iterCount.Add(1)
610 1 : dbOpts.iterCount.Add(1)
611 1 : if invariants.RaceEnabled {
612 0 : c.mu.Lock()
613 0 : c.mu.iters[iter] = debug.Stack()
614 0 : c.mu.Unlock()
615 0 : }
616 1 : return iter, nil
617 : }
618 :
619 : // newRangeDelIter is an internal helper that constructs an iterator over an
620 : // sstable's range deletions. This function is for table-cache internal use
621 : // only, and callers should use newIters instead.
622 : func (c *tableCacheShard) newRangeDelIter(
623 : ctx context.Context, file *manifest.FileMetadata, cr sstable.CommonReader, dbOpts *tableCacheOpts,
624 1 : ) (keyspan.FragmentIterator, error) {
625 1 : // NB: range-del iterator does not maintain a reference to the table, nor
626 1 : // does it need to read from it after creation.
627 1 : rangeDelIter, err := cr.NewRawRangeDelIter(ctx, file.FragmentIterTransforms())
628 1 : if err != nil {
629 1 : return nil, err
630 1 : }
631 : // Assert expected bounds in tests.
632 1 : if invariants.Sometimes(50) && rangeDelIter != nil {
633 1 : cmp := base.DefaultComparer.Compare
634 1 : if dbOpts.readerOpts.Comparer != nil {
635 1 : cmp = dbOpts.readerOpts.Comparer.Compare
636 1 : }
637 1 : rangeDelIter = keyspan.AssertBounds(
638 1 : rangeDelIter, file.SmallestPointKey, file.LargestPointKey.UserKey, cmp,
639 1 : )
640 : }
641 1 : return rangeDelIter, nil
642 : }
643 :
644 : // newRangeKeyIter is an internal helper that constructs an iterator over an
645 : // sstable's range keys. This function is for table-cache internal use only, and
646 : // callers should use newIters instead.
647 : func (c *tableCacheShard) newRangeKeyIter(
648 : ctx context.Context,
649 : v *tableCacheValue,
650 : file *fileMetadata,
651 : cr sstable.CommonReader,
652 : opts keyspan.SpanIterOptions,
653 1 : ) (keyspan.FragmentIterator, error) {
654 1 : transforms := file.FragmentIterTransforms()
655 1 : // Don't filter a table's range keys if the file contains RANGEKEYDELs.
656 1 : // The RANGEKEYDELs may delete range keys in other levels. Skipping the
657 1 : // file's range key blocks may surface deleted range keys below. This is
658 1 : // done here, rather than deferring to the block-property collector in order
659 1 : // to maintain parity with point keys and the treatment of RANGEDELs.
660 1 : if v.reader.Properties.NumRangeKeyDels == 0 && len(opts.RangeKeyFilters) > 0 {
661 0 : ok, _, err := c.checkAndIntersectFilters(v, opts.RangeKeyFilters, nil, transforms.SyntheticSuffix)
662 0 : if err != nil {
663 0 : return nil, err
664 0 : } else if !ok {
665 0 : return nil, nil
666 0 : }
667 : }
668 : // TODO(radu): wrap in an AssertBounds.
669 1 : return cr.NewRawRangeKeyIter(ctx, transforms)
670 : }
671 :
672 : type tableCacheShardReaderProvider struct {
673 : c *tableCacheShard
674 : file *manifest.FileMetadata
675 : dbOpts *tableCacheOpts
676 : v *tableCacheValue
677 : }
678 :
679 : var _ sstable.ReaderProvider = &tableCacheShardReaderProvider{}
680 :
681 : // GetReader implements sstable.ReaderProvider. Note that it is not the
682 : // responsibility of tableCacheShardReaderProvider to ensure that the file
683 : // continues to exist. The ReaderProvider is used in iterators where the
684 : // top-level iterator is pinning the read state and preventing the files from
685 : // being deleted.
686 : //
687 : // The caller must call tableCacheShardReaderProvider.Close.
688 : //
689 : // Note that currently the Reader returned here is only used to read value
690 : // blocks. This reader shouldn't be used for other purposes like reading keys
691 : // outside of virtual sstable bounds.
692 : //
693 : // TODO(bananabrick): We could return a wrapper over the Reader to ensure
694 : // that the reader isn't used for other purposes.
695 1 : func (rp *tableCacheShardReaderProvider) GetReader(ctx context.Context) (*sstable.Reader, error) {
696 1 : // Calling findNode gives us the responsibility of decrementing v's
697 1 : // refCount.
698 1 : v := rp.c.findNode(ctx, rp.file.FileBacking, rp.dbOpts)
699 1 : if v.err != nil {
700 0 : defer rp.c.unrefValue(v)
701 0 : return nil, v.err
702 0 : }
703 1 : rp.v = v
704 1 : return v.reader, nil
705 : }
706 :
707 : // Close implements sstable.ReaderProvider.
708 1 : func (rp *tableCacheShardReaderProvider) Close() {
709 1 : rp.c.unrefValue(rp.v)
710 1 : rp.v = nil
711 1 : }
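//
// Illustrative sketch of the intended pairing (rp and ctx are assumed names):
// each successful GetReader call must be matched by a Close call once the
// caller is done with the returned reader.
//
//	r, err := rp.GetReader(ctx)
//	if err != nil {
//		return err
//	}
//	defer rp.Close()
//	// ... read value blocks via r ...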
712 :
713 : // getTableProperties returns the sstable properties for the target file.
714 : func (c *tableCacheShard) getTableProperties(
715 : file *fileMetadata, dbOpts *tableCacheOpts,
716 1 : ) (*sstable.Properties, error) {
717 1 : // Calling findNode gives us the responsibility of decrementing v's refCount here
718 1 : v := c.findNode(context.TODO(), file.FileBacking, dbOpts)
719 1 : defer c.unrefValue(v)
720 1 :
721 1 : if v.err != nil {
722 0 : return nil, v.err
723 0 : }
724 1 : return &v.reader.Properties, nil
725 : }
726 :
727 : // releaseNode releases a node from the tableCacheShard.
728 : //
729 : // c.mu must be held when calling this.
730 1 : func (c *tableCacheShard) releaseNode(n *tableCacheNode) {
731 1 : c.unlinkNode(n)
732 1 : c.clearNode(n)
733 1 : }
734 :
735 : // unlinkNode removes a node from the tableCacheShard, leaving the shard
736 : // reference in place.
737 : //
738 : // c.mu must be held when calling this.
739 1 : func (c *tableCacheShard) unlinkNode(n *tableCacheNode) {
740 1 : key := tableCacheKey{n.cacheID, n.fileNum}
741 1 : delete(c.mu.nodes, key)
742 1 :
743 1 : switch n.ptype {
744 1 : case tableCacheNodeHot:
745 1 : c.mu.sizeHot--
746 1 : case tableCacheNodeCold:
747 1 : c.mu.sizeCold--
748 1 : case tableCacheNodeTest:
749 1 : c.mu.sizeTest--
750 : }
751 :
752 1 : if n == c.mu.handHot {
753 1 : c.mu.handHot = c.mu.handHot.prev()
754 1 : }
755 1 : if n == c.mu.handCold {
756 1 : c.mu.handCold = c.mu.handCold.prev()
757 1 : }
758 1 : if n == c.mu.handTest {
759 1 : c.mu.handTest = c.mu.handTest.prev()
760 1 : }
761 :
762 1 : if n.unlink() == n {
763 1 : // This was the last entry in the cache.
764 1 : c.mu.handHot = nil
765 1 : c.mu.handCold = nil
766 1 : c.mu.handTest = nil
767 1 : }
768 :
769 1 : n.links.prev = nil
770 1 : n.links.next = nil
771 : }
772 :
773 1 : func (c *tableCacheShard) clearNode(n *tableCacheNode) {
774 1 : if v := n.value; v != nil {
775 1 : n.value = nil
776 1 : c.unrefValue(v)
777 1 : }
778 : }
779 :
780 : // unrefValue decrements the reference count for the specified value, releasing
781 : // it if the reference count fell to 0. Note that the value has a reference if
782 : // it is present in tableCacheShard.mu.nodes, so a reference count of 0 means
783 : // the node has already been removed from that map.
784 1 : func (c *tableCacheShard) unrefValue(v *tableCacheValue) {
785 1 : if v.refCount.Add(-1) == 0 {
786 1 : c.releasing.Add(1)
787 1 : c.releasingCh <- v
788 1 : }
789 : }
790 :
791 : // findNode returns the node for the table with the given file number, creating
792 : // that node if it didn't already exist. The caller is responsible for
793 : // decrementing the returned node's refCount.
794 : func (c *tableCacheShard) findNode(
795 : ctx context.Context, b *fileBacking, dbOpts *tableCacheOpts,
796 1 : ) *tableCacheValue {
797 1 : // The backing must have a positive refcount (otherwise it could be deleted at any time).
798 1 : b.MustHaveRefs()
799 1 : // Caution! Here fileMetadata can be a physical or virtual table. Table cache
800 1 : // readers are associated with the physical backings. All virtual tables with
801 1 : // the same backing will use the same reader from the cache; so no information
802 1 : // that can differ among these virtual tables can be plumbed into loadInfo.
803 1 : info := loadInfo{
804 1 : backingFileNum: b.DiskFileNum,
805 1 : }
806 1 :
807 1 : return c.findNodeInternal(ctx, info, dbOpts)
808 1 : }
809 :
810 : func (c *tableCacheShard) findNodeInternal(
811 : ctx context.Context, loadInfo loadInfo, dbOpts *tableCacheOpts,
812 1 : ) *tableCacheValue {
813 1 : // Fast-path for a hit in the cache.
814 1 : c.mu.RLock()
815 1 : key := tableCacheKey{dbOpts.cacheID, loadInfo.backingFileNum}
816 1 : if n := c.mu.nodes[key]; n != nil && n.value != nil {
817 1 : // Fast-path hit.
818 1 : //
819 1 : // The caller is responsible for decrementing the refCount.
820 1 : v := n.value
821 1 : v.refCount.Add(1)
822 1 : c.mu.RUnlock()
823 1 : n.referenced.Store(true)
824 1 : c.hits.Add(1)
825 1 : <-v.loaded
826 1 : return v
827 1 : }
828 1 : c.mu.RUnlock()
829 1 :
830 1 : c.mu.Lock()
831 1 :
832 1 : n := c.mu.nodes[key]
833 1 : switch {
834 1 : case n == nil:
835 1 : // Slow-path miss of a non-existent node.
836 1 : n = &tableCacheNode{
837 1 : fileNum: loadInfo.backingFileNum,
838 1 : ptype: tableCacheNodeCold,
839 1 : }
840 1 : c.addNode(n, dbOpts)
841 1 : c.mu.sizeCold++
842 :
843 1 : case n.value != nil:
844 1 : // Slow-path hit of a hot or cold node.
845 1 : //
846 1 : // The caller is responsible for decrementing the refCount.
847 1 : v := n.value
848 1 : v.refCount.Add(1)
849 1 : n.referenced.Store(true)
850 1 : c.hits.Add(1)
851 1 : c.mu.Unlock()
852 1 : <-v.loaded
853 1 : return v
854 :
855 1 : default:
856 1 : // Slow-path miss of a test node.
857 1 : c.unlinkNode(n)
858 1 : c.mu.coldTarget++
859 1 : if c.mu.coldTarget > c.size {
860 1 : c.mu.coldTarget = c.size
861 1 : }
862 :
863 1 : n.referenced.Store(false)
864 1 : n.ptype = tableCacheNodeHot
865 1 : c.addNode(n, dbOpts)
866 1 : c.mu.sizeHot++
867 : }
868 :
869 1 : c.misses.Add(1)
870 1 :
871 1 : v := &tableCacheValue{
872 1 : loaded: make(chan struct{}),
873 1 : }
874 1 : v.refCount.Store(2)
875 1 : // Cache the closure invoked when an iterator is closed. This avoids an
876 1 : // allocation on every call to newIters.
877 1 : v.closeHook = func(i sstable.Iterator) error {
878 1 : if invariants.RaceEnabled {
879 0 : c.mu.Lock()
880 0 : delete(c.mu.iters, i)
881 0 : c.mu.Unlock()
882 0 : }
883 1 : c.unrefValue(v)
884 1 : c.iterCount.Add(-1)
885 1 : dbOpts.iterCount.Add(-1)
886 1 : return nil
887 : }
888 1 : n.value = v
889 1 :
890 1 : c.mu.Unlock()
891 1 :
892 1 : // Note adding to the cache lists must complete before we begin loading the
893 1 : // table as a failure during load will result in the node being unlinked.
894 1 : pprof.Do(context.Background(), tableCacheLabels, func(context.Context) {
895 1 : v.load(ctx, loadInfo, c, dbOpts)
896 1 : })
897 1 : return v
898 : }
899 :
900 1 : func (c *tableCacheShard) addNode(n *tableCacheNode, dbOpts *tableCacheOpts) {
901 1 : c.evictNodes()
902 1 : n.cacheID = dbOpts.cacheID
903 1 : key := tableCacheKey{n.cacheID, n.fileNum}
904 1 : c.mu.nodes[key] = n
905 1 :
906 1 : n.links.next = n
907 1 : n.links.prev = n
908 1 : if c.mu.handHot == nil {
909 1 : // First element.
910 1 : c.mu.handHot = n
911 1 : c.mu.handCold = n
912 1 : c.mu.handTest = n
913 1 : } else {
914 1 : c.mu.handHot.link(n)
915 1 : }
916 :
917 1 : if c.mu.handCold == c.mu.handHot {
918 1 : c.mu.handCold = c.mu.handCold.prev()
919 1 : }
920 : }
921 :
922 1 : func (c *tableCacheShard) evictNodes() {
923 1 : for c.size <= c.mu.sizeHot+c.mu.sizeCold && c.mu.handCold != nil {
924 1 : c.runHandCold()
925 1 : }
926 : }
927 :
928 1 : func (c *tableCacheShard) runHandCold() {
929 1 : n := c.mu.handCold
930 1 : if n.ptype == tableCacheNodeCold {
931 1 : if n.referenced.Load() {
932 1 : n.referenced.Store(false)
933 1 : n.ptype = tableCacheNodeHot
934 1 : c.mu.sizeCold--
935 1 : c.mu.sizeHot++
936 1 : } else {
937 1 : c.clearNode(n)
938 1 : n.ptype = tableCacheNodeTest
939 1 : c.mu.sizeCold--
940 1 : c.mu.sizeTest++
941 1 : for c.size < c.mu.sizeTest && c.mu.handTest != nil {
942 1 : c.runHandTest()
943 1 : }
944 : }
945 : }
946 :
947 1 : c.mu.handCold = c.mu.handCold.next()
948 1 :
949 1 : for c.size-c.mu.coldTarget <= c.mu.sizeHot && c.mu.handHot != nil {
950 1 : c.runHandHot()
951 1 : }
952 : }
953 :
954 1 : func (c *tableCacheShard) runHandHot() {
955 1 : if c.mu.handHot == c.mu.handTest && c.mu.handTest != nil {
956 1 : c.runHandTest()
957 1 : if c.mu.handHot == nil {
958 0 : return
959 0 : }
960 : }
961 :
962 1 : n := c.mu.handHot
963 1 : if n.ptype == tableCacheNodeHot {
964 1 : if n.referenced.Load() {
965 1 : n.referenced.Store(false)
966 1 : } else {
967 1 : n.ptype = tableCacheNodeCold
968 1 : c.mu.sizeHot--
969 1 : c.mu.sizeCold++
970 1 : }
971 : }
972 :
973 1 : c.mu.handHot = c.mu.handHot.next()
974 : }
975 :
976 1 : func (c *tableCacheShard) runHandTest() {
977 1 : if c.mu.sizeCold > 0 && c.mu.handTest == c.mu.handCold && c.mu.handCold != nil {
978 1 : c.runHandCold()
979 1 : if c.mu.handTest == nil {
980 0 : return
981 0 : }
982 : }
983 :
984 1 : n := c.mu.handTest
985 1 : if n.ptype == tableCacheNodeTest {
986 1 : c.mu.coldTarget--
987 1 : if c.mu.coldTarget < 0 {
988 1 : c.mu.coldTarget = 0
989 1 : }
990 1 : c.unlinkNode(n)
991 1 : c.clearNode(n)
992 : }
993 :
994 1 : c.mu.handTest = c.mu.handTest.next()
995 : }
996 :
997 1 : func (c *tableCacheShard) evict(fileNum base.DiskFileNum, dbOpts *tableCacheOpts, allowLeak bool) {
998 1 : c.mu.Lock()
999 1 : key := tableCacheKey{dbOpts.cacheID, fileNum}
1000 1 : n := c.mu.nodes[key]
1001 1 : var v *tableCacheValue
1002 1 : if n != nil {
1003 1 : // NB: This is equivalent to tableCacheShard.releaseNode(), but we perform
1004 1 : // the tableCacheValue.release() call synchronously below to ensure the
1005 1 : // sstable file descriptor is closed before returning. Note that
1006 1 : // tableCacheShard.releasing needs to be incremented while holding
1007 1 : // tableCacheShard.mu in order to avoid a race with Close()
1008 1 : c.unlinkNode(n)
1009 1 : v = n.value
1010 1 : if v != nil {
1011 1 : if !allowLeak {
1012 1 : if t := v.refCount.Add(-1); t != 0 {
1013 0 : dbOpts.loggerAndTracer.Fatalf("sstable %s: refcount is not zero: %d\n%s", fileNum, t, debug.Stack())
1014 0 : }
1015 : }
1016 1 : c.releasing.Add(1)
1017 : }
1018 : }
1019 :
1020 1 : c.mu.Unlock()
1021 1 :
1022 1 : if v != nil {
1023 1 : v.release(c)
1024 1 : }
1025 :
1026 1 : dbOpts.cache.EvictFile(dbOpts.cacheID, fileNum)
1027 : }
1028 :
1029 : // removeDB evicts any nodes which have a reference to the DB
1030 : // associated with dbOpts.cacheID. The caller must ensure that there will
1031 : // be no more accesses to the files associated with the DB.
1032 1 : func (c *tableCacheShard) removeDB(dbOpts *tableCacheOpts) {
1033 1 : var fileNums []base.DiskFileNum
1034 1 :
1035 1 : c.mu.RLock()
1036 1 : // Collect the fileNums which need to be cleaned.
1037 1 : var firstNode *tableCacheNode
1038 1 : node := c.mu.handHot
1039 1 : for node != firstNode {
1040 1 : if firstNode == nil {
1041 1 : firstNode = node
1042 1 : }
1043 :
1044 1 : if node.cacheID == dbOpts.cacheID {
1045 1 : fileNums = append(fileNums, node.fileNum)
1046 1 : }
1047 1 : node = node.next()
1048 : }
1049 1 : c.mu.RUnlock()
1050 1 :
1051 1 : // Evict all the nodes associated with the DB.
1052 1 : // This should synchronously close all the files
1053 1 : // associated with the DB.
1054 1 : for _, fileNum := range fileNums {
1055 1 : c.evict(fileNum, dbOpts, true)
1056 1 : }
1057 : }
1058 :
1059 1 : func (c *tableCacheShard) Close() error {
1060 1 : c.mu.Lock()
1061 1 : defer c.mu.Unlock()
1062 1 :
1063 1 : // Check for leaked iterators. Note that we'll still perform cleanup below in
1064 1 : // the case that there are leaked iterators.
1065 1 : var err error
1066 1 : if v := c.iterCount.Load(); v > 0 {
1067 1 : if !invariants.RaceEnabled {
1068 1 : err = errors.Errorf("leaked iterators: %d", errors.Safe(v))
1069 1 : } else {
1070 0 : var buf bytes.Buffer
1071 0 : for _, stack := range c.mu.iters {
1072 0 : fmt.Fprintf(&buf, "%s\n", stack)
1073 0 : }
1074 0 : err = errors.Errorf("leaked iterators: %d\n%s", errors.Safe(v), buf.String())
1075 : }
1076 : }
1077 :
1078 1 : for c.mu.handHot != nil {
1079 0 : n := c.mu.handHot
1080 0 : if n.value != nil {
1081 0 : if n.value.refCount.Add(-1) == 0 {
1082 0 : c.releasing.Add(1)
1083 0 : c.releasingCh <- n.value
1084 0 : }
1085 : }
1086 0 : c.unlinkNode(n)
1087 : }
1088 1 : c.mu.nodes = nil
1089 1 : c.mu.handHot = nil
1090 1 : c.mu.handCold = nil
1091 1 : c.mu.handTest = nil
1092 1 :
1093 1 : // Only shut down the releasing goroutine if there were no leaked
1094 1 : // iterators. If there were leaked iterators, we leave the goroutine running
1095 1 : // and the releasingCh open so that a subsequent iterator close can
1096 1 : // complete. This behavior is used by iterator leak tests. Leaking the
1097 1 : // goroutine for these tests is less bad than not closing the iterator, which
1098 1 : // triggers other warnings about block cache handles not being released.
1099 1 : if err != nil {
1100 1 : c.releasing.Wait()
1101 1 : return err
1102 1 : }
1103 :
1104 1 : close(c.releasingCh)
1105 1 : c.releasing.Wait()
1106 1 : c.releaseLoopExit.Wait()
1107 1 : return err
1108 : }
1109 :
1110 : type tableCacheValue struct {
1111 : closeHook func(i sstable.Iterator) error
1112 : reader *sstable.Reader
1113 : err error
1114 : isShared bool
1115 : loaded chan struct{}
1116 : // Reference count for the value. The reader is closed when the reference
1117 : // count drops to zero.
1118 : refCount atomic.Int32
1119 : }
1120 :
1121 : // loadInfo contains the information needed to populate a new cache entry.
1122 : type loadInfo struct {
1123 : backingFileNum base.DiskFileNum
1124 : }
1125 :
1126 : func (v *tableCacheValue) load(
1127 : ctx context.Context, loadInfo loadInfo, c *tableCacheShard, dbOpts *tableCacheOpts,
1128 1 : ) {
1129 1 : // Try opening the file first.
1130 1 : var f objstorage.Readable
1131 1 : var err error
1132 1 : f, err = dbOpts.objProvider.OpenForReading(
1133 1 : ctx, fileTypeTable, loadInfo.backingFileNum, objstorage.OpenOptions{MustExist: true},
1134 1 : )
1135 1 : if err == nil {
1136 1 : o := dbOpts.readerOpts
1137 1 : o.SetInternalCacheOpts(sstableinternal.CacheOptions{
1138 1 : Cache: dbOpts.cache,
1139 1 : CacheID: dbOpts.cacheID,
1140 1 : FileNum: loadInfo.backingFileNum,
1141 1 : })
1142 1 : v.reader, err = sstable.NewReader(ctx, f, o)
1143 1 : }
1144 1 : if err == nil {
1145 1 : var objMeta objstorage.ObjectMetadata
1146 1 : objMeta, err = dbOpts.objProvider.Lookup(fileTypeTable, loadInfo.backingFileNum)
1147 1 : v.isShared = objMeta.IsShared()
1148 1 : }
1149 1 : if err != nil {
1150 1 : v.err = errors.Wrapf(
1151 1 : err, "pebble: backing file %s error", loadInfo.backingFileNum)
1152 1 : }
1153 1 : if v.err != nil {
1154 1 : c.mu.Lock()
1155 1 : defer c.mu.Unlock()
1156 1 : // Lookup the node in the cache again as it might have already been
1157 1 : // removed.
1158 1 : key := tableCacheKey{dbOpts.cacheID, loadInfo.backingFileNum}
1159 1 : n := c.mu.nodes[key]
1160 1 : if n != nil && n.value == v {
1161 1 : c.releaseNode(n)
1162 1 : }
1163 : }
1164 1 : close(v.loaded)
1165 : }
1166 :
1167 1 : func (v *tableCacheValue) release(c *tableCacheShard) {
1168 1 : <-v.loaded
1169 1 : // Nothing to be done about an error at this point. Close the reader if it is
1170 1 : // open.
1171 1 : if v.reader != nil {
1172 1 : _ = v.reader.Close()
1173 1 : }
1174 1 : c.releasing.Done()
1175 : }
1176 :
1177 : type tableCacheNodeType int8
1178 :
1179 : const (
1180 : tableCacheNodeTest tableCacheNodeType = iota
1181 : tableCacheNodeCold
1182 : tableCacheNodeHot
1183 : )
1184 :
1185 0 : func (p tableCacheNodeType) String() string {
1186 0 : switch p {
1187 0 : case tableCacheNodeTest:
1188 0 : return "test"
1189 0 : case tableCacheNodeCold:
1190 0 : return "cold"
1191 0 : case tableCacheNodeHot:
1192 0 : return "hot"
1193 : }
1194 0 : return "unknown"
1195 : }
1196 :
1197 : type tableCacheNode struct {
1198 : fileNum base.DiskFileNum
1199 : value *tableCacheValue
1200 :
1201 : links struct {
1202 : next *tableCacheNode
1203 : prev *tableCacheNode
1204 : }
1205 : ptype tableCacheNodeType
1206 : // referenced is atomically set to indicate that this entry has been accessed
1207 : // since the last time one of the clock hands swept it.
1208 : referenced atomic.Bool
1209 :
1210 : // Storing the cache id associated with the DB instance here
1211 : // avoids the need to thread the dbOpts struct through many functions.
1212 : cacheID uint64
1213 : }
1214 :
1215 1 : func (n *tableCacheNode) next() *tableCacheNode {
1216 1 : if n == nil {
1217 0 : return nil
1218 0 : }
1219 1 : return n.links.next
1220 : }
1221 :
1222 1 : func (n *tableCacheNode) prev() *tableCacheNode {
1223 1 : if n == nil {
1224 0 : return nil
1225 0 : }
1226 1 : return n.links.prev
1227 : }
1228 :
1229 1 : func (n *tableCacheNode) link(s *tableCacheNode) {
1230 1 : s.links.prev = n.links.prev
1231 1 : s.links.prev.links.next = s
1232 1 : s.links.next = n
1233 1 : s.links.next.links.prev = s
1234 1 : }
1235 :
1236 1 : func (n *tableCacheNode) unlink() *tableCacheNode {
1237 1 : next := n.links.next
1238 1 : n.links.prev.links.next = n.links.next
1239 1 : n.links.next.links.prev = n.links.prev
1240 1 : n.links.prev = n
1241 1 : n.links.next = n
1242 1 : return next
1243 1 : }
1244 :
1245 : // iterSet holds a set of iterators of various key kinds, all constructed over
1246 : // the same data structure (e.g., an sstable). A subset of the fields may be
1247 : // populated depending on the `iterKinds` passed to newIters.
1248 : type iterSet struct {
1249 : point internalIterator
1250 : rangeDeletion keyspan.FragmentIterator
1251 : rangeKey keyspan.FragmentIterator
1252 : }
1253 :
1254 : // TODO(jackson): Consider adding methods for fast paths that check whether an
1255 : // iterator of a particular kind is nil, so that these call sites don't need to
1256 : // reach into the struct's fields directly.
1257 :
1258 : // Point returns the contained point iterator. If there is no point iterator,
1259 : // Point returns a non-nil empty point iterator.
1260 1 : func (s *iterSet) Point() internalIterator {
1261 1 : if s.point == nil {
1262 1 : return emptyIter
1263 1 : }
1264 1 : return s.point
1265 : }
1266 :
1267 : // RangeDeletion returns the contained range deletion iterator. If there is no
1268 : // range deletion iterator, RangeDeletion returns a non-nil empty keyspan
1269 : // iterator.
1270 1 : func (s *iterSet) RangeDeletion() keyspan.FragmentIterator {
1271 1 : if s.rangeDeletion == nil {
1272 1 : return emptyKeyspanIter
1273 1 : }
1274 1 : return s.rangeDeletion
1275 : }
1276 :
1277 : // RangeKey returns the contained range key iterator. If there is no range key
1278 : // iterator, RangeKey returns a non-nil empty keyspan iterator.
1279 1 : func (s *iterSet) RangeKey() keyspan.FragmentIterator {
1280 1 : if s.rangeKey == nil {
1281 0 : return emptyKeyspanIter
1282 0 : }
1283 1 : return s.rangeKey
1284 : }
1285 :
1286 : // CloseAll closes all of the held iterators. If CloseAll is called, then Close
1287 : // must not be called on the constituent iterators.
1288 1 : func (s *iterSet) CloseAll() error {
1289 1 : var err error
1290 1 : if s.point != nil {
1291 1 : err = s.point.Close()
1292 1 : s.point = nil
1293 1 : }
1294 1 : if s.rangeDeletion != nil {
1295 1 : s.rangeDeletion.Close()
1296 1 : s.rangeDeletion = nil
1297 1 : }
1298 1 : if s.rangeKey != nil {
1299 1 : s.rangeKey.Close()
1300 1 : s.rangeKey = nil
1301 1 : }
1302 1 : return err
1303 : }
1304 :
1305 : // iterKinds is a bitmap indicating a set of kinds of iterators. Callers may
1306 : // bitwise-OR iterPointKeys, iterRangeDeletions and/or iterRangeKeys together to
1307 : // represent a set of desired iterator kinds.
1308 : type iterKinds uint8
1309 :
1310 1 : func (t iterKinds) Point() bool { return (t & iterPointKeys) != 0 }
1311 1 : func (t iterKinds) RangeDeletion() bool { return (t & iterRangeDeletions) != 0 }
1312 1 : func (t iterKinds) RangeKey() bool { return (t & iterRangeKeys) != 0 }
1313 :
1314 : const (
1315 : iterPointKeys iterKinds = 1 << iota
1316 : iterRangeDeletions
1317 : iterRangeKeys
1318 : )