Line data Source code
1 : // Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "bytes"
9 : "context"
10 : "fmt"
11 : "io"
12 : "runtime/debug"
13 : "runtime/pprof"
14 : "sync"
15 : "sync/atomic"
16 : "unsafe"
17 :
18 : "github.com/cockroachdb/errors"
19 : "github.com/cockroachdb/pebble/internal/base"
20 : "github.com/cockroachdb/pebble/internal/invariants"
21 : "github.com/cockroachdb/pebble/internal/keyspan"
22 : "github.com/cockroachdb/pebble/internal/keyspan/keyspanimpl"
23 : "github.com/cockroachdb/pebble/internal/manifest"
24 : "github.com/cockroachdb/pebble/internal/private"
25 : "github.com/cockroachdb/pebble/objstorage"
26 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
27 : "github.com/cockroachdb/pebble/sstable"
28 : )
29 :
30 : var emptyIter = &errorIter{err: nil}
31 : var emptyKeyspanIter = &errorKeyspanIter{err: nil}
32 :
33 : // tableNewIters creates new iterators (point, range deletion and/or range key)
34 : // for the given file metadata. Which of the various iterator kinds the user is
35 : // requesting is specified with the iterKinds bitmap.
36 : //
37 : // On success, the requested subset of iters.{point,rangeDel,rangeKey} are
38 : // populated with iterators.
39 : //
40 : // If a point iterator is requested and the operation was successful,
41 : // iters.point is guaranteed to be non-nil and must be closed when the caller is
42 : // finished.
43 : //
44 : // If a range deletion or range key iterator is requested, the corresponding
45 : // iterator may be nil if the table does not contain any keys of the
46 : // corresponding kind. The returned iterSet type provides RangeDeletion() and
47 : // RangeKey() convenience methods that return non-nil empty iterators that may
48 : // be used if the caller requires a non-nil iterator.
49 : //
50 : // On error, all iterators are nil.
51 : //
52 : // The only (non-test) implementation of tableNewIters is
53 : // tableCacheContainer.newIters().
54 : type tableNewIters func(
55 : ctx context.Context,
56 : file *manifest.FileMetadata,
57 : opts *IterOptions,
58 : internalOpts internalIterOpts,
59 : kinds iterKinds,
60 : ) (iterSet, error)
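// An illustrative sketch (not part of the original source) of how a caller
// might invoke a tableNewIters; ctx, newIters and file are assumed to be in
// scope, and a nil *IterOptions is passed for brevity:
//
//	iters, err := newIters(ctx, file, nil, internalIterOpts{}, iterPointKeys|iterRangeDeletions)
//	if err != nil {
//		return err // on error, all iterators are nil
//	}
//	defer func() { _ = iters.CloseAll() }()
//	// RangeDeletion() returns a non-nil (possibly empty) iterator even when
//	// the table contains no range deletions.
//	rangeDelIter := iters.RangeDeletion()
//	_ = rangeDelIter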
61 :
62 : // tableNewRangeDelIter takes a tableNewIters and returns a TableNewSpanIter
63 : // for the rangedel iterator returned by tableNewIters.
64 : func tableNewRangeDelIter(
65 : ctx context.Context, newIters tableNewIters,
66 1 : ) keyspanimpl.TableNewSpanIter {
67 1 : return func(file *manifest.FileMetadata, iterOptions keyspan.SpanIterOptions) (keyspan.FragmentIterator, error) {
68 1 : iters, err := newIters(ctx, file, nil, internalIterOpts{}, iterRangeDeletions)
69 1 : if err != nil {
70 0 : return nil, err
71 0 : }
72 1 : return iters.RangeDeletion(), nil
73 : }
74 : }
75 :
76 : // tableNewRangeKeyIter takes a tableNewIters and returns a TableNewSpanIter
77 : // for the range key iterator returned by tableNewIters.
78 : func tableNewRangeKeyIter(
79 : ctx context.Context, newIters tableNewIters,
80 1 : ) keyspanimpl.TableNewSpanIter {
81 1 : return func(file *manifest.FileMetadata, iterOptions keyspan.SpanIterOptions) (keyspan.FragmentIterator, error) {
82 1 : iters, err := newIters(ctx, file, nil, internalIterOpts{}, iterRangeKeys)
83 1 : if err != nil {
84 0 : return nil, err
85 0 : }
86 1 : return iters.RangeKey(), nil
87 : }
88 : }
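// A hedged sketch of how these adapters might be wired together; ctx, file and
// newIters (a tableNewIters) are assumed to be in scope:
//
//	var newSpanIter keyspanimpl.TableNewSpanIter = tableNewRangeDelIter(ctx, newIters)
//	spanIter, err := newSpanIter(file, keyspan.SpanIterOptions{})
//	if err == nil {
//		defer spanIter.Close()
//	}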
89 :
90 : var tableCacheLabels = pprof.Labels("pebble", "table-cache")
91 :
92 : // tableCacheOpts contains the DB-specific fields
93 : // of a table cache. This is stored in the tableCacheContainer
94 : // along with the table cache.
95 : // NB: It is important that the fields in this struct remain
96 : // read-only. Since they are shared by every tableCacheShard,
97 : // updating any of them would cause unnecessary evictions of
98 : // these fields, and of the surrounding fields, from the CPU
99 : // caches.
100 : type tableCacheOpts struct {
101 : // iterCount keeps track of how many iterators are open. It is used to keep
102 : // track of leaked iterators on a per-db level.
103 : iterCount *atomic.Int32
104 :
105 : loggerAndTracer LoggerAndTracer
106 : cacheID uint64
107 : objProvider objstorage.Provider
108 : opts sstable.ReaderOptions
109 : filterMetrics *sstable.FilterMetricsTracker
110 : sstStatsCollector *sstable.CategoryStatsCollector
111 : }
112 :
113 : // tableCacheContainer contains the table cache and
114 : // fields which are unique to the DB.
115 : type tableCacheContainer struct {
116 : tableCache *TableCache
117 :
118 : // dbOpts contains fields relevant to the table cache
119 : // which are unique to each DB.
120 : dbOpts tableCacheOpts
121 : }
122 :
123 : // newTableCacheContainer will panic if the underlying cache in the table cache
124 : // doesn't match Options.Cache.
125 : func newTableCacheContainer(
126 : tc *TableCache,
127 : cacheID uint64,
128 : objProvider objstorage.Provider,
129 : opts *Options,
130 : size int,
131 : sstStatsCollector *sstable.CategoryStatsCollector,
132 1 : ) *tableCacheContainer {
133 1 : // We will release the ref to the table cache acquired here when tableCacheContainer.close is called.
134 1 : if tc != nil {
135 0 : if tc.cache != opts.Cache {
136 0 : panic("pebble: underlying cache for the table cache and db are different")
137 : }
138 0 : tc.Ref()
139 1 : } else {
140 1 : // NewTableCache should create a ref to tc which the container should
141 1 : // drop whenever it is closed.
142 1 : tc = NewTableCache(opts.Cache, opts.Experimental.TableCacheShards, size)
143 1 : }
144 :
145 1 : t := &tableCacheContainer{}
146 1 : t.tableCache = tc
147 1 : t.dbOpts.loggerAndTracer = opts.LoggerAndTracer
148 1 : t.dbOpts.cacheID = cacheID
149 1 : t.dbOpts.objProvider = objProvider
150 1 : t.dbOpts.opts = opts.MakeReaderOptions()
151 1 : t.dbOpts.filterMetrics = &sstable.FilterMetricsTracker{}
152 1 : t.dbOpts.iterCount = new(atomic.Int32)
153 1 : t.dbOpts.sstStatsCollector = sstStatsCollector
154 1 : return t
155 : }
156 :
157 : // Before calling close, make sure that there will be no further need
158 : // to access any of the files associated with the store.
159 1 : func (c *tableCacheContainer) close() error {
160 1 : // We want to do some cleanup work here. Check for leaked iterators
161 1 : // by the DB using this container. Note that we'll still perform cleanup
162 1 : // below in the case that there are leaked iterators.
163 1 : var err error
164 1 : if v := c.dbOpts.iterCount.Load(); v > 0 {
165 0 : err = errors.Errorf("leaked iterators: %d", errors.Safe(v))
166 0 : }
167 :
168 : // Release nodes here.
169 1 : for _, shard := range c.tableCache.shards {
170 1 : if shard != nil {
171 1 : shard.removeDB(&c.dbOpts)
172 1 : }
173 : }
174 1 : return firstError(err, c.tableCache.Unref())
175 : }
176 :
177 : func (c *tableCacheContainer) newIters(
178 : ctx context.Context,
179 : file *manifest.FileMetadata,
180 : opts *IterOptions,
181 : internalOpts internalIterOpts,
182 : kinds iterKinds,
183 1 : ) (iterSet, error) {
184 1 : return c.tableCache.getShard(file.FileBacking.DiskFileNum).newIters(ctx, file, opts, internalOpts, &c.dbOpts, kinds)
185 1 : }
186 :
187 : // getTableProperties returns the properties associated with the backing physical
188 : // table if the input metadata belongs to a virtual sstable.
189 0 : func (c *tableCacheContainer) getTableProperties(file *fileMetadata) (*sstable.Properties, error) {
190 0 : return c.tableCache.getShard(file.FileBacking.DiskFileNum).getTableProperties(file, &c.dbOpts)
191 0 : }
192 :
193 1 : func (c *tableCacheContainer) evict(fileNum base.DiskFileNum) {
194 1 : c.tableCache.getShard(fileNum).evict(fileNum, &c.dbOpts, false)
195 1 : }
196 :
197 1 : func (c *tableCacheContainer) metrics() (CacheMetrics, FilterMetrics) {
198 1 : var m CacheMetrics
199 1 : for i := range c.tableCache.shards {
200 1 : s := c.tableCache.shards[i]
201 1 : s.mu.RLock()
202 1 : m.Count += int64(len(s.mu.nodes))
203 1 : s.mu.RUnlock()
204 1 : m.Hits += s.hits.Load()
205 1 : m.Misses += s.misses.Load()
206 1 : }
207 1 : m.Size = m.Count * int64(unsafe.Sizeof(sstable.Reader{}))
208 1 : f := c.dbOpts.filterMetrics.Load()
209 1 : return m, f
210 : }
211 :
212 : func (c *tableCacheContainer) estimateSize(
213 : meta *fileMetadata, lower, upper []byte,
214 1 : ) (size uint64, err error) {
215 1 : if meta.Virtual {
216 1 : err = c.withVirtualReader(
217 1 : meta.VirtualMeta(),
218 1 : func(r sstable.VirtualReader) (err error) {
219 1 : size, err = r.EstimateDiskUsage(lower, upper)
220 1 : return err
221 1 : },
222 : )
223 1 : } else {
224 1 : err = c.withReader(
225 1 : meta.PhysicalMeta(),
226 1 : func(r *sstable.Reader) (err error) {
227 1 : size, err = r.EstimateDiskUsage(lower, upper)
228 1 : return err
229 1 : },
230 : )
231 : }
232 1 : if err != nil {
233 0 : return 0, err
234 0 : }
235 1 : return size, nil
236 : }
237 :
238 : // createCommonReader creates a CommonReader for this file.
239 1 : func createCommonReader(v *tableCacheValue, file *fileMetadata) sstable.CommonReader {
240 1 : // TODO(bananabrick): We suffer an allocation if file is a virtual sstable.
241 1 : var cr sstable.CommonReader = v.reader
242 1 : if file.Virtual {
243 1 : virtualReader := sstable.MakeVirtualReader(
244 1 : v.reader, file.VirtualMeta().VirtualReaderParams(v.isShared),
245 1 : )
246 1 : cr = &virtualReader
247 1 : }
248 1 : return cr
249 : }
250 :
251 : func (c *tableCacheContainer) withCommonReader(
252 : meta *fileMetadata, fn func(sstable.CommonReader) error,
253 1 : ) error {
254 1 : s := c.tableCache.getShard(meta.FileBacking.DiskFileNum)
255 1 : v := s.findNode(meta.FileBacking, &c.dbOpts)
256 1 : defer s.unrefValue(v)
257 1 : if v.err != nil {
258 0 : return v.err
259 0 : }
260 1 : return fn(createCommonReader(v, meta))
261 : }
262 :
263 1 : func (c *tableCacheContainer) withReader(meta physicalMeta, fn func(*sstable.Reader) error) error {
264 1 : s := c.tableCache.getShard(meta.FileBacking.DiskFileNum)
265 1 : v := s.findNode(meta.FileBacking, &c.dbOpts)
266 1 : defer s.unrefValue(v)
267 1 : if v.err != nil {
268 0 : return v.err
269 0 : }
270 1 : return fn(v.reader)
271 : }
272 :
273 : // withVirtualReader fetches a VirtualReader associated with a virtual sstable.
274 : func (c *tableCacheContainer) withVirtualReader(
275 : meta virtualMeta, fn func(sstable.VirtualReader) error,
276 1 : ) error {
277 1 : s := c.tableCache.getShard(meta.FileBacking.DiskFileNum)
278 1 : v := s.findNode(meta.FileBacking, &c.dbOpts)
279 1 : defer s.unrefValue(v)
280 1 : if v.err != nil {
281 0 : return v.err
282 0 : }
283 1 : provider := c.dbOpts.objProvider
284 1 : objMeta, err := provider.Lookup(fileTypeTable, meta.FileBacking.DiskFileNum)
285 1 : if err != nil {
286 0 : return err
287 0 : }
288 1 : return fn(sstable.MakeVirtualReader(v.reader, meta.VirtualReaderParams(objMeta.IsShared())))
289 : }
290 :
291 1 : func (c *tableCacheContainer) iterCount() int64 {
292 1 : return int64(c.dbOpts.iterCount.Load())
293 1 : }
294 :
295 : // TableCache is a shareable cache for open sstables.
296 : type TableCache struct {
297 : refs atomic.Int64
298 :
299 : cache *Cache
300 : shards []*tableCacheShard
301 : }
302 :
303 : // Ref adds a reference to the table cache. Once tableCache.init returns,
304 : // the table cache only remains valid if there is at least one reference
305 : // to it.
306 0 : func (c *TableCache) Ref() {
307 0 : v := c.refs.Add(1)
308 0 : // We don't want the reference count to ever go from 0 -> 1,
309 0 : // because a reference count of 0 implies that we've closed the cache.
310 0 : if v <= 1 {
311 0 : panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
312 : }
313 : }
314 :
315 : // Unref removes a reference to the table cache.
316 1 : func (c *TableCache) Unref() error {
317 1 : v := c.refs.Add(-1)
318 1 : switch {
319 0 : case v < 0:
320 0 : panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
321 1 : case v == 0:
322 1 : var err error
323 1 : for i := range c.shards {
324 1 : // The cache shard is not allocated yet, nothing to close
325 1 : if c.shards[i] == nil {
326 0 : continue
327 : }
328 1 : err = firstError(err, c.shards[i].Close())
329 : }
330 :
331 : // Unref the cache which we create a reference to when the tableCache
332 : // is first instantiated.
333 1 : c.cache.Unref()
334 1 : return err
335 : }
336 0 : return nil
337 : }
338 :
339 : // NewTableCache will create a reference to the table cache. It is the caller's
340 : // responsibility to call tableCache.Unref once they no longer hold a reference to the table cache.
341 1 : func NewTableCache(cache *Cache, numShards int, size int) *TableCache {
342 1 : if size == 0 {
343 0 : panic("pebble: cannot create a table cache of size 0")
344 1 : } else if numShards == 0 {
345 0 : panic("pebble: cannot create a table cache with 0 shards")
346 : }
347 :
348 1 : c := &TableCache{}
349 1 : c.cache = cache
350 1 : c.cache.Ref()
351 1 :
352 1 : c.shards = make([]*tableCacheShard, numShards)
353 1 : for i := range c.shards {
354 1 : c.shards[i] = &tableCacheShard{}
355 1 : c.shards[i].init(size / len(c.shards))
356 1 : }
357 :
358 : // Hold a ref to the cache here.
359 1 : c.refs.Store(1)
360 1 :
361 1 : return c
362 : }
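// A minimal sketch of sharing one TableCache between two DBs (assumes the
// Options.TableCache field as in this version of Pebble; the paths are
// hypothetical). Both DBs must be configured with the same block cache,
// otherwise newTableCacheContainer panics:
//
//	cache := pebble.NewCache(128 << 20)
//	defer cache.Unref()
//	tc := pebble.NewTableCache(cache, 8 /* numShards */, 1000 /* size */)
//	defer func() { _ = tc.Unref() }()
//	db1, _ := pebble.Open("/path/to/db1", &pebble.Options{Cache: cache, TableCache: tc})
//	defer db1.Close()
//	db2, _ := pebble.Open("/path/to/db2", &pebble.Options{Cache: cache, TableCache: tc})
//	defer db2.Close()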
363 :
364 1 : func (c *TableCache) getShard(fileNum base.DiskFileNum) *tableCacheShard {
365 1 : return c.shards[uint64(fileNum)%uint64(len(c.shards))]
366 1 : }
367 :
368 : type tableCacheKey struct {
369 : cacheID uint64
370 : fileNum base.DiskFileNum
371 : }
372 :
373 : type tableCacheShard struct {
374 : hits atomic.Int64
375 : misses atomic.Int64
376 : iterCount atomic.Int32
377 :
378 : size int
379 :
380 : mu struct {
381 : sync.RWMutex
382 : nodes map[tableCacheKey]*tableCacheNode
383 : // The iters map is only created and populated in race builds.
384 : iters map[io.Closer][]byte
385 :
386 : handHot *tableCacheNode
387 : handCold *tableCacheNode
388 : handTest *tableCacheNode
389 :
390 : coldTarget int
391 : sizeHot int
392 : sizeCold int
393 : sizeTest int
394 : }
395 : releasing sync.WaitGroup
396 : releasingCh chan *tableCacheValue
397 : releaseLoopExit sync.WaitGroup
398 : }
399 :
400 1 : func (c *tableCacheShard) init(size int) {
401 1 : c.size = size
402 1 :
403 1 : c.mu.nodes = make(map[tableCacheKey]*tableCacheNode)
404 1 : c.mu.coldTarget = size
405 1 : c.releasingCh = make(chan *tableCacheValue, 100)
406 1 : c.releaseLoopExit.Add(1)
407 1 : go c.releaseLoop()
408 1 :
409 1 : if invariants.RaceEnabled {
410 0 : c.mu.iters = make(map[io.Closer][]byte)
411 0 : }
412 : }
413 :
414 1 : func (c *tableCacheShard) releaseLoop() {
415 1 : pprof.Do(context.Background(), tableCacheLabels, func(context.Context) {
416 1 : defer c.releaseLoopExit.Done()
417 1 : for v := range c.releasingCh {
418 1 : v.release(c)
419 1 : }
420 : })
421 : }
422 :
423 : // checkAndIntersectFilters checks the specific table and block property filters
424 : // for intersection with any available table and block-level properties. Returns
425 : // true for ok if this table should be read by this iterator.
426 : func (c *tableCacheShard) checkAndIntersectFilters(
427 : v *tableCacheValue,
428 : tableFilter func(userProps map[string]string) bool,
429 : blockPropertyFilters []BlockPropertyFilter,
430 : boundLimitedFilter sstable.BoundLimitedBlockPropertyFilter,
431 : syntheticSuffix sstable.SyntheticSuffix,
432 1 : ) (ok bool, filterer *sstable.BlockPropertiesFilterer, err error) {
433 1 : if tableFilter != nil &&
434 1 : !tableFilter(v.reader.Properties.UserProperties) {
435 0 : return false, nil, nil
436 0 : }
437 :
438 1 : if boundLimitedFilter != nil || len(blockPropertyFilters) > 0 {
439 1 : filterer, err = sstable.IntersectsTable(
440 1 : blockPropertyFilters,
441 1 : boundLimitedFilter,
442 1 : v.reader.Properties.UserProperties,
443 1 : syntheticSuffix,
444 1 : )
445 1 : // NB: IntersectsTable will return a nil filterer if the table-level
446 1 : // properties indicate there's no intersection with the provided filters.
447 1 : if filterer == nil || err != nil {
448 1 : return false, nil, err
449 1 : }
450 : }
451 1 : return true, filterer, nil
452 : }
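// A hedged illustration of where these filters originate: callers set them on
// IterOptions and they are threaded down to checkAndIntersectFilters through
// newIters/newPointIter. myFilter is a hypothetical BlockPropertyFilter
// implementation and "my.collector" a hypothetical user property:
//
//	opts := &pebble.IterOptions{
//		TableFilter: func(userProps map[string]string) bool {
//			_, ok := userProps["my.collector"]
//			return ok // skip tables that lack the property entirely
//		},
//		PointKeyFilters: []pebble.BlockPropertyFilter{myFilter},
//	}
//	iter, err := db.NewIter(opts)
//	if err != nil {
//		return err
//	}
//	defer iter.Close()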
453 :
454 : func (c *tableCacheShard) newIters(
455 : ctx context.Context,
456 : file *manifest.FileMetadata,
457 : opts *IterOptions,
458 : internalOpts internalIterOpts,
459 : dbOpts *tableCacheOpts,
460 : kinds iterKinds,
461 1 : ) (iterSet, error) {
462 1 : // TODO(sumeer): constructing the Reader should also use a plumbed context,
463 1 : // since parts of the sstable are read during the construction. The Reader
464 1 : // should not remember that context since the Reader can be long-lived.
465 1 :
466 1 : // Calling findNode gives us the responsibility of decrementing v's
467 1 : // refCount. If opening the underlying table resulted in error, then we
468 1 : // decrement this straight away. Otherwise, we pass that responsibility to
469 1 : // the sstable iterator, which decrements when it is closed.
470 1 : v := c.findNode(file.FileBacking, dbOpts)
471 1 : if v.err != nil {
472 0 : defer c.unrefValue(v)
473 0 : return iterSet{}, v.err
474 0 : }
475 :
476 : // Note: This suffers an allocation for virtual sstables.
477 1 : cr := createCommonReader(v, file)
478 1 : var iters iterSet
479 1 : var err error
480 1 : if kinds.RangeKey() && file.HasRangeKeys {
481 1 : iters.rangeKey, err = c.newRangeKeyIter(v, file, cr, opts.SpanIterOptions())
482 1 : }
483 1 : if kinds.RangeDeletion() && file.HasPointKeys && err == nil {
484 1 : iters.rangeDeletion, err = c.newRangeDelIter(ctx, file, cr, dbOpts)
485 1 : }
486 1 : if kinds.Point() && err == nil {
487 1 : iters.point, err = c.newPointIter(ctx, v, file, cr, opts, internalOpts, dbOpts)
488 1 : }
489 1 : if err != nil {
490 0 : // NB: There's a subtlety here: Because the point iterator is the last
491 0 : // iterator we attempt to create, it's not possible for:
492 0 : // err != nil && iters.point != nil
493 0 : // If it were possible, we'd need to account for it to avoid double
494 0 : // unref-ing here, once during CloseAll and once during `unrefValue`.
495 0 : iters.CloseAll()
496 0 : c.unrefValue(v)
497 0 : return iterSet{}, err
498 0 : }
499 : // Only point iterators ever require the reader stay pinned in the cache. If
500 : // we're not returning a point iterator to the caller, we need to unref v.
501 1 : if iters.point == nil {
502 1 : c.unrefValue(v)
503 1 : }
504 1 : return iters, nil
505 : }
506 :
507 : // newPointIter is an internal helper that constructs a point iterator over a
508 : // sstable. This function is for internal use only, and callers should use
509 : // newIters instead.
510 : func (c *tableCacheShard) newPointIter(
511 : ctx context.Context,
512 : v *tableCacheValue,
513 : file *manifest.FileMetadata,
514 : cr sstable.CommonReader,
515 : opts *IterOptions,
516 : internalOpts internalIterOpts,
517 : dbOpts *tableCacheOpts,
518 1 : ) (internalIterator, error) {
519 1 : var (
520 1 : hideObsoletePoints bool = false
521 1 : pointKeyFilters []BlockPropertyFilter
522 1 : filterer *sstable.BlockPropertiesFilterer
523 1 : )
524 1 : if opts != nil {
525 1 : // This code is appending (at most one filter) in-place to
526 1 : // opts.PointKeyFilters even though the slice is shared for iterators in
527 1 : // the same iterator tree. This is acceptable since all the following
528 1 : // properties are true:
529 1 : // - The iterator tree is single threaded, so the shared backing for the
530 1 : // slice is being mutated in a single threaded manner.
531 1 : // - Each shallow copy of the slice has its own notion of length.
532 1 : // - The appended element is always the obsoleteKeyBlockPropertyFilter
533 1 : // struct, which is stateless, so overwriting that struct when creating
534 1 : // one sstable iterator is harmless to other sstable iterators that are
535 1 : // relying on that struct.
536 1 : //
537 1 : // An alternative would be to have different slices for different sstable
538 1 : // iterators, but that requires more work to avoid allocations.
539 1 : //
540 1 : // TODO(bilal): for compaction reads of foreign sstables, we do hide
541 1 : // obsolete points (see sstable.Reader.newCompactionIter) but we don't
542 1 : // apply the obsolete block property filter. We could optimize this by
543 1 : // applying the filter.
544 1 : hideObsoletePoints, pointKeyFilters =
545 1 : v.reader.TryAddBlockPropertyFilterForHideObsoletePoints(
546 1 : opts.snapshotForHideObsoletePoints, file.LargestSeqNum, opts.PointKeyFilters)
547 1 :
548 1 : var ok bool
549 1 : var err error
550 1 : ok, filterer, err = c.checkAndIntersectFilters(v, opts.TableFilter,
551 1 : pointKeyFilters, internalOpts.boundLimitedFilter, file.SyntheticSuffix)
552 1 : if err != nil {
553 0 : return nil, err
554 1 : } else if !ok {
555 1 : // No point keys within the table match the filters.
556 1 : return nil, nil
557 1 : }
558 : }
559 :
560 1 : var iter sstable.Iterator
561 1 : useFilter := true
562 1 : if opts != nil {
563 1 : useFilter = manifest.LevelToInt(opts.level) != 6 || opts.UseL6Filters
564 1 : ctx = objiotracing.WithLevel(ctx, manifest.LevelToInt(opts.level))
565 1 : }
566 1 : tableFormat, err := v.reader.TableFormat()
567 1 : if err != nil {
568 0 : return nil, err
569 0 : }
570 1 : var rp sstable.ReaderProvider
571 1 : if tableFormat >= sstable.TableFormatPebblev3 && v.reader.Properties.NumValueBlocks > 0 {
572 1 : rp = &tableCacheShardReaderProvider{c: c, file: file, dbOpts: dbOpts}
573 1 : }
574 :
575 1 : if v.isShared && file.SyntheticSeqNum() != 0 {
576 1 : if tableFormat < sstable.TableFormatPebblev4 {
577 0 : return nil, errors.New("pebble: shared ingested sstable has a lower table format than expected")
578 0 : }
579 : // The table is shared and ingested.
580 1 : hideObsoletePoints = true
581 : }
582 1 : transforms := file.IterTransforms()
583 1 : transforms.HideObsoletePoints = hideObsoletePoints
584 1 : var categoryAndQoS sstable.CategoryAndQoS
585 1 : if opts != nil {
586 1 : categoryAndQoS = opts.CategoryAndQoS
587 1 : }
588 1 : if internalOpts.compaction {
589 1 : iter, err = cr.NewCompactionIter(
590 1 : transforms, categoryAndQoS, dbOpts.sstStatsCollector, rp,
591 1 : internalOpts.bufferPool)
592 1 : } else {
593 1 : iter, err = cr.NewIterWithBlockPropertyFiltersAndContextEtc(
594 1 : ctx, transforms, opts.GetLowerBound(), opts.GetUpperBound(), filterer, useFilter,
595 1 : internalOpts.stats, categoryAndQoS, dbOpts.sstStatsCollector, rp)
596 1 : }
597 1 : if err != nil {
598 0 : return nil, err
599 0 : }
600 : // NB: v.closeHook takes responsibility for calling unrefValue(v) here. Take
601 : // care to avoid introducing an allocation here by adding a closure.
602 1 : iter.SetCloseHook(v.closeHook)
603 1 : c.iterCount.Add(1)
604 1 : dbOpts.iterCount.Add(1)
605 1 : if invariants.RaceEnabled {
606 0 : c.mu.Lock()
607 0 : c.mu.iters[iter] = debug.Stack()
608 0 : c.mu.Unlock()
609 0 : }
610 1 : return iter, nil
611 : }
612 :
613 : // newRangeDelIter is an internal helper that constructs an iterator over a
614 : // sstable's range deletions. This function is for table-cache internal use
615 : // only, and callers should use newIters instead.
616 : func (c *tableCacheShard) newRangeDelIter(
617 : ctx context.Context, file *manifest.FileMetadata, cr sstable.CommonReader, dbOpts *tableCacheOpts,
618 1 : ) (keyspan.FragmentIterator, error) {
619 1 : // NB: range-del iterator does not maintain a reference to the table, nor
620 1 : // does it need to read from it after creation.
621 1 : rangeDelIter, err := cr.NewRawRangeDelIter(file.IterTransforms())
622 1 : if err != nil {
623 0 : return nil, err
624 0 : }
625 : // Assert expected bounds in tests.
626 1 : if invariants.Enabled && rangeDelIter != nil {
627 1 : cmp := base.DefaultComparer.Compare
628 1 : if dbOpts.opts.Comparer != nil {
629 1 : cmp = dbOpts.opts.Comparer.Compare
630 1 : }
631 1 : rangeDelIter = keyspan.AssertBounds(
632 1 : rangeDelIter, file.SmallestPointKey, file.LargestPointKey.UserKey, cmp,
633 1 : )
634 : }
635 1 : return rangeDelIter, nil
636 : }
637 :
638 : // newRangeKeyIter is an internal helper that constructs an iterator over a
639 : // sstable's range keys. This function is for table-cache internal use only, and
640 : // callers should use newIters instead.
641 : func (c *tableCacheShard) newRangeKeyIter(
642 : v *tableCacheValue, file *fileMetadata, cr sstable.CommonReader, opts keyspan.SpanIterOptions,
643 1 : ) (keyspan.FragmentIterator, error) {
644 1 : transforms := file.IterTransforms()
645 1 : // Don't filter a table's range keys if the file contains RANGEKEYDELs.
646 1 : // The RANGEKEYDELs may delete range keys in other levels. Skipping the
647 1 : // file's range key blocks may surface deleted range keys below. This is
648 1 : // done here, rather than deferring to the block-property collector in order
649 1 : // to maintain parity with point keys and the treatment of RANGEDELs.
650 1 : if v.reader.Properties.NumRangeKeyDels == 0 && len(opts.RangeKeyFilters) > 0 {
651 0 : ok, _, err := c.checkAndIntersectFilters(v, nil, opts.RangeKeyFilters, nil, transforms.SyntheticSuffix)
652 0 : if err != nil {
653 0 : return nil, err
654 0 : } else if !ok {
655 0 : return nil, nil
656 0 : }
657 : }
658 : // TODO(radu): wrap in an AssertBounds.
659 1 : return cr.NewRawRangeKeyIter(transforms)
660 : }
661 :
662 : type tableCacheShardReaderProvider struct {
663 : c *tableCacheShard
664 : file *manifest.FileMetadata
665 : dbOpts *tableCacheOpts
666 : v *tableCacheValue
667 : }
668 :
669 : var _ sstable.ReaderProvider = &tableCacheShardReaderProvider{}
670 :
671 : // GetReader implements sstable.ReaderProvider. Note that it is not the
672 : // responsibility of tableCacheShardReaderProvider to ensure that the file
673 : // continues to exist. The ReaderProvider is used in iterators where the
674 : // top-level iterator is pinning the read state and preventing the files from
675 : // being deleted.
676 : //
677 : // The caller must call tableCacheShardReaderProvider.Close.
678 : //
679 : // Note that currently the Reader returned here is only used to read value
680 : // blocks. This reader shouldn't be used for other purposes like reading keys
681 : // outside of virtual sstable bounds.
682 : //
683 : // TODO(bananabrick): We could return a wrapper over the Reader to ensure
684 : // that the reader isn't used for other purposes.
685 1 : func (rp *tableCacheShardReaderProvider) GetReader() (*sstable.Reader, error) {
686 1 : // Calling findNode gives us the responsibility of decrementing v's
687 1 : // refCount.
688 1 : v := rp.c.findNode(rp.file.FileBacking, rp.dbOpts)
689 1 : if v.err != nil {
690 0 : defer rp.c.unrefValue(v)
691 0 : return nil, v.err
692 0 : }
693 1 : rp.v = v
694 1 : return v.reader, nil
695 : }
696 :
697 : // Close implements sstable.ReaderProvider.
698 1 : func (rp *tableCacheShardReaderProvider) Close() {
699 1 : rp.c.unrefValue(rp.v)
700 1 : rp.v = nil
701 1 : }
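// A sketch, for illustration only, of the GetReader/Close contract; rp is a
// *tableCacheShardReaderProvider whose c, file and dbOpts fields are already
// populated:
//
//	r, err := rp.GetReader()
//	if err != nil {
//		return err
//	}
//	defer rp.Close() // drops the table-cache ref acquired by GetReader
//	_ = r // the returned Reader should only be used to read value blocks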
702 :
703 : // getTableProperties returns the sstable properties for the target file.
704 : func (c *tableCacheShard) getTableProperties(
705 : file *fileMetadata, dbOpts *tableCacheOpts,
706 0 : ) (*sstable.Properties, error) {
707 0 : // Calling findNode gives us the responsibility of decrementing v's refCount here
708 0 : v := c.findNode(file.FileBacking, dbOpts)
709 0 : defer c.unrefValue(v)
710 0 :
711 0 : if v.err != nil {
712 0 : return nil, v.err
713 0 : }
714 0 : return &v.reader.Properties, nil
715 : }
716 :
717 : // releaseNode releases a node from the tableCacheShard.
718 : //
719 : // c.mu must be held when calling this.
720 0 : func (c *tableCacheShard) releaseNode(n *tableCacheNode) {
721 0 : c.unlinkNode(n)
722 0 : c.clearNode(n)
723 0 : }
724 :
725 : // unlinkNode removes a node from the tableCacheShard, leaving the shard
726 : // reference in place.
727 : //
728 : // c.mu must be held when calling this.
729 1 : func (c *tableCacheShard) unlinkNode(n *tableCacheNode) {
730 1 : key := tableCacheKey{n.cacheID, n.fileNum}
731 1 : delete(c.mu.nodes, key)
732 1 :
733 1 : switch n.ptype {
734 1 : case tableCacheNodeHot:
735 1 : c.mu.sizeHot--
736 1 : case tableCacheNodeCold:
737 1 : c.mu.sizeCold--
738 1 : case tableCacheNodeTest:
739 1 : c.mu.sizeTest--
740 : }
741 :
742 1 : if n == c.mu.handHot {
743 1 : c.mu.handHot = c.mu.handHot.prev()
744 1 : }
745 1 : if n == c.mu.handCold {
746 1 : c.mu.handCold = c.mu.handCold.prev()
747 1 : }
748 1 : if n == c.mu.handTest {
749 1 : c.mu.handTest = c.mu.handTest.prev()
750 1 : }
751 :
752 1 : if n.unlink() == n {
753 1 : // This was the last entry in the cache.
754 1 : c.mu.handHot = nil
755 1 : c.mu.handCold = nil
756 1 : c.mu.handTest = nil
757 1 : }
758 :
759 1 : n.links.prev = nil
760 1 : n.links.next = nil
761 : }
762 :
763 1 : func (c *tableCacheShard) clearNode(n *tableCacheNode) {
764 1 : if v := n.value; v != nil {
765 1 : n.value = nil
766 1 : c.unrefValue(v)
767 1 : }
768 : }
769 :
770 : // unrefValue decrements the reference count for the specified value, releasing
771 : // it if the reference count fell to 0. Note that the value has a reference if
772 : // it is present in tableCacheShard.mu.nodes, so a reference count of 0 means
773 : // the node has already been removed from that map.
774 1 : func (c *tableCacheShard) unrefValue(v *tableCacheValue) {
775 1 : if v.refCount.Add(-1) == 0 {
776 1 : c.releasing.Add(1)
777 1 : c.releasingCh <- v
778 1 : }
779 : }
780 :
781 : // findNode returns the value for the table with the given backing, creating
782 : // the corresponding cache node if it didn't already exist. The caller is
783 : // responsible for decrementing the returned value's refCount.
784 1 : func (c *tableCacheShard) findNode(b *fileBacking, dbOpts *tableCacheOpts) *tableCacheValue {
785 1 : // The backing must have a positive refcount (otherwise it could be deleted at any time).
786 1 : b.MustHaveRefs()
787 1 : // Caution! Here fileMetadata can be a physical or virtual table. Table cache
788 1 : // readers are associated with the physical backings. All virtual tables with
789 1 : // the same backing will use the same reader from the cache; so no information
790 1 : // that can differ among these virtual tables can be plumbed into loadInfo.
791 1 : info := loadInfo{
792 1 : backingFileNum: b.DiskFileNum,
793 1 : }
794 1 :
795 1 : return c.findNodeInternal(info, dbOpts)
796 1 : }
797 :
798 : func (c *tableCacheShard) findNodeInternal(
799 : loadInfo loadInfo, dbOpts *tableCacheOpts,
800 1 : ) *tableCacheValue {
801 1 : // Fast-path for a hit in the cache.
802 1 : c.mu.RLock()
803 1 : key := tableCacheKey{dbOpts.cacheID, loadInfo.backingFileNum}
804 1 : if n := c.mu.nodes[key]; n != nil && n.value != nil {
805 1 : // Fast-path hit.
806 1 : //
807 1 : // The caller is responsible for decrementing the refCount.
808 1 : v := n.value
809 1 : v.refCount.Add(1)
810 1 : c.mu.RUnlock()
811 1 : n.referenced.Store(true)
812 1 : c.hits.Add(1)
813 1 : <-v.loaded
814 1 : return v
815 1 : }
816 1 : c.mu.RUnlock()
817 1 :
818 1 : c.mu.Lock()
819 1 :
820 1 : n := c.mu.nodes[key]
821 1 : switch {
822 1 : case n == nil:
823 1 : // Slow-path miss of a non-existent node.
824 1 : n = &tableCacheNode{
825 1 : fileNum: loadInfo.backingFileNum,
826 1 : ptype: tableCacheNodeCold,
827 1 : }
828 1 : c.addNode(n, dbOpts)
829 1 : c.mu.sizeCold++
830 :
831 1 : case n.value != nil:
832 1 : // Slow-path hit of a hot or cold node.
833 1 : //
834 1 : // The caller is responsible for decrementing the refCount.
835 1 : v := n.value
836 1 : v.refCount.Add(1)
837 1 : n.referenced.Store(true)
838 1 : c.hits.Add(1)
839 1 : c.mu.Unlock()
840 1 : <-v.loaded
841 1 : return v
842 :
843 1 : default:
844 1 : // Slow-path miss of a test node.
845 1 : c.unlinkNode(n)
846 1 : c.mu.coldTarget++
847 1 : if c.mu.coldTarget > c.size {
848 0 : c.mu.coldTarget = c.size
849 0 : }
850 :
851 1 : n.referenced.Store(false)
852 1 : n.ptype = tableCacheNodeHot
853 1 : c.addNode(n, dbOpts)
854 1 : c.mu.sizeHot++
855 : }
856 :
857 1 : c.misses.Add(1)
858 1 :
859 1 : v := &tableCacheValue{
860 1 : loaded: make(chan struct{}),
861 1 : }
862 1 : v.refCount.Store(2)
863 1 : // Cache the closure invoked when an iterator is closed. This avoids an
864 1 : // allocation on every call to newIters.
865 1 : v.closeHook = func(i sstable.Iterator) error {
866 1 : if invariants.RaceEnabled {
867 0 : c.mu.Lock()
868 0 : delete(c.mu.iters, i)
869 0 : c.mu.Unlock()
870 0 : }
871 1 : c.unrefValue(v)
872 1 : c.iterCount.Add(-1)
873 1 : dbOpts.iterCount.Add(-1)
874 1 : return nil
875 : }
876 1 : n.value = v
877 1 :
878 1 : c.mu.Unlock()
879 1 :
880 1 : // Note adding to the cache lists must complete before we begin loading the
881 1 : // table as a failure during load will result in the node being unlinked.
882 1 : pprof.Do(context.Background(), tableCacheLabels, func(context.Context) {
883 1 : v.load(loadInfo, c, dbOpts)
884 1 : })
885 1 : return v
886 : }
887 :
888 1 : func (c *tableCacheShard) addNode(n *tableCacheNode, dbOpts *tableCacheOpts) {
889 1 : c.evictNodes()
890 1 : n.cacheID = dbOpts.cacheID
891 1 : key := tableCacheKey{n.cacheID, n.fileNum}
892 1 : c.mu.nodes[key] = n
893 1 :
894 1 : n.links.next = n
895 1 : n.links.prev = n
896 1 : if c.mu.handHot == nil {
897 1 : // First element.
898 1 : c.mu.handHot = n
899 1 : c.mu.handCold = n
900 1 : c.mu.handTest = n
901 1 : } else {
902 1 : c.mu.handHot.link(n)
903 1 : }
904 :
905 1 : if c.mu.handCold == c.mu.handHot {
906 1 : c.mu.handCold = c.mu.handCold.prev()
907 1 : }
908 : }
909 :
910 1 : func (c *tableCacheShard) evictNodes() {
911 1 : for c.size <= c.mu.sizeHot+c.mu.sizeCold && c.mu.handCold != nil {
912 1 : c.runHandCold()
913 1 : }
914 : }
915 :
916 1 : func (c *tableCacheShard) runHandCold() {
917 1 : n := c.mu.handCold
918 1 : if n.ptype == tableCacheNodeCold {
919 1 : if n.referenced.Load() {
920 1 : n.referenced.Store(false)
921 1 : n.ptype = tableCacheNodeHot
922 1 : c.mu.sizeCold--
923 1 : c.mu.sizeHot++
924 1 : } else {
925 1 : c.clearNode(n)
926 1 : n.ptype = tableCacheNodeTest
927 1 : c.mu.sizeCold--
928 1 : c.mu.sizeTest++
929 1 : for c.size < c.mu.sizeTest && c.mu.handTest != nil {
930 0 : c.runHandTest()
931 0 : }
932 : }
933 : }
934 :
935 1 : c.mu.handCold = c.mu.handCold.next()
936 1 :
937 1 : for c.size-c.mu.coldTarget <= c.mu.sizeHot && c.mu.handHot != nil {
938 1 : c.runHandHot()
939 1 : }
940 : }
941 :
942 1 : func (c *tableCacheShard) runHandHot() {
943 1 : if c.mu.handHot == c.mu.handTest && c.mu.handTest != nil {
944 1 : c.runHandTest()
945 1 : if c.mu.handHot == nil {
946 0 : return
947 0 : }
948 : }
949 :
950 1 : n := c.mu.handHot
951 1 : if n.ptype == tableCacheNodeHot {
952 1 : if n.referenced.Load() {
953 1 : n.referenced.Store(false)
954 1 : } else {
955 1 : n.ptype = tableCacheNodeCold
956 1 : c.mu.sizeHot--
957 1 : c.mu.sizeCold++
958 1 : }
959 : }
960 :
961 1 : c.mu.handHot = c.mu.handHot.next()
962 : }
963 :
964 1 : func (c *tableCacheShard) runHandTest() {
965 1 : if c.mu.sizeCold > 0 && c.mu.handTest == c.mu.handCold && c.mu.handCold != nil {
966 1 : c.runHandCold()
967 1 : if c.mu.handTest == nil {
968 0 : return
969 0 : }
970 : }
971 :
972 1 : n := c.mu.handTest
973 1 : if n.ptype == tableCacheNodeTest {
974 1 : c.mu.coldTarget--
975 1 : if c.mu.coldTarget < 0 {
976 0 : c.mu.coldTarget = 0
977 0 : }
978 1 : c.unlinkNode(n)
979 1 : c.clearNode(n)
980 : }
981 :
982 1 : c.mu.handTest = c.mu.handTest.next()
983 : }
984 :
985 1 : func (c *tableCacheShard) evict(fileNum base.DiskFileNum, dbOpts *tableCacheOpts, allowLeak bool) {
986 1 : c.mu.Lock()
987 1 : key := tableCacheKey{dbOpts.cacheID, fileNum}
988 1 : n := c.mu.nodes[key]
989 1 : var v *tableCacheValue
990 1 : if n != nil {
991 1 : // NB: This is equivalent to tableCacheShard.releaseNode(), but we perform
992 1 : // the tableCacheNode.release() call synchronously below to ensure the
993 1 : // sstable file descriptor is closed before returning. Note that
994 1 : // tableCacheShard.releasing needs to be incremented while holding
995 1 : // tableCacheShard.mu in order to avoid a race with Close()
996 1 : c.unlinkNode(n)
997 1 : v = n.value
998 1 : if v != nil {
999 1 : if !allowLeak {
1000 1 : if t := v.refCount.Add(-1); t != 0 {
1001 0 : dbOpts.loggerAndTracer.Fatalf("sstable %s: refcount is not zero: %d\n%s", fileNum, t, debug.Stack())
1002 0 : }
1003 : }
1004 1 : c.releasing.Add(1)
1005 : }
1006 : }
1007 :
1008 1 : c.mu.Unlock()
1009 1 :
1010 1 : if v != nil {
1011 1 : v.release(c)
1012 1 : }
1013 :
1014 1 : dbOpts.opts.Cache.EvictFile(dbOpts.cacheID, fileNum)
1015 : }
1016 :
1017 : // removeDB evicts any nodes which have a reference to the DB
1018 : // associated with dbOpts.cacheID. Make sure that there will
1019 : // be no more accesses to the files associated with the DB.
1020 1 : func (c *tableCacheShard) removeDB(dbOpts *tableCacheOpts) {
1021 1 : var fileNums []base.DiskFileNum
1022 1 :
1023 1 : c.mu.RLock()
1024 1 : // Collect the fileNums which need to be cleaned.
1025 1 : var firstNode *tableCacheNode
1026 1 : node := c.mu.handHot
1027 1 : for node != firstNode {
1028 1 : if firstNode == nil {
1029 1 : firstNode = node
1030 1 : }
1031 :
1032 1 : if node.cacheID == dbOpts.cacheID {
1033 1 : fileNums = append(fileNums, node.fileNum)
1034 1 : }
1035 1 : node = node.next()
1036 : }
1037 1 : c.mu.RUnlock()
1038 1 :
1039 1 : // Evict all the nodes associated with the DB.
1040 1 : // This should synchronously close all the files
1041 1 : // associated with the DB.
1042 1 : for _, fileNum := range fileNums {
1043 1 : c.evict(fileNum, dbOpts, true)
1044 1 : }
1045 : }
1046 :
1047 1 : func (c *tableCacheShard) Close() error {
1048 1 : c.mu.Lock()
1049 1 : defer c.mu.Unlock()
1050 1 :
1051 1 : // Check for leaked iterators. Note that we'll still perform cleanup below in
1052 1 : // the case that there are leaked iterators.
1053 1 : var err error
1054 1 : if v := c.iterCount.Load(); v > 0 {
1055 0 : if !invariants.RaceEnabled {
1056 0 : err = errors.Errorf("leaked iterators: %d", errors.Safe(v))
1057 0 : } else {
1058 0 : var buf bytes.Buffer
1059 0 : for _, stack := range c.mu.iters {
1060 0 : fmt.Fprintf(&buf, "%s\n", stack)
1061 0 : }
1062 0 : err = errors.Errorf("leaked iterators: %d\n%s", errors.Safe(v), buf.String())
1063 : }
1064 : }
1065 :
1066 1 : for c.mu.handHot != nil {
1067 0 : n := c.mu.handHot
1068 0 : if n.value != nil {
1069 0 : if n.value.refCount.Add(-1) == 0 {
1070 0 : c.releasing.Add(1)
1071 0 : c.releasingCh <- n.value
1072 0 : }
1073 : }
1074 0 : c.unlinkNode(n)
1075 : }
1076 1 : c.mu.nodes = nil
1077 1 : c.mu.handHot = nil
1078 1 : c.mu.handCold = nil
1079 1 : c.mu.handTest = nil
1080 1 :
1081 1 : // Only shut down the releasing goroutine if there were no leaked
1082 1 : // iterators. If there were leaked iterators, we leave the goroutine running
1083 1 : // and the releasingCh open so that a subsequent iterator close can
1084 1 : // complete. This behavior is used by iterator leak tests. Leaking the
1085 1 : // goroutine for these tests is less bad than not closing the iterator, which
1086 1 : // triggers other warnings about block cache handles not being released.
1087 1 : if err != nil {
1088 0 : c.releasing.Wait()
1089 0 : return err
1090 0 : }
1091 :
1092 1 : close(c.releasingCh)
1093 1 : c.releasing.Wait()
1094 1 : c.releaseLoopExit.Wait()
1095 1 : return err
1096 : }
1097 :
1098 : type tableCacheValue struct {
1099 : closeHook func(i sstable.Iterator) error
1100 : reader *sstable.Reader
1101 : err error
1102 : isShared bool
1103 : loaded chan struct{}
1104 : // Reference count for the value. The reader is closed when the reference
1105 : // count drops to zero.
1106 : refCount atomic.Int32
1107 : }
1108 :
1109 : // loadInfo contains the information needed to populate a new cache entry.
1110 : type loadInfo struct {
1111 : backingFileNum base.DiskFileNum
1112 : }
1113 :
1114 1 : func (v *tableCacheValue) load(loadInfo loadInfo, c *tableCacheShard, dbOpts *tableCacheOpts) {
1115 1 : // Try opening the file first.
1116 1 : var f objstorage.Readable
1117 1 : var err error
1118 1 : f, err = dbOpts.objProvider.OpenForReading(
1119 1 : context.TODO(), fileTypeTable, loadInfo.backingFileNum, objstorage.OpenOptions{MustExist: true},
1120 1 : )
1121 1 : if err == nil {
1122 1 : cacheOpts := private.SSTableCacheOpts(dbOpts.cacheID, loadInfo.backingFileNum).(sstable.ReaderOption)
1123 1 : v.reader, err = sstable.NewReader(f, dbOpts.opts, cacheOpts, dbOpts.filterMetrics)
1124 1 : }
1125 1 : if err == nil {
1126 1 : var objMeta objstorage.ObjectMetadata
1127 1 : objMeta, err = dbOpts.objProvider.Lookup(fileTypeTable, loadInfo.backingFileNum)
1128 1 : v.isShared = objMeta.IsShared()
1129 1 : }
1130 1 : if err != nil {
1131 0 : v.err = errors.Wrapf(
1132 0 : err, "pebble: backing file %s error", loadInfo.backingFileNum)
1133 0 : }
1134 1 : if v.err != nil {
1135 0 : c.mu.Lock()
1136 0 : defer c.mu.Unlock()
1137 0 : // Lookup the node in the cache again as it might have already been
1138 0 : // removed.
1139 0 : key := tableCacheKey{dbOpts.cacheID, loadInfo.backingFileNum}
1140 0 : n := c.mu.nodes[key]
1141 0 : if n != nil && n.value == v {
1142 0 : c.releaseNode(n)
1143 0 : }
1144 : }
1145 1 : close(v.loaded)
1146 : }
1147 :
1148 1 : func (v *tableCacheValue) release(c *tableCacheShard) {
1149 1 : <-v.loaded
1150 1 : // Nothing to be done about an error at this point. Close the reader if it is
1151 1 : // open.
1152 1 : if v.reader != nil {
1153 1 : _ = v.reader.Close()
1154 1 : }
1155 1 : c.releasing.Done()
1156 : }
1157 :
1158 : type tableCacheNodeType int8
1159 :
1160 : const (
1161 : tableCacheNodeTest tableCacheNodeType = iota
1162 : tableCacheNodeCold
1163 : tableCacheNodeHot
1164 : )
1165 :
1166 0 : func (p tableCacheNodeType) String() string {
1167 0 : switch p {
1168 0 : case tableCacheNodeTest:
1169 0 : return "test"
1170 0 : case tableCacheNodeCold:
1171 0 : return "cold"
1172 0 : case tableCacheNodeHot:
1173 0 : return "hot"
1174 : }
1175 0 : return "unknown"
1176 : }
1177 :
1178 : type tableCacheNode struct {
1179 : fileNum base.DiskFileNum
1180 : value *tableCacheValue
1181 :
1182 : links struct {
1183 : next *tableCacheNode
1184 : prev *tableCacheNode
1185 : }
1186 : ptype tableCacheNodeType
1187 : // referenced is atomically set to indicate that this entry has been accessed
1188 : // since the last time one of the clock hands swept it.
1189 : referenced atomic.Bool
1190 :
1191 : // Storing the cache id associated with the DB instance here
1192 : // avoids the need to thread the dbOpts struct through many functions.
1193 : cacheID uint64
1194 : }
1195 :
1196 1 : func (n *tableCacheNode) next() *tableCacheNode {
1197 1 : if n == nil {
1198 0 : return nil
1199 0 : }
1200 1 : return n.links.next
1201 : }
1202 :
1203 1 : func (n *tableCacheNode) prev() *tableCacheNode {
1204 1 : if n == nil {
1205 0 : return nil
1206 0 : }
1207 1 : return n.links.prev
1208 : }
1209 :
1210 1 : func (n *tableCacheNode) link(s *tableCacheNode) {
1211 1 : s.links.prev = n.links.prev
1212 1 : s.links.prev.links.next = s
1213 1 : s.links.next = n
1214 1 : s.links.next.links.prev = s
1215 1 : }
1216 :
1217 1 : func (n *tableCacheNode) unlink() *tableCacheNode {
1218 1 : next := n.links.next
1219 1 : n.links.prev.links.next = n.links.next
1220 1 : n.links.next.links.prev = n.links.prev
1221 1 : n.links.prev = n
1222 1 : n.links.next = n
1223 1 : return next
1224 1 : }
1225 :
1226 : // iterSet holds a set of iterators of various key kinds, all constructed over
1227 : // the same data structure (eg, an sstable). A subset of the fields may be
1228 : // populated depending on the `iterKinds` passed to newIters.
1229 : type iterSet struct {
1230 : point internalIterator
1231 : rangeDeletion keyspan.FragmentIterator
1232 : rangeKey keyspan.FragmentIterator
1233 : }
1234 :
1235 : // TODO(jackson): Consider adding methods for fast paths that check whether an
1236 : // iterator of a particular kind is nil, so that these call sites don't need to
1237 : // reach into the struct's fields directly.
1238 :
1239 : // Point returns the contained point iterator. If there is no point iterator,
1240 : // Point returns a non-nil empty point iterator.
1241 1 : func (s *iterSet) Point() internalIterator {
1242 1 : if s.point == nil {
1243 1 : return emptyIter
1244 1 : }
1245 1 : return s.point
1246 : }
1247 :
1248 : // RangeDeletion returns the contained range deletion iterator. If there is no
1249 : // range deletion iterator, RangeDeletion returns a non-nil empty keyspan
1250 : // iterator.
1251 1 : func (s *iterSet) RangeDeletion() keyspan.FragmentIterator {
1252 1 : if s.rangeDeletion == nil {
1253 1 : return emptyKeyspanIter
1254 1 : }
1255 1 : return s.rangeDeletion
1256 : }
1257 :
1258 : // RangeKey returns the contained range key iterator. If there is no range key
1259 : // iterator, RangeKey returns a non-nil empty keyspan iterator.
1260 1 : func (s *iterSet) RangeKey() keyspan.FragmentIterator {
1261 1 : if s.rangeKey == nil {
1262 0 : return emptyKeyspanIter
1263 0 : }
1264 1 : return s.rangeKey
1265 : }
1266 :
1267 : // CloseAll closes all of the held iterators. If CloseAll is called, then Close
1268 : // must not be called on the constituent iterators.
1269 1 : func (s *iterSet) CloseAll() error {
1270 1 : var err error
1271 1 : if s.point != nil {
1272 1 : err = s.point.Close()
1273 1 : }
1274 1 : if s.rangeDeletion != nil {
1275 1 : err = firstError(err, s.rangeDeletion.Close())
1276 1 : }
1277 1 : if s.rangeKey != nil {
1278 1 : err = firstError(err, s.rangeKey.Close())
1279 1 : }
1280 1 : return err
1281 : }
1282 :
1283 : // iterKinds is a bitmap indicating a set of kinds of iterators. Callers may
1284 : // bitwise-OR iterPointKeys, iterRangeDeletions and/or iterRangeKeys together to
1285 : // represent a set of desired iterator kinds.
1286 : type iterKinds uint8
1287 :
1288 1 : func (t iterKinds) Point() bool { return (t & iterPointKeys) != 0 }
1289 1 : func (t iterKinds) RangeDeletion() bool { return (t & iterRangeDeletions) != 0 }
1290 1 : func (t iterKinds) RangeKey() bool { return (t & iterRangeKeys) != 0 }
1291 :
1292 : const (
1293 : iterPointKeys iterKinds = 1 << iota
1294 : iterRangeDeletions
1295 : iterRangeKeys
1296 : )