Line data Source code
1 : // Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "bytes"
9 : "context"
10 : "fmt"
11 : "io"
12 : "runtime/debug"
13 : "runtime/pprof"
14 : "sync"
15 : "sync/atomic"
16 : "unsafe"
17 :
18 : "github.com/cockroachdb/errors"
19 : "github.com/cockroachdb/pebble/internal/base"
20 : "github.com/cockroachdb/pebble/internal/invariants"
21 : "github.com/cockroachdb/pebble/internal/keyspan"
22 : "github.com/cockroachdb/pebble/internal/keyspan/keyspanimpl"
23 : "github.com/cockroachdb/pebble/internal/manifest"
24 : "github.com/cockroachdb/pebble/internal/private"
25 : "github.com/cockroachdb/pebble/objstorage"
26 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
27 : "github.com/cockroachdb/pebble/sstable"
28 : )
29 :
30 : var emptyIter = &errorIter{err: nil}
31 : var emptyKeyspanIter = &errorKeyspanIter{err: nil}
32 :
33 : // tableNewIters creates new iterators (point, range deletion and/or range key)
34 : // for the given file metadata. Which of the various iterator kinds the user is
35 : // requesting is specified with the iterKinds bitmap.
36 : //
37 : // On success, the requested subset of iters.{point,rangeDel,rangeKey} are
38 : // populated with iterators.
39 : //
40 : // If a point iterator is requested and the operation was successful,
41 : // iters.point is guaranteed to be non-nil and must be closed when the caller is
42 : // finished.
43 : //
44 : // If a range deletion or range key iterator is requested, the corresponding
45 : // iterator may be nil if the table does not contain any keys of the
46 : // corresponding kind. The returned iterSet type provides RangeDeletion() and
47 : // RangeKey() convenience methods that return non-nil empty iterators that may
48 : // be used if the caller requires a non-nil iterator.
49 : //
50 : // On error, all iterators are nil.
51 : //
52 : // The only (non-test) implementation of tableNewIters is
53 : // tableCacheContainer.newIters().
54 : type tableNewIters func(
55 : ctx context.Context,
56 : file *manifest.FileMetadata,
57 : opts *IterOptions,
58 : internalOpts internalIterOpts,
59 : kinds iterKinds,
60 : ) (iterSet, error)
61 :
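For illustration, a hypothetical caller of a tableNewIters value might look like the sketch below. The helper name is invented; it relies only on the types and iterKinds constants defined in this file and on the iterSet convenience accessors described above.

// loadRangeIters is a hypothetical helper (not part of the package) showing a
// caller that requests only the iterator kinds it needs.
func loadRangeIters(
	ctx context.Context, newIters tableNewIters, file *manifest.FileMetadata,
) (rangeDel, rangeKey keyspan.FragmentIterator, err error) {
	iters, err := newIters(ctx, file, nil, internalIterOpts{}, iterRangeDeletions|iterRangeKeys)
	if err != nil {
		// On error, all iterators in the set are nil; there is nothing to close.
		return nil, nil, err
	}
	// RangeDeletion() and RangeKey() return non-nil empty iterators when the
	// table contains no keys of the corresponding kind, so the caller never
	// needs a nil check.
	return iters.RangeDeletion(), iters.RangeKey(), nil
}
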
62 : // TODO implements the old tableNewIters interface that always attempted to open
63 : // a point iterator and a range deletion iterator. All call sites should be
64 : // updated to use `f` directly.
65 : func (f tableNewIters) TODO(
66 : ctx context.Context,
67 : file *manifest.FileMetadata,
68 : opts *IterOptions,
69 : internalOpts internalIterOpts,
70 2 : ) (internalIterator, keyspan.FragmentIterator, error) {
71 2 : iters, err := f(ctx, file, opts, internalOpts, iterPointKeys|iterRangeDeletions)
72 2 : if err != nil {
73 1 : return nil, nil, err
74 1 : }
75 2 : return iters.point, iters.rangeDeletion, nil
76 : }
77 :
78 : // tableNewRangeDelIter takes a tableNewIters and returns a TableNewSpanIter
79 : // for the rangedel iterator returned by tableNewIters.
80 : func tableNewRangeDelIter(
81 : ctx context.Context, newIters tableNewIters,
82 2 : ) keyspanimpl.TableNewSpanIter {
83 2 : return func(file *manifest.FileMetadata, iterOptions keyspan.SpanIterOptions) (keyspan.FragmentIterator, error) {
84 2 : iters, err := newIters(ctx, file, nil, internalIterOpts{}, iterRangeDeletions)
85 2 : if err != nil {
86 0 : return nil, err
87 0 : }
88 2 : return iters.RangeDeletion(), nil
89 : }
90 : }
91 :
92 : // tableNewRangeKeyIter takes a tableNewIters and returns a TableNewSpanIter
93 : // for the range key iterator returned by tableNewIters.
94 : func tableNewRangeKeyIter(
95 : ctx context.Context, newIters tableNewIters,
96 2 : ) keyspanimpl.TableNewSpanIter {
97 2 : return func(file *manifest.FileMetadata, iterOptions keyspan.SpanIterOptions) (keyspan.FragmentIterator, error) {
98 2 : iters, err := newIters(ctx, file, nil, internalIterOpts{}, iterRangeKeys)
99 2 : if err != nil {
100 1 : return nil, err
101 1 : }
102 2 : return iters.RangeKey(), nil
103 : }
104 : }
105 :
106 : var tableCacheLabels = pprof.Labels("pebble", "table-cache")
107 :
108 : // tableCacheOpts contains the DB-specific fields
109 : // of a table cache. This is stored in the tableCacheContainer
110 : // along with the table cache.
111 : // NB: It is important to make sure that the fields in this
112 : // struct are read-only. Since the fields here are shared
113 : // by every single tableCacheShard, if non-read-only fields
114 : // are updated, we could cause unnecessary evictions of those
115 : // fields, and of the surrounding fields, from the CPU caches.
116 : type tableCacheOpts struct {
117 : // iterCount keeps track of how many iterators are open. It is used to keep
118 : // track of leaked iterators on a per-db level.
119 : iterCount *atomic.Int32
120 :
121 : loggerAndTracer LoggerAndTracer
122 : cacheID uint64
123 : objProvider objstorage.Provider
124 : opts sstable.ReaderOptions
125 : filterMetrics *sstable.FilterMetricsTracker
126 : sstStatsCollector *sstable.CategoryStatsCollector
127 : }
128 :
129 : // tableCacheContainer contains the table cache and
130 : // fields which are unique to the DB.
131 : type tableCacheContainer struct {
132 : tableCache *TableCache
133 :
134 : // dbOpts contains fields relevant to the table cache
135 : // which are unique to each DB.
136 : dbOpts tableCacheOpts
137 : }
138 :
139 : // newTableCacheContainer will panic if the underlying cache in the table cache
140 : // doesn't match Options.Cache.
141 : func newTableCacheContainer(
142 : tc *TableCache,
143 : cacheID uint64,
144 : objProvider objstorage.Provider,
145 : opts *Options,
146 : size int,
147 : sstStatsCollector *sstable.CategoryStatsCollector,
148 2 : ) *tableCacheContainer {
149 2 : // We will release the ref to the table cache acquired here when tableCacheContainer.close is called.
150 2 : if tc != nil {
151 1 : if tc.cache != opts.Cache {
152 0 : panic("pebble: underlying cache for the table cache and db are different")
153 : }
154 1 : tc.Ref()
155 2 : } else {
156 2 : // NewTableCache should create a ref to tc which the container should
157 2 : // drop whenever it is closed.
158 2 : tc = NewTableCache(opts.Cache, opts.Experimental.TableCacheShards, size)
159 2 : }
160 :
161 2 : t := &tableCacheContainer{}
162 2 : t.tableCache = tc
163 2 : t.dbOpts.loggerAndTracer = opts.LoggerAndTracer
164 2 : t.dbOpts.cacheID = cacheID
165 2 : t.dbOpts.objProvider = objProvider
166 2 : t.dbOpts.opts = opts.MakeReaderOptions()
167 2 : t.dbOpts.filterMetrics = &sstable.FilterMetricsTracker{}
168 2 : t.dbOpts.iterCount = new(atomic.Int32)
169 2 : t.dbOpts.sstStatsCollector = sstStatsCollector
170 2 : return t
171 : }
172 :
173 : // Before calling close, make sure that there will be no further need
174 : // to access any of the files associated with the store.
175 2 : func (c *tableCacheContainer) close() error {
176 2 : // We want to do some cleanup work here. Check for leaked iterators
177 2 : // by the DB using this container. Note that we'll still perform cleanup
178 2 : // below in the case that there are leaked iterators.
179 2 : var err error
180 2 : if v := c.dbOpts.iterCount.Load(); v > 0 {
181 1 : err = errors.Errorf("leaked iterators: %d", errors.Safe(v))
182 1 : }
183 :
184 : // Release nodes here.
185 2 : for _, shard := range c.tableCache.shards {
186 2 : if shard != nil {
187 2 : shard.removeDB(&c.dbOpts)
188 2 : }
189 : }
190 2 : return firstError(err, c.tableCache.Unref())
191 : }
192 :
193 : func (c *tableCacheContainer) newIters(
194 : ctx context.Context,
195 : file *manifest.FileMetadata,
196 : opts *IterOptions,
197 : internalOpts internalIterOpts,
198 : kinds iterKinds,
199 2 : ) (iterSet, error) {
200 2 : return c.tableCache.getShard(file.FileBacking.DiskFileNum).newIters(ctx, file, opts, internalOpts, &c.dbOpts, kinds)
201 2 : }
202 :
203 : // getTableProperties returns the properties associated with the backing physical
204 : // table if the input metadata belongs to a virtual sstable.
205 1 : func (c *tableCacheContainer) getTableProperties(file *fileMetadata) (*sstable.Properties, error) {
206 1 : return c.tableCache.getShard(file.FileBacking.DiskFileNum).getTableProperties(file, &c.dbOpts)
207 1 : }
208 :
209 2 : func (c *tableCacheContainer) evict(fileNum base.DiskFileNum) {
210 2 : c.tableCache.getShard(fileNum).evict(fileNum, &c.dbOpts, false)
211 2 : }
212 :
213 2 : func (c *tableCacheContainer) metrics() (CacheMetrics, FilterMetrics) {
214 2 : var m CacheMetrics
215 2 : for i := range c.tableCache.shards {
216 2 : s := c.tableCache.shards[i]
217 2 : s.mu.RLock()
218 2 : m.Count += int64(len(s.mu.nodes))
219 2 : s.mu.RUnlock()
220 2 : m.Hits += s.hits.Load()
221 2 : m.Misses += s.misses.Load()
222 2 : }
223 2 : m.Size = m.Count * int64(unsafe.Sizeof(sstable.Reader{}))
224 2 : f := c.dbOpts.filterMetrics.Load()
225 2 : return m, f
226 : }
227 :
228 : func (c *tableCacheContainer) estimateSize(
229 : meta *fileMetadata, lower, upper []byte,
230 2 : ) (size uint64, err error) {
231 2 : if meta.Virtual {
232 2 : err = c.withVirtualReader(
233 2 : meta.VirtualMeta(),
234 2 : func(r sstable.VirtualReader) (err error) {
235 2 : size, err = r.EstimateDiskUsage(lower, upper)
236 2 : return err
237 2 : },
238 : )
239 2 : } else {
240 2 : err = c.withReader(
241 2 : meta.PhysicalMeta(),
242 2 : func(r *sstable.Reader) (err error) {
243 2 : size, err = r.EstimateDiskUsage(lower, upper)
244 2 : return err
245 2 : },
246 : )
247 : }
248 2 : if err != nil {
249 0 : return 0, err
250 0 : }
251 2 : return size, nil
252 : }
253 :
254 : // createCommonReader creates a Reader for this file.
255 2 : func createCommonReader(v *tableCacheValue, file *fileMetadata) sstable.CommonReader {
256 2 : // TODO(bananabrick): We suffer an allocation if file is a virtual sstable.
257 2 : var cr sstable.CommonReader = v.reader
258 2 : if file.Virtual {
259 2 : virtualReader := sstable.MakeVirtualReader(
260 2 : v.reader, file.VirtualMeta().VirtualReaderParams(v.isShared),
261 2 : )
262 2 : cr = &virtualReader
263 2 : }
264 2 : return cr
265 : }
266 :
267 : func (c *tableCacheContainer) withCommonReader(
268 : meta *fileMetadata, fn func(sstable.CommonReader) error,
269 2 : ) error {
270 2 : s := c.tableCache.getShard(meta.FileBacking.DiskFileNum)
271 2 : v := s.findNode(meta.FileBacking, &c.dbOpts)
272 2 : defer s.unrefValue(v)
273 2 : if v.err != nil {
274 0 : return v.err
275 0 : }
276 2 : return fn(createCommonReader(v, meta))
277 : }
278 :
279 2 : func (c *tableCacheContainer) withReader(meta physicalMeta, fn func(*sstable.Reader) error) error {
280 2 : s := c.tableCache.getShard(meta.FileBacking.DiskFileNum)
281 2 : v := s.findNode(meta.FileBacking, &c.dbOpts)
282 2 : defer s.unrefValue(v)
283 2 : if v.err != nil {
284 1 : return v.err
285 1 : }
286 2 : return fn(v.reader)
287 : }
288 :
289 : // withVirtualReader fetches a VirtualReader associated with a virtual sstable.
290 : func (c *tableCacheContainer) withVirtualReader(
291 : meta virtualMeta, fn func(sstable.VirtualReader) error,
292 2 : ) error {
293 2 : s := c.tableCache.getShard(meta.FileBacking.DiskFileNum)
294 2 : v := s.findNode(meta.FileBacking, &c.dbOpts)
295 2 : defer s.unrefValue(v)
296 2 : if v.err != nil {
297 0 : return v.err
298 0 : }
299 2 : provider := c.dbOpts.objProvider
300 2 : objMeta, err := provider.Lookup(fileTypeTable, meta.FileBacking.DiskFileNum)
301 2 : if err != nil {
302 0 : return err
303 0 : }
304 2 : return fn(sstable.MakeVirtualReader(v.reader, meta.VirtualReaderParams(objMeta.IsShared())))
305 : }
306 :
307 2 : func (c *tableCacheContainer) iterCount() int64 {
308 2 : return int64(c.dbOpts.iterCount.Load())
309 2 : }
310 :
311 : // TableCache is a shareable cache for open sstables.
312 : type TableCache struct {
313 : refs atomic.Int64
314 :
315 : cache *Cache
316 : shards []*tableCacheShard
317 : }
318 :
319 : // Ref adds a reference to the table cache. Once tableCache.init returns,
320 : // the table cache only remains valid if there is at least one reference
321 : // to it.
322 1 : func (c *TableCache) Ref() {
323 1 : v := c.refs.Add(1)
324 1 : // We don't want the reference count to ever go from 0 -> 1,
325 1 : // because a reference count of 0 implies that we've closed the cache.
326 1 : if v <= 1 {
327 0 : panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
328 : }
329 : }
330 :
331 : // Unref removes a reference to the table cache.
332 2 : func (c *TableCache) Unref() error {
333 2 : v := c.refs.Add(-1)
334 2 : switch {
335 1 : case v < 0:
336 1 : panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
337 2 : case v == 0:
338 2 : var err error
339 2 : for i := range c.shards {
340 2 : // The cache shard is not allocated yet, nothing to close
341 2 : if c.shards[i] == nil {
342 0 : continue
343 : }
344 2 : err = firstError(err, c.shards[i].Close())
345 : }
346 :
347 : // Unref the cache which we create a reference to when the tableCache
348 : // is first instantiated.
349 2 : c.cache.Unref()
350 2 : return err
351 : }
352 1 : return nil
353 : }
354 :
355 : // NewTableCache will create a reference to the table cache. It is the caller's responsibility
356 : // to call tableCache.Unref when they no longer hold a reference to the table cache.
357 2 : func NewTableCache(cache *Cache, numShards int, size int) *TableCache {
358 2 : if size == 0 {
359 0 : panic("pebble: cannot create a table cache of size 0")
360 2 : } else if numShards == 0 {
361 0 : panic("pebble: cannot create a table cache with 0 shards")
362 : }
363 :
364 2 : c := &TableCache{}
365 2 : c.cache = cache
366 2 : c.cache.Ref()
367 2 :
368 2 : c.shards = make([]*tableCacheShard, numShards)
369 2 : for i := range c.shards {
370 2 : c.shards[i] = &tableCacheShard{}
371 2 : c.shards[i].init(size / len(c.shards))
372 2 : }
373 :
374 : // Hold a ref to the cache here.
375 2 : c.refs.Store(1)
376 2 :
377 2 : return c
378 : }
379 :
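As a usage illustration (not code from this file), one TableCache can be shared by multiple DBs via Options. This sketch assumes the exported pebble.NewCache, pebble.NewTableCache, pebble.Open functions and the Options.Cache/Options.TableCache fields; the sizes and paths are arbitrary.

package main

import (
	"log"

	"github.com/cockroachdb/pebble"
)

func main() {
	if err := openWithSharedTableCache(); err != nil {
		log.Fatal(err)
	}
}

// openWithSharedTableCache is a hypothetical example of the caller-side
// reference-counting contract for a shared table cache.
func openWithSharedTableCache() error {
	cache := pebble.NewCache(64 << 20) // shared block cache; size is arbitrary
	defer cache.Unref()
	// The table cache must be built on the same block cache the DBs use;
	// newTableCacheContainer panics at Open time if they differ.
	tc := pebble.NewTableCache(cache, 8, 1000)
	defer func() { _ = tc.Unref() }() // drop the creator's reference when done

	db1, err := pebble.Open("/tmp/db1", &pebble.Options{Cache: cache, TableCache: tc})
	if err != nil {
		return err
	}
	defer db1.Close()
	db2, err := pebble.Open("/tmp/db2", &pebble.Options{Cache: cache, TableCache: tc})
	if err != nil {
		return err
	}
	return db2.Close()
}
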
380 2 : func (c *TableCache) getShard(fileNum base.DiskFileNum) *tableCacheShard {
381 2 : return c.shards[uint64(fileNum)%uint64(len(c.shards))]
382 2 : }
383 :
384 : type tableCacheKey struct {
385 : cacheID uint64
386 : fileNum base.DiskFileNum
387 : }
388 :
389 : type tableCacheShard struct {
390 : hits atomic.Int64
391 : misses atomic.Int64
392 : iterCount atomic.Int32
393 :
394 : size int
395 :
396 : mu struct {
397 : sync.RWMutex
398 : nodes map[tableCacheKey]*tableCacheNode
399 : // The iters map is only created and populated in race builds.
400 : iters map[io.Closer][]byte
401 :
402 : handHot *tableCacheNode
403 : handCold *tableCacheNode
404 : handTest *tableCacheNode
405 :
406 : coldTarget int
407 : sizeHot int
408 : sizeCold int
409 : sizeTest int
410 : }
411 : releasing sync.WaitGroup
412 : releasingCh chan *tableCacheValue
413 : releaseLoopExit sync.WaitGroup
414 : }
415 :
416 2 : func (c *tableCacheShard) init(size int) {
417 2 : c.size = size
418 2 :
419 2 : c.mu.nodes = make(map[tableCacheKey]*tableCacheNode)
420 2 : c.mu.coldTarget = size
421 2 : c.releasingCh = make(chan *tableCacheValue, 100)
422 2 : c.releaseLoopExit.Add(1)
423 2 : go c.releaseLoop()
424 2 :
425 2 : if invariants.RaceEnabled {
426 0 : c.mu.iters = make(map[io.Closer][]byte)
427 0 : }
428 : }
429 :
430 2 : func (c *tableCacheShard) releaseLoop() {
431 2 : pprof.Do(context.Background(), tableCacheLabels, func(context.Context) {
432 2 : defer c.releaseLoopExit.Done()
433 2 : for v := range c.releasingCh {
434 2 : v.release(c)
435 2 : }
436 : })
437 : }
438 :
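The tableCacheLabels/pprof.Do pattern above tags the release goroutine's work (and, further down, table loads) so it can be attributed in profiles. Below is a standalone sketch of the same standard-library mechanism, with an invented worker function.

package main

import (
	"context"
	"runtime/pprof"
)

var labels = pprof.Labels("pebble", "table-cache")

func main() {
	pprof.Do(context.Background(), labels, func(ctx context.Context) {
		// Work executed inside this function runs with the
		// "pebble"="table-cache" goroutine label, which shows up in CPU
		// profiles and in goroutine profiles taken with debug=1.
		work(ctx)
	})
}

func work(context.Context) {}
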
439 : // checkAndIntersectFilters checks the specific table and block property filters
440 : // for intersection with any available table and block-level properties. Returns
441 : // true for ok if this table should be read by this iterator.
442 : func (c *tableCacheShard) checkAndIntersectFilters(
443 : v *tableCacheValue,
444 : tableFilter func(userProps map[string]string) bool,
445 : blockPropertyFilters []BlockPropertyFilter,
446 : boundLimitedFilter sstable.BoundLimitedBlockPropertyFilter,
447 : syntheticSuffix sstable.SyntheticSuffix,
448 2 : ) (ok bool, filterer *sstable.BlockPropertiesFilterer, err error) {
449 2 : if tableFilter != nil &&
450 2 : !tableFilter(v.reader.Properties.UserProperties) {
451 0 : return false, nil, nil
452 0 : }
453 :
454 2 : if boundLimitedFilter != nil || len(blockPropertyFilters) > 0 {
455 2 : filterer, err = sstable.IntersectsTable(
456 2 : blockPropertyFilters,
457 2 : boundLimitedFilter,
458 2 : v.reader.Properties.UserProperties,
459 2 : syntheticSuffix,
460 2 : )
461 2 : // NB: IntersectsTable will return a nil filterer if the table-level
462 2 : // properties indicate there's no intersection with the provided filters.
463 2 : if filterer == nil || err != nil {
464 2 : return false, nil, err
465 2 : }
466 : }
467 2 : return true, filterer, nil
468 : }
469 :
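For context, the tableFilter argument consumed here typically originates from a user-supplied IterOptions.TableFilter. A hedged sketch of that user-facing side follows; the property key is invented (a custom property collector would have to write it), and the NewIter signature with an error return matches recent Pebble versions.

// scanRecent is a hypothetical function illustrating IterOptions.TableFilter.
func scanRecent(db *pebble.DB) error {
	opts := &pebble.IterOptions{
		TableFilter: func(userProps map[string]string) bool {
			// Returning false skips the whole sstable (and all of its blocks).
			// "example.max-ts" is an invented user property.
			return userProps["example.max-ts"] >= "2024-01-01"
		},
	}
	iter, err := db.NewIter(opts)
	if err != nil {
		return err
	}
	defer iter.Close()
	for iter.First(); iter.Valid(); iter.Next() {
		_ = iter.Key() // process keys from tables that passed the filter
	}
	return iter.Error()
}
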
470 : func (c *tableCacheShard) newIters(
471 : ctx context.Context,
472 : file *manifest.FileMetadata,
473 : opts *IterOptions,
474 : internalOpts internalIterOpts,
475 : dbOpts *tableCacheOpts,
476 : kinds iterKinds,
477 2 : ) (iterSet, error) {
478 2 : // TODO(sumeer): constructing the Reader should also use a plumbed context,
479 2 : // since parts of the sstable are read during the construction. The Reader
480 2 : // should not remember that context since the Reader can be long-lived.
481 2 :
482 2 : // Calling findNode gives us the responsibility of decrementing v's
483 2 : // refCount. If opening the underlying table resulted in error, then we
484 2 : // decrement this straight away. Otherwise, we pass that responsibility to
485 2 : // the sstable iterator, which decrements when it is closed.
486 2 : v := c.findNode(file.FileBacking, dbOpts)
487 2 : if v.err != nil {
488 1 : defer c.unrefValue(v)
489 1 : return iterSet{}, v.err
490 1 : }
491 :
492 : // Note: This suffers an allocation for virtual sstables.
493 2 : cr := createCommonReader(v, file)
494 2 : var iters iterSet
495 2 : var err error
496 2 : if kinds.RangeKey() && file.HasRangeKeys {
497 2 : iters.rangeKey, err = c.newRangeKeyIter(v, file, cr, opts.SpanIterOptions())
498 2 : }
499 2 : if kinds.RangeDeletion() && file.HasPointKeys && err == nil {
500 2 : iters.rangeDeletion, err = c.newRangeDelIter(ctx, file, cr, dbOpts)
501 2 : }
502 2 : if kinds.Point() && err == nil {
503 2 : iters.point, err = c.newPointIter(ctx, v, file, cr, opts, internalOpts, dbOpts)
504 2 : }
505 2 : if err != nil {
506 1 : // NB: There's a subtlety here: Because the point iterator is the last
507 1 : // iterator we attempt to create, it's not possible for:
508 1 : // err != nil && iters.point != nil
509 1 : // If it were possible, we'd need to account for it to avoid double
510 1 : // unref-ing here, once during CloseAll and once during `unrefValue`.
511 1 : iters.CloseAll()
512 1 : c.unrefValue(v)
513 1 : return iterSet{}, err
514 1 : }
515 : // Only point iterators ever require the reader stay pinned in the cache. If
516 : // we're not returning a point iterator to the caller, we need to unref v.
517 2 : if iters.point == nil {
518 2 : c.unrefValue(v)
519 2 : }
520 2 : return iters, nil
521 : }
522 :
523 : // newPointIter is an internal helper that constructs a point iterator over a
524 : // sstable. This function is for internal use only, and callers should use
525 : // newIters instead.
526 : func (c *tableCacheShard) newPointIter(
527 : ctx context.Context,
528 : v *tableCacheValue,
529 : file *manifest.FileMetadata,
530 : cr sstable.CommonReader,
531 : opts *IterOptions,
532 : internalOpts internalIterOpts,
533 : dbOpts *tableCacheOpts,
534 2 : ) (internalIterator, error) {
535 2 : var (
536 2 : hideObsoletePoints bool = false
537 2 : pointKeyFilters []BlockPropertyFilter
538 2 : filterer *sstable.BlockPropertiesFilterer
539 2 : )
540 2 : if opts != nil {
541 2 : // This code is appending (at most one filter) in-place to
542 2 : // opts.PointKeyFilters even though the slice is shared for iterators in
543 2 : // the same iterator tree. This is acceptable since all the following
544 2 : // properties are true:
545 2 : // - The iterator tree is single threaded, so the shared backing for the
546 2 : // slice is being mutated in a single threaded manner.
547 2 : // - Each shallow copy of the slice has its own notion of length.
548 2 : // - The appended element is always the obsoleteKeyBlockPropertyFilter
549 2 : // struct, which is stateless, so overwriting that struct when creating
550 2 : // one sstable iterator is harmless to other sstable iterators that are
551 2 : // relying on that struct.
552 2 : //
553 2 : // An alternative would be to have different slices for different sstable
554 2 : // iterators, but that requires more work to avoid allocations.
555 2 : //
556 2 : // TODO(bilal): for compaction reads of foreign sstables, we do hide
557 2 : // obsolete points (see sstable.Reader.newCompactionIter) but we don't
558 2 : // apply the obsolete block property filter. We could optimize this by
559 2 : // applying the filter.
560 2 : hideObsoletePoints, pointKeyFilters =
561 2 : v.reader.TryAddBlockPropertyFilterForHideObsoletePoints(
562 2 : opts.snapshotForHideObsoletePoints, file.LargestSeqNum, opts.PointKeyFilters)
563 2 :
564 2 : var ok bool
565 2 : var err error
566 2 : ok, filterer, err = c.checkAndIntersectFilters(v, opts.TableFilter,
567 2 : pointKeyFilters, internalOpts.boundLimitedFilter, file.SyntheticSuffix)
568 2 : if err != nil {
569 0 : return nil, err
570 2 : } else if !ok {
571 2 : // No point keys within the table match the filters.
572 2 : return nil, nil
573 2 : }
574 : }
575 :
576 2 : var iter sstable.Iterator
577 2 : useFilter := true
578 2 : if opts != nil {
579 2 : useFilter = manifest.LevelToInt(opts.level) != 6 || opts.UseL6Filters
580 2 : ctx = objiotracing.WithLevel(ctx, manifest.LevelToInt(opts.level))
581 2 : }
582 2 : tableFormat, err := v.reader.TableFormat()
583 2 : if err != nil {
584 0 : return nil, err
585 0 : }
586 2 : var rp sstable.ReaderProvider
587 2 : if tableFormat >= sstable.TableFormatPebblev3 && v.reader.Properties.NumValueBlocks > 0 {
588 2 : rp = &tableCacheShardReaderProvider{c: c, file: file, dbOpts: dbOpts}
589 2 : }
590 :
591 2 : if v.isShared && file.SyntheticSeqNum() != 0 {
592 2 : if tableFormat < sstable.TableFormatPebblev4 {
593 0 : return nil, errors.New("pebble: shared ingested sstable has a lower table format than expected")
594 0 : }
595 : // The table is shared and ingested.
596 2 : hideObsoletePoints = true
597 : }
598 2 : transforms := file.IterTransforms()
599 2 : transforms.HideObsoletePoints = hideObsoletePoints
600 2 : var categoryAndQoS sstable.CategoryAndQoS
601 2 : if opts != nil {
602 2 : categoryAndQoS = opts.CategoryAndQoS
603 2 : }
604 2 : if internalOpts.bytesIterated != nil {
605 2 : iter, err = cr.NewCompactionIter(
606 2 : transforms, internalOpts.bytesIterated, categoryAndQoS, dbOpts.sstStatsCollector, rp,
607 2 : internalOpts.bufferPool)
608 2 : } else {
609 2 : iter, err = cr.NewIterWithBlockPropertyFiltersAndContextEtc(
610 2 : ctx, transforms, opts.GetLowerBound(), opts.GetUpperBound(), filterer, useFilter,
611 2 : internalOpts.stats, categoryAndQoS, dbOpts.sstStatsCollector, rp)
612 2 : }
613 2 : if err != nil {
614 1 : return nil, err
615 1 : }
616 : // NB: v.closeHook takes responsibility for calling unrefValue(v) here. Take
617 : // care to avoid introducing an allocation here by adding a closure.
618 2 : iter.SetCloseHook(v.closeHook)
619 2 : c.iterCount.Add(1)
620 2 : dbOpts.iterCount.Add(1)
621 2 : if invariants.RaceEnabled {
622 0 : c.mu.Lock()
623 0 : c.mu.iters[iter] = debug.Stack()
624 0 : c.mu.Unlock()
625 0 : }
626 2 : return iter, nil
627 : }
628 :
629 : // newRangeDelIter is an internal helper that constructs an iterator over a
630 : // sstable's range deletions. This function is for table-cache internal use
631 : // only, and callers should use newIters instead.
632 : func (c *tableCacheShard) newRangeDelIter(
633 : ctx context.Context, file *manifest.FileMetadata, cr sstable.CommonReader, dbOpts *tableCacheOpts,
634 2 : ) (keyspan.FragmentIterator, error) {
635 2 : // NB: range-del iterator does not maintain a reference to the table, nor
636 2 : // does it need to read from it after creation.
637 2 : rangeDelIter, err := cr.NewRawRangeDelIter(file.IterTransforms())
638 2 : if err != nil {
639 1 : return nil, err
640 1 : }
641 : // Assert expected bounds in tests.
642 2 : if invariants.Enabled && rangeDelIter != nil {
643 2 : cmp := base.DefaultComparer.Compare
644 2 : if dbOpts.opts.Comparer != nil {
645 2 : cmp = dbOpts.opts.Comparer.Compare
646 2 : }
647 2 : rangeDelIter = keyspan.AssertBounds(
648 2 : rangeDelIter, file.SmallestPointKey, file.LargestPointKey.UserKey, cmp,
649 2 : )
650 : }
651 2 : return rangeDelIter, nil
652 : }
653 :
654 : // newRangeKeyIter is an internal helper that constructs an iterator over a
655 : // sstable's range keys. This function is for table-cache internal use only, and
656 : // callers should use newIters instead.
657 : func (c *tableCacheShard) newRangeKeyIter(
658 : v *tableCacheValue, file *fileMetadata, cr sstable.CommonReader, opts keyspan.SpanIterOptions,
659 2 : ) (keyspan.FragmentIterator, error) {
660 2 : transforms := file.IterTransforms()
661 2 : // Don't filter a table's range keys if the file contains RANGEKEYDELs.
662 2 : // The RANGEKEYDELs may delete range keys in other levels. Skipping the
663 2 : // file's range key blocks may surface deleted range keys below. This is
664 2 : // done here, rather than deferring to the block-property collector in order
665 2 : // to maintain parity with point keys and the treatment of RANGEDELs.
666 2 : if v.reader.Properties.NumRangeKeyDels == 0 && len(opts.RangeKeyFilters) > 0 {
667 0 : ok, _, err := c.checkAndIntersectFilters(v, nil, opts.RangeKeyFilters, nil, transforms.SyntheticSuffix)
668 0 : if err != nil {
669 0 : return nil, err
670 0 : } else if !ok {
671 0 : return nil, nil
672 0 : }
673 : }
674 : // TODO(radu): wrap in an AssertBounds.
675 2 : return cr.NewRawRangeKeyIter(transforms)
676 : }
677 :
678 : type tableCacheShardReaderProvider struct {
679 : c *tableCacheShard
680 : file *manifest.FileMetadata
681 : dbOpts *tableCacheOpts
682 : v *tableCacheValue
683 : }
684 :
685 : var _ sstable.ReaderProvider = &tableCacheShardReaderProvider{}
686 :
687 : // GetReader implements sstable.ReaderProvider. Note that it is not the
688 : // responsibility of tableCacheShardReaderProvider to ensure that the file
689 : // continues to exist. The ReaderProvider is used in iterators where the
690 : // top-level iterator is pinning the read state and preventing the files from
691 : // being deleted.
692 : //
693 : // The caller must call tableCacheShardReaderProvider.Close.
694 : //
695 : // Note that currently the Reader returned here is only used to read value
696 : // blocks. This reader shouldn't be used for other purposes like reading keys
697 : // outside of virtual sstable bounds.
698 : //
699 : // TODO(bananabrick): We could return a wrapper over the Reader to ensure
700 : // that the reader isn't used for other purposes.
701 2 : func (rp *tableCacheShardReaderProvider) GetReader() (*sstable.Reader, error) {
702 2 : // Calling findNode gives us the responsibility of decrementing v's
703 2 : // refCount.
704 2 : v := rp.c.findNode(rp.file.FileBacking, rp.dbOpts)
705 2 : if v.err != nil {
706 0 : defer rp.c.unrefValue(v)
707 0 : return nil, v.err
708 0 : }
709 2 : rp.v = v
710 2 : return v.reader, nil
711 : }
712 :
713 : // Close implements sstable.ReaderProvider.
714 2 : func (rp *tableCacheShardReaderProvider) Close() {
715 2 : rp.c.unrefValue(rp.v)
716 2 : rp.v = nil
717 2 : }
718 :
719 : // getTableProperties returns the sstable properties for the target file.
720 : func (c *tableCacheShard) getTableProperties(
721 : file *fileMetadata, dbOpts *tableCacheOpts,
722 1 : ) (*sstable.Properties, error) {
723 1 : // Calling findNode gives us the responsibility of decrementing v's refCount here
724 1 : v := c.findNode(file.FileBacking, dbOpts)
725 1 : defer c.unrefValue(v)
726 1 :
727 1 : if v.err != nil {
728 0 : return nil, v.err
729 0 : }
730 1 : return &v.reader.Properties, nil
731 : }
732 :
733 : // releaseNode releases a node from the tableCacheShard.
734 : //
735 : // c.mu must be held when calling this.
736 1 : func (c *tableCacheShard) releaseNode(n *tableCacheNode) {
737 1 : c.unlinkNode(n)
738 1 : c.clearNode(n)
739 1 : }
740 :
741 : // unlinkNode removes a node from the tableCacheShard, leaving the shard
742 : // reference in place.
743 : //
744 : // c.mu must be held when calling this.
745 2 : func (c *tableCacheShard) unlinkNode(n *tableCacheNode) {
746 2 : key := tableCacheKey{n.cacheID, n.fileNum}
747 2 : delete(c.mu.nodes, key)
748 2 :
749 2 : switch n.ptype {
750 2 : case tableCacheNodeHot:
751 2 : c.mu.sizeHot--
752 2 : case tableCacheNodeCold:
753 2 : c.mu.sizeCold--
754 2 : case tableCacheNodeTest:
755 2 : c.mu.sizeTest--
756 : }
757 :
758 2 : if n == c.mu.handHot {
759 2 : c.mu.handHot = c.mu.handHot.prev()
760 2 : }
761 2 : if n == c.mu.handCold {
762 2 : c.mu.handCold = c.mu.handCold.prev()
763 2 : }
764 2 : if n == c.mu.handTest {
765 2 : c.mu.handTest = c.mu.handTest.prev()
766 2 : }
767 :
768 2 : if n.unlink() == n {
769 2 : // This was the last entry in the cache.
770 2 : c.mu.handHot = nil
771 2 : c.mu.handCold = nil
772 2 : c.mu.handTest = nil
773 2 : }
774 :
775 2 : n.links.prev = nil
776 2 : n.links.next = nil
777 : }
778 :
779 2 : func (c *tableCacheShard) clearNode(n *tableCacheNode) {
780 2 : if v := n.value; v != nil {
781 2 : n.value = nil
782 2 : c.unrefValue(v)
783 2 : }
784 : }
785 :
786 : // unrefValue decrements the reference count for the specified value, releasing
787 : // it if the reference count fell to 0. Note that the value has a reference if
788 : // it is present in tableCacheShard.mu.nodes, so a reference count of 0 means
789 : // the node has already been removed from that map.
790 2 : func (c *tableCacheShard) unrefValue(v *tableCacheValue) {
791 2 : if v.refCount.Add(-1) == 0 {
792 2 : c.releasing.Add(1)
793 2 : c.releasingCh <- v
794 2 : }
795 : }
796 :
797 : // findNode returns the value for the table with the given backing, creating
798 : // the cache node if it didn't already exist. The caller is responsible for
799 : // decrementing the returned value's refCount.
800 2 : func (c *tableCacheShard) findNode(b *fileBacking, dbOpts *tableCacheOpts) *tableCacheValue {
801 2 : // The backing must have a positive refcount (otherwise it could be deleted at any time).
802 2 : b.MustHaveRefs()
803 2 : // Caution! Here fileMetadata can be a physical or virtual table. Table cache
804 2 : // readers are associated with the physical backings. All virtual tables with
805 2 : // the same backing will use the same reader from the cache; so no information
806 2 : // that can differ among these virtual tables can be plumbed into loadInfo.
807 2 : info := loadInfo{
808 2 : backingFileNum: b.DiskFileNum,
809 2 : }
810 2 :
811 2 : return c.findNodeInternal(info, dbOpts)
812 2 : }
813 :
814 : func (c *tableCacheShard) findNodeInternal(
815 : loadInfo loadInfo, dbOpts *tableCacheOpts,
816 2 : ) *tableCacheValue {
817 2 : // Fast-path for a hit in the cache.
818 2 : c.mu.RLock()
819 2 : key := tableCacheKey{dbOpts.cacheID, loadInfo.backingFileNum}
820 2 : if n := c.mu.nodes[key]; n != nil && n.value != nil {
821 2 : // Fast-path hit.
822 2 : //
823 2 : // The caller is responsible for decrementing the refCount.
824 2 : v := n.value
825 2 : v.refCount.Add(1)
826 2 : c.mu.RUnlock()
827 2 : n.referenced.Store(true)
828 2 : c.hits.Add(1)
829 2 : <-v.loaded
830 2 : return v
831 2 : }
832 2 : c.mu.RUnlock()
833 2 :
834 2 : c.mu.Lock()
835 2 :
836 2 : n := c.mu.nodes[key]
837 2 : switch {
838 2 : case n == nil:
839 2 : // Slow-path miss of a non-existent node.
840 2 : n = &tableCacheNode{
841 2 : fileNum: loadInfo.backingFileNum,
842 2 : ptype: tableCacheNodeCold,
843 2 : }
844 2 : c.addNode(n, dbOpts)
845 2 : c.mu.sizeCold++
846 :
847 2 : case n.value != nil:
848 2 : // Slow-path hit of a hot or cold node.
849 2 : //
850 2 : // The caller is responsible for decrementing the refCount.
851 2 : v := n.value
852 2 : v.refCount.Add(1)
853 2 : n.referenced.Store(true)
854 2 : c.hits.Add(1)
855 2 : c.mu.Unlock()
856 2 : <-v.loaded
857 2 : return v
858 :
859 2 : default:
860 2 : // Slow-path miss of a test node.
861 2 : c.unlinkNode(n)
862 2 : c.mu.coldTarget++
863 2 : if c.mu.coldTarget > c.size {
864 1 : c.mu.coldTarget = c.size
865 1 : }
866 :
867 2 : n.referenced.Store(false)
868 2 : n.ptype = tableCacheNodeHot
869 2 : c.addNode(n, dbOpts)
870 2 : c.mu.sizeHot++
871 : }
872 :
873 2 : c.misses.Add(1)
874 2 :
875 2 : v := &tableCacheValue{
876 2 : loaded: make(chan struct{}),
877 2 : }
878 2 : v.refCount.Store(2)
879 2 : // Cache the closure invoked when an iterator is closed. This avoids an
880 2 : // allocation on every call to newIters.
881 2 : v.closeHook = func(i sstable.Iterator) error {
882 2 : if invariants.RaceEnabled {
883 0 : c.mu.Lock()
884 0 : delete(c.mu.iters, i)
885 0 : c.mu.Unlock()
886 0 : }
887 2 : c.unrefValue(v)
888 2 : c.iterCount.Add(-1)
889 2 : dbOpts.iterCount.Add(-1)
890 2 : return nil
891 : }
892 2 : n.value = v
893 2 :
894 2 : c.mu.Unlock()
895 2 :
896 2 : // Note: adding to the cache lists must complete before we begin loading the
897 2 : // table, as a failure during load will result in the node being unlinked.
898 2 : pprof.Do(context.Background(), tableCacheLabels, func(context.Context) {
899 2 : v.load(loadInfo, c, dbOpts)
900 2 : })
901 2 : return v
902 : }
903 :
904 2 : func (c *tableCacheShard) addNode(n *tableCacheNode, dbOpts *tableCacheOpts) {
905 2 : c.evictNodes()
906 2 : n.cacheID = dbOpts.cacheID
907 2 : key := tableCacheKey{n.cacheID, n.fileNum}
908 2 : c.mu.nodes[key] = n
909 2 :
910 2 : n.links.next = n
911 2 : n.links.prev = n
912 2 : if c.mu.handHot == nil {
913 2 : // First element.
914 2 : c.mu.handHot = n
915 2 : c.mu.handCold = n
916 2 : c.mu.handTest = n
917 2 : } else {
918 2 : c.mu.handHot.link(n)
919 2 : }
920 :
921 2 : if c.mu.handCold == c.mu.handHot {
922 2 : c.mu.handCold = c.mu.handCold.prev()
923 2 : }
924 : }
925 :
926 2 : func (c *tableCacheShard) evictNodes() {
927 2 : for c.size <= c.mu.sizeHot+c.mu.sizeCold && c.mu.handCold != nil {
928 2 : c.runHandCold()
929 2 : }
930 : }
931 :
932 2 : func (c *tableCacheShard) runHandCold() {
933 2 : n := c.mu.handCold
934 2 : if n.ptype == tableCacheNodeCold {
935 2 : if n.referenced.Load() {
936 2 : n.referenced.Store(false)
937 2 : n.ptype = tableCacheNodeHot
938 2 : c.mu.sizeCold--
939 2 : c.mu.sizeHot++
940 2 : } else {
941 2 : c.clearNode(n)
942 2 : n.ptype = tableCacheNodeTest
943 2 : c.mu.sizeCold--
944 2 : c.mu.sizeTest++
945 2 : for c.size < c.mu.sizeTest && c.mu.handTest != nil {
946 1 : c.runHandTest()
947 1 : }
948 : }
949 : }
950 :
951 2 : c.mu.handCold = c.mu.handCold.next()
952 2 :
953 2 : for c.size-c.mu.coldTarget <= c.mu.sizeHot && c.mu.handHot != nil {
954 2 : c.runHandHot()
955 2 : }
956 : }
957 :
958 2 : func (c *tableCacheShard) runHandHot() {
959 2 : if c.mu.handHot == c.mu.handTest && c.mu.handTest != nil {
960 2 : c.runHandTest()
961 2 : if c.mu.handHot == nil {
962 0 : return
963 0 : }
964 : }
965 :
966 2 : n := c.mu.handHot
967 2 : if n.ptype == tableCacheNodeHot {
968 2 : if n.referenced.Load() {
969 2 : n.referenced.Store(false)
970 2 : } else {
971 2 : n.ptype = tableCacheNodeCold
972 2 : c.mu.sizeHot--
973 2 : c.mu.sizeCold++
974 2 : }
975 : }
976 :
977 2 : c.mu.handHot = c.mu.handHot.next()
978 : }
979 :
980 2 : func (c *tableCacheShard) runHandTest() {
981 2 : if c.mu.sizeCold > 0 && c.mu.handTest == c.mu.handCold && c.mu.handCold != nil {
982 2 : c.runHandCold()
983 2 : if c.mu.handTest == nil {
984 0 : return
985 0 : }
986 : }
987 :
988 2 : n := c.mu.handTest
989 2 : if n.ptype == tableCacheNodeTest {
990 2 : c.mu.coldTarget--
991 2 : if c.mu.coldTarget < 0 {
992 1 : c.mu.coldTarget = 0
993 1 : }
994 2 : c.unlinkNode(n)
995 2 : c.clearNode(n)
996 : }
997 :
998 2 : c.mu.handTest = c.mu.handTest.next()
999 : }
1000 :
1001 2 : func (c *tableCacheShard) evict(fileNum base.DiskFileNum, dbOpts *tableCacheOpts, allowLeak bool) {
1002 2 : c.mu.Lock()
1003 2 : key := tableCacheKey{dbOpts.cacheID, fileNum}
1004 2 : n := c.mu.nodes[key]
1005 2 : var v *tableCacheValue
1006 2 : if n != nil {
1007 2 : // NB: This is equivalent to tableCacheShard.releaseNode(), but we perform
1008 2 : // the tableCacheNode.release() call synchronously below to ensure the
1009 2 : // sstable file descriptor is closed before returning. Note that
1010 2 : // tableCacheShard.releasing needs to be incremented while holding
1011 2 : // tableCacheShard.mu in order to avoid a race with Close()
1012 2 : c.unlinkNode(n)
1013 2 : v = n.value
1014 2 : if v != nil {
1015 2 : if !allowLeak {
1016 2 : if t := v.refCount.Add(-1); t != 0 {
1017 0 : dbOpts.loggerAndTracer.Fatalf("sstable %s: refcount is not zero: %d\n%s", fileNum, t, debug.Stack())
1018 0 : }
1019 : }
1020 2 : c.releasing.Add(1)
1021 : }
1022 : }
1023 :
1024 2 : c.mu.Unlock()
1025 2 :
1026 2 : if v != nil {
1027 2 : v.release(c)
1028 2 : }
1029 :
1030 2 : dbOpts.opts.Cache.EvictFile(dbOpts.cacheID, fileNum)
1031 : }
1032 :
1033 : // removeDB evicts any nodes which have a reference to the DB
1034 : // associated with dbOpts.cacheID. Make sure that there will
1035 : // be no more accesses to the files associated with the DB.
1036 2 : func (c *tableCacheShard) removeDB(dbOpts *tableCacheOpts) {
1037 2 : var fileNums []base.DiskFileNum
1038 2 :
1039 2 : c.mu.RLock()
1040 2 : // Collect the fileNums which need to be cleaned.
1041 2 : var firstNode *tableCacheNode
1042 2 : node := c.mu.handHot
1043 2 : for node != firstNode {
1044 2 : if firstNode == nil {
1045 2 : firstNode = node
1046 2 : }
1047 :
1048 2 : if node.cacheID == dbOpts.cacheID {
1049 2 : fileNums = append(fileNums, node.fileNum)
1050 2 : }
1051 2 : node = node.next()
1052 : }
1053 2 : c.mu.RUnlock()
1054 2 :
1055 2 : // Evict all the nodes associated with the DB.
1056 2 : // This should synchronously close all the files
1057 2 : // associated with the DB.
1058 2 : for _, fileNum := range fileNums {
1059 2 : c.evict(fileNum, dbOpts, true)
1060 2 : }
1061 : }
1062 :
1063 2 : func (c *tableCacheShard) Close() error {
1064 2 : c.mu.Lock()
1065 2 : defer c.mu.Unlock()
1066 2 :
1067 2 : // Check for leaked iterators. Note that we'll still perform cleanup below in
1068 2 : // the case that there are leaked iterators.
1069 2 : var err error
1070 2 : if v := c.iterCount.Load(); v > 0 {
1071 1 : if !invariants.RaceEnabled {
1072 1 : err = errors.Errorf("leaked iterators: %d", errors.Safe(v))
1073 1 : } else {
1074 0 : var buf bytes.Buffer
1075 0 : for _, stack := range c.mu.iters {
1076 0 : fmt.Fprintf(&buf, "%s\n", stack)
1077 0 : }
1078 0 : err = errors.Errorf("leaked iterators: %d\n%s", errors.Safe(v), buf.String())
1079 : }
1080 : }
1081 :
1082 2 : for c.mu.handHot != nil {
1083 0 : n := c.mu.handHot
1084 0 : if n.value != nil {
1085 0 : if n.value.refCount.Add(-1) == 0 {
1086 0 : c.releasing.Add(1)
1087 0 : c.releasingCh <- n.value
1088 0 : }
1089 : }
1090 0 : c.unlinkNode(n)
1091 : }
1092 2 : c.mu.nodes = nil
1093 2 : c.mu.handHot = nil
1094 2 : c.mu.handCold = nil
1095 2 : c.mu.handTest = nil
1096 2 :
1097 2 : // Only shut down the releasing goroutine if there were no leaked
1098 2 : // iterators. If there were leaked iterators, we leave the goroutine running
1099 2 : // and the releasingCh open so that a subsequent iterator close can
1100 2 : // complete. This behavior is used by iterator leak tests. Leaking the
1101 2 : // goroutine for these tests is less bad than not closing the iterator, which
1102 2 : // triggers other warnings about block cache handles not being released.
1103 2 : if err != nil {
1104 1 : c.releasing.Wait()
1105 1 : return err
1106 1 : }
1107 :
1108 2 : close(c.releasingCh)
1109 2 : c.releasing.Wait()
1110 2 : c.releaseLoopExit.Wait()
1111 2 : return err
1112 : }
1113 :
1114 : type tableCacheValue struct {
1115 : closeHook func(i sstable.Iterator) error
1116 : reader *sstable.Reader
1117 : err error
1118 : isShared bool
1119 : loaded chan struct{}
1120 : // Reference count for the value. The reader is closed when the reference
1121 : // count drops to zero.
1122 : refCount atomic.Int32
1123 : }
1124 :
1125 : // loadInfo contains the information needed to populate a new cache entry.
1126 : type loadInfo struct {
1127 : backingFileNum base.DiskFileNum
1128 : }
1129 :
1130 2 : func (v *tableCacheValue) load(loadInfo loadInfo, c *tableCacheShard, dbOpts *tableCacheOpts) {
1131 2 : // Try opening the file first.
1132 2 : var f objstorage.Readable
1133 2 : var err error
1134 2 : f, err = dbOpts.objProvider.OpenForReading(
1135 2 : context.TODO(), fileTypeTable, loadInfo.backingFileNum, objstorage.OpenOptions{MustExist: true},
1136 2 : )
1137 2 : if err == nil {
1138 2 : cacheOpts := private.SSTableCacheOpts(dbOpts.cacheID, loadInfo.backingFileNum).(sstable.ReaderOption)
1139 2 : v.reader, err = sstable.NewReader(f, dbOpts.opts, cacheOpts, dbOpts.filterMetrics)
1140 2 : }
1141 2 : if err == nil {
1142 2 : var objMeta objstorage.ObjectMetadata
1143 2 : objMeta, err = dbOpts.objProvider.Lookup(fileTypeTable, loadInfo.backingFileNum)
1144 2 : v.isShared = objMeta.IsShared()
1145 2 : }
1146 2 : if err != nil {
1147 1 : v.err = errors.Wrapf(
1148 1 : err, "pebble: backing file %s error", loadInfo.backingFileNum)
1149 1 : }
1150 2 : if v.err != nil {
1151 1 : c.mu.Lock()
1152 1 : defer c.mu.Unlock()
1153 1 : // Lookup the node in the cache again as it might have already been
1154 1 : // removed.
1155 1 : key := tableCacheKey{dbOpts.cacheID, loadInfo.backingFileNum}
1156 1 : n := c.mu.nodes[key]
1157 1 : if n != nil && n.value == v {
1158 1 : c.releaseNode(n)
1159 1 : }
1160 : }
1161 2 : close(v.loaded)
1162 : }
1163 :
1164 2 : func (v *tableCacheValue) release(c *tableCacheShard) {
1165 2 : <-v.loaded
1166 2 : // Nothing to be done about an error at this point. Close the reader if it is
1167 2 : // open.
1168 2 : if v.reader != nil {
1169 2 : _ = v.reader.Close()
1170 2 : }
1171 2 : c.releasing.Done()
1172 : }
1173 :
1174 : type tableCacheNodeType int8
1175 :
1176 : const (
1177 : tableCacheNodeTest tableCacheNodeType = iota
1178 : tableCacheNodeCold
1179 : tableCacheNodeHot
1180 : )
1181 :
1182 0 : func (p tableCacheNodeType) String() string {
1183 0 : switch p {
1184 0 : case tableCacheNodeTest:
1185 0 : return "test"
1186 0 : case tableCacheNodeCold:
1187 0 : return "cold"
1188 0 : case tableCacheNodeHot:
1189 0 : return "hot"
1190 : }
1191 0 : return "unknown"
1192 : }
1193 :
1194 : type tableCacheNode struct {
1195 : fileNum base.DiskFileNum
1196 : value *tableCacheValue
1197 :
1198 : links struct {
1199 : next *tableCacheNode
1200 : prev *tableCacheNode
1201 : }
1202 : ptype tableCacheNodeType
1203 : // referenced is atomically set to indicate that this entry has been accessed
1204 : // since the last time one of the clock hands swept it.
1205 : referenced atomic.Bool
1206 :
1207 : // Storing the cache id associated with the DB instance here
1208 : // avoids the need to thread the dbOpts struct through many functions.
1209 : cacheID uint64
1210 : }
1211 :
1212 2 : func (n *tableCacheNode) next() *tableCacheNode {
1213 2 : if n == nil {
1214 0 : return nil
1215 0 : }
1216 2 : return n.links.next
1217 : }
1218 :
1219 2 : func (n *tableCacheNode) prev() *tableCacheNode {
1220 2 : if n == nil {
1221 0 : return nil
1222 0 : }
1223 2 : return n.links.prev
1224 : }
1225 :
1226 2 : func (n *tableCacheNode) link(s *tableCacheNode) {
1227 2 : s.links.prev = n.links.prev
1228 2 : s.links.prev.links.next = s
1229 2 : s.links.next = n
1230 2 : s.links.next.links.prev = s
1231 2 : }
1232 :
1233 2 : func (n *tableCacheNode) unlink() *tableCacheNode {
1234 2 : next := n.links.next
1235 2 : n.links.prev.links.next = n.links.next
1236 2 : n.links.next.links.prev = n.links.prev
1237 2 : n.links.prev = n
1238 2 : n.links.next = n
1239 2 : return next
1240 2 : }
1241 :
1242 : // iterSet holds a set of iterators of various key kinds, all constructed over
1243 : // the same data structure (eg, an sstable). A subset of the fields may be
1244 : // populated depending on the `iterKinds` passed to newIters.
1245 : type iterSet struct {
1246 : point internalIterator
1247 : rangeDeletion keyspan.FragmentIterator
1248 : rangeKey keyspan.FragmentIterator
1249 : }
1250 :
1251 : // TODO(jackson): Consider adding methods for fast paths that check whether an
1252 : // iterator of a particular kind is nil, so that these call sites don't need to
1253 : // reach into the struct's fields directly.
1254 :
1255 : // Point returns the contained point iterator. If there is no point iterator,
1256 : // Point returns a non-nil empty point iterator.
1257 2 : func (s *iterSet) Point() internalIterator {
1258 2 : if s.point == nil {
1259 2 : return emptyIter
1260 2 : }
1261 2 : return s.point
1262 : }
1263 :
1264 : // RangeDeletion returns the contained range deletion iterator. If there is no
1265 : // range deletion iterator, RangeDeletion returns a non-nil empty keyspan
1266 : // iterator.
1267 2 : func (s *iterSet) RangeDeletion() keyspan.FragmentIterator {
1268 2 : if s.rangeDeletion == nil {
1269 2 : return emptyKeyspanIter
1270 2 : }
1271 2 : return s.rangeDeletion
1272 : }
1273 :
1274 : // RangeKey returns the contained range key iterator. If there is no range key
1275 : // iterator, RangeKey returns a non-nil empty keyspan iterator.
1276 2 : func (s *iterSet) RangeKey() keyspan.FragmentIterator {
1277 2 : if s.rangeKey == nil {
1278 2 : return emptyKeyspanIter
1279 2 : }
1280 2 : return s.rangeKey
1281 : }
1282 :
1283 : // CloseAll closes all of the held iterators. If CloseAll is called, then Close
1284 : // must not be called on the constituent iterators.
1285 1 : func (s *iterSet) CloseAll() error {
1286 1 : var err error
1287 1 : if s.point != nil {
1288 1 : err = s.point.Close()
1289 1 : }
1290 1 : if s.rangeDeletion != nil {
1291 1 : err = firstError(err, s.rangeDeletion.Close())
1292 1 : }
1293 1 : if s.rangeKey != nil {
1294 0 : err = firstError(err, s.rangeKey.Close())
1295 0 : }
1296 1 : return err
1297 : }
1298 :
1299 : // iterKinds is a bitmap indicating a set of kinds of iterators. Callers may
1300 : // bitwise-OR iterPointKeys, iterRangeDeletions and/or iterRangeKeys together to
1301 : // represent a set of desired iterator kinds.
1302 : type iterKinds uint8
1303 :
1304 2 : func (t iterKinds) Point() bool { return (t & iterPointKeys) != 0 }
1305 2 : func (t iterKinds) RangeDeletion() bool { return (t & iterRangeDeletions) != 0 }
1306 2 : func (t iterKinds) RangeKey() bool { return (t & iterRangeKeys) != 0 }
1307 :
1308 : const (
1309 : iterPointKeys iterKinds = 1 << iota
1310 : iterRangeDeletions
1311 : iterRangeKeys
1312 : )
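
A tiny illustration of composing the bitmap (package-internal constants, shown only for clarity):

// A caller wanting point and range-deletion iterators, mirroring what
// tableNewIters.TODO requests.
kinds := iterPointKeys | iterRangeDeletions
_ = kinds.Point()         // true
_ = kinds.RangeDeletion() // true
_ = kinds.RangeKey()      // false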
|