Line data Source code
1 : // Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "bytes"
9 : "context"
10 : "fmt"
11 : "io"
12 : "runtime/debug"
13 : "runtime/pprof"
14 : "sync"
15 : "sync/atomic"
16 : "unsafe"
17 :
18 : "github.com/cockroachdb/errors"
19 : "github.com/cockroachdb/pebble/internal/base"
20 : "github.com/cockroachdb/pebble/internal/cache"
21 : "github.com/cockroachdb/pebble/internal/invariants"
22 : "github.com/cockroachdb/pebble/internal/keyspan"
23 : "github.com/cockroachdb/pebble/internal/keyspan/keyspanimpl"
24 : "github.com/cockroachdb/pebble/internal/manifest"
25 : "github.com/cockroachdb/pebble/internal/sstableinternal"
26 : "github.com/cockroachdb/pebble/objstorage"
27 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
28 : "github.com/cockroachdb/pebble/sstable"
29 : )
30 :
31 : var emptyIter = &errorIter{err: nil}
32 : var emptyKeyspanIter = &errorKeyspanIter{err: nil}
33 :
34 : // tableNewIters creates new iterators (point, range deletion and/or range key)
35 : // for the given file metadata. Which of the various iterator kinds the user is
36 : // requesting is specified with the iterKinds bitmap.
37 : //
38 : // On success, the requested subset of iters.{point,rangeDel,rangeKey} are
39 : // populated with iterators.
40 : //
41 : // If a point iterator is requested and the operation was successful,
42 : // iters.point is guaranteed to be non-nil and must be closed when the caller is
43 : // finished.
44 : //
45 : // If a range deletion or range key iterator is requested, the corresponding
46 : // iterator may be nil if the table does not contain any keys of the
47 : // corresponding kind. The returned iterSet type provides RangeDeletion() and
48 : // RangeKey() convenience methods that return non-nil empty iterators that may
49 : // be used if the caller requires a non-nil iterator.
50 : //
51 : // On error, all iterators are nil.
52 : //
53 : // The only (non-test) implementation of tableNewIters is
54 : // tableCacheContainer.newIters().
55 : type tableNewIters func(
56 : ctx context.Context,
57 : file *manifest.FileMetadata,
58 : opts *IterOptions,
59 : internalOpts internalIterOpts,
60 : kinds iterKinds,
61 : ) (iterSet, error)
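// A hypothetical caller sketch (not part of the original source) showing how
// the iterKinds bitmap and the iterSet helpers defined later in this file are
// meant to be combined; error handling is minimal.
//
//	func openPointAndRangeDel(
//		ctx context.Context, newIters tableNewIters, file *manifest.FileMetadata,
//	) (internalIterator, keyspan.FragmentIterator, error) {
//		iters, err := newIters(ctx, file, nil, internalIterOpts{},
//			iterPointKeys|iterRangeDeletions)
//		if err != nil {
//			// On error, all iterators in iters are nil.
//			return nil, nil, err
//		}
//		// iters.point is guaranteed non-nil; RangeDeletion() substitutes a
//		// non-nil empty iterator when the table has no range deletions.
//		return iters.point, iters.RangeDeletion(), nil
//	}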
62 :
63 : // tableNewRangeDelIter takes a tableNewIters and returns a TableNewSpanIter
64 : // for the rangedel iterator returned by tableNewIters.
65 1 : func tableNewRangeDelIter(newIters tableNewIters) keyspanimpl.TableNewSpanIter {
66 1 : return func(ctx context.Context, file *manifest.FileMetadata, iterOptions keyspan.SpanIterOptions) (keyspan.FragmentIterator, error) {
67 1 : iters, err := newIters(ctx, file, nil, internalIterOpts{}, iterRangeDeletions)
68 1 : if err != nil {
69 0 : return nil, err
70 0 : }
71 1 : return iters.RangeDeletion(), nil
72 : }
73 : }
74 :
75 : // tableNewRangeKeyIter takes a tableNewIters and returns a TableNewSpanIter
76 : // for the range key iterator returned by tableNewIters.
77 1 : func tableNewRangeKeyIter(newIters tableNewIters) keyspanimpl.TableNewSpanIter {
78 1 : return func(ctx context.Context, file *manifest.FileMetadata, iterOptions keyspan.SpanIterOptions) (keyspan.FragmentIterator, error) {
79 1 : iters, err := newIters(ctx, file, nil, internalIterOpts{}, iterRangeKeys)
80 1 : if err != nil {
81 1 : return nil, err
82 1 : }
83 1 : return iters.RangeKey(), nil
84 : }
85 : }
86 :
87 : var tableCacheLabels = pprof.Labels("pebble", "table-cache")
88 :
89 : // tableCacheOpts contains the db-specific fields
90 : // of a table cache. It is stored in the tableCacheContainer
91 : // along with the table cache.
92 : // NB: It is important to make sure that the fields in this
93 : // struct are read-only. Since the fields here are shared
94 : // by every single tableCacheShard, updating non-read-only
95 : // fields could cause unnecessary evictions of those fields,
96 : // and of the surrounding fields, from the CPU caches.
97 : type tableCacheOpts struct {
98 : // iterCount keeps track of how many iterators are open. It is used to keep
99 : // track of leaked iterators on a per-db level.
100 : iterCount *atomic.Int32
101 :
102 : loggerAndTracer LoggerAndTracer
103 : cache *cache.Cache
104 : cacheID uint64
105 : objProvider objstorage.Provider
106 : readerOpts sstable.ReaderOptions
107 : sstStatsCollector *sstable.CategoryStatsCollector
108 : }
109 :
110 : // tableCacheContainer contains the table cache and
111 : // fields which are unique to the DB.
112 : type tableCacheContainer struct {
113 : tableCache *TableCache
114 :
115 : // dbOpts contains fields relevant to the table cache
116 : // which are unique to each DB.
117 : dbOpts tableCacheOpts
118 : }
119 :
120 : // newTableCacheContainer will panic if the underlying cache in the table cache
121 : // doesn't match Options.Cache.
122 : func newTableCacheContainer(
123 : tc *TableCache,
124 : cacheID uint64,
125 : objProvider objstorage.Provider,
126 : opts *Options,
127 : size int,
128 : sstStatsCollector *sstable.CategoryStatsCollector,
129 1 : ) *tableCacheContainer {
130 1 : // We will release a ref to the table cache acquired here when tableCacheContainer.close is called.
131 1 : if tc != nil {
132 1 : if tc.cache != opts.Cache {
133 0 : panic("pebble: underlying cache for the table cache and db are different")
134 : }
135 1 : tc.Ref()
136 1 : } else {
137 1 : // NewTableCache should create a ref to tc which the container should
138 1 : // drop whenever it is closed.
139 1 : tc = NewTableCache(opts.Cache, opts.Experimental.TableCacheShards, size)
140 1 : }
141 :
142 1 : t := &tableCacheContainer{}
143 1 : t.tableCache = tc
144 1 : t.dbOpts.loggerAndTracer = opts.LoggerAndTracer
145 1 : t.dbOpts.cache = opts.Cache
146 1 : t.dbOpts.cacheID = cacheID
147 1 : t.dbOpts.objProvider = objProvider
148 1 : t.dbOpts.readerOpts = opts.MakeReaderOptions()
149 1 : t.dbOpts.readerOpts.FilterMetricsTracker = &sstable.FilterMetricsTracker{}
150 1 : t.dbOpts.iterCount = new(atomic.Int32)
151 1 : t.dbOpts.sstStatsCollector = sstStatsCollector
152 1 : return t
153 : }
154 :
155 : // Before calling close, make sure that there will be no further need
156 : // to access any of the files associated with the store.
157 1 : func (c *tableCacheContainer) close() error {
158 1 : // We want to do some cleanup work here. Check for leaked iterators
159 1 : // by the DB using this container. Note that we'll still perform cleanup
160 1 : // below in the case that there are leaked iterators.
161 1 : var err error
162 1 : if v := c.dbOpts.iterCount.Load(); v > 0 {
163 1 : err = errors.Errorf("leaked iterators: %d", errors.Safe(v))
164 1 : }
165 :
166 : // Release nodes here.
167 1 : for _, shard := range c.tableCache.shards {
168 1 : if shard != nil {
169 1 : shard.removeDB(&c.dbOpts)
170 1 : }
171 : }
172 1 : return firstError(err, c.tableCache.Unref())
173 : }
174 :
175 : func (c *tableCacheContainer) newIters(
176 : ctx context.Context,
177 : file *manifest.FileMetadata,
178 : opts *IterOptions,
179 : internalOpts internalIterOpts,
180 : kinds iterKinds,
181 1 : ) (iterSet, error) {
182 1 : return c.tableCache.getShard(file.FileBacking.DiskFileNum).newIters(ctx, file, opts, internalOpts, &c.dbOpts, kinds)
183 1 : }
184 :
185 : // getTableProperties returns the properties associated with the backing physical
186 : // table if the input metadata belongs to a virtual sstable.
187 1 : func (c *tableCacheContainer) getTableProperties(file *fileMetadata) (*sstable.Properties, error) {
188 1 : return c.tableCache.getShard(file.FileBacking.DiskFileNum).getTableProperties(file, &c.dbOpts)
189 1 : }
190 :
191 1 : func (c *tableCacheContainer) evict(fileNum base.DiskFileNum) {
192 1 : c.tableCache.getShard(fileNum).evict(fileNum, &c.dbOpts, false)
193 1 : }
194 :
195 1 : func (c *tableCacheContainer) metrics() (CacheMetrics, FilterMetrics) {
196 1 : var m CacheMetrics
197 1 : for i := range c.tableCache.shards {
198 1 : s := c.tableCache.shards[i]
199 1 : s.mu.RLock()
200 1 : m.Count += int64(len(s.mu.nodes))
201 1 : s.mu.RUnlock()
202 1 : m.Hits += s.hits.Load()
203 1 : m.Misses += s.misses.Load()
204 1 : }
205 1 : m.Size = m.Count * int64(unsafe.Sizeof(sstable.Reader{}))
206 1 : f := c.dbOpts.readerOpts.FilterMetricsTracker.Load()
207 1 : return m, f
208 : }
209 :
210 : func (c *tableCacheContainer) estimateSize(
211 : meta *fileMetadata, lower, upper []byte,
212 1 : ) (size uint64, err error) {
213 1 : c.withCommonReader(meta, func(cr sstable.CommonReader) error {
214 1 : size, err = cr.EstimateDiskUsage(lower, upper)
215 1 : return err
216 1 : })
217 1 : return size, err
218 : }
219 :
220 : // createCommonReader creates a CommonReader for this file.
221 1 : func createCommonReader(v *tableCacheValue, file *fileMetadata) sstable.CommonReader {
222 1 : // TODO(bananabrick): We suffer an allocation if file is a virtual sstable.
223 1 : var cr sstable.CommonReader = v.reader
224 1 : if file.Virtual {
225 1 : virtualReader := sstable.MakeVirtualReader(
226 1 : v.reader, file.VirtualMeta().VirtualReaderParams(v.isShared),
227 1 : )
228 1 : cr = &virtualReader
229 1 : }
230 1 : return cr
231 : }
232 :
233 : func (c *tableCacheContainer) withCommonReader(
234 : meta *fileMetadata, fn func(sstable.CommonReader) error,
235 1 : ) error {
236 1 : s := c.tableCache.getShard(meta.FileBacking.DiskFileNum)
237 1 : v := s.findNode(context.TODO(), meta.FileBacking, &c.dbOpts)
238 1 : defer s.unrefValue(v)
239 1 : if v.err != nil {
240 0 : return v.err
241 0 : }
242 1 : return fn(createCommonReader(v, meta))
243 : }
244 :
245 1 : func (c *tableCacheContainer) withReader(meta physicalMeta, fn func(*sstable.Reader) error) error {
246 1 : s := c.tableCache.getShard(meta.FileBacking.DiskFileNum)
247 1 : v := s.findNode(context.TODO(), meta.FileBacking, &c.dbOpts)
248 1 : defer s.unrefValue(v)
249 1 : if v.err != nil {
250 1 : return v.err
251 1 : }
252 1 : return fn(v.reader)
253 : }
254 :
255 : // withVirtualReader fetches a VirtualReader associated with a virtual sstable.
256 : func (c *tableCacheContainer) withVirtualReader(
257 : meta virtualMeta, fn func(sstable.VirtualReader) error,
258 1 : ) error {
259 1 : s := c.tableCache.getShard(meta.FileBacking.DiskFileNum)
260 1 : v := s.findNode(context.TODO(), meta.FileBacking, &c.dbOpts)
261 1 : defer s.unrefValue(v)
262 1 : if v.err != nil {
263 0 : return v.err
264 0 : }
265 1 : provider := c.dbOpts.objProvider
266 1 : objMeta, err := provider.Lookup(fileTypeTable, meta.FileBacking.DiskFileNum)
267 1 : if err != nil {
268 0 : return err
269 0 : }
270 1 : return fn(sstable.MakeVirtualReader(v.reader, meta.VirtualReaderParams(objMeta.IsShared())))
271 : }
272 :
273 1 : func (c *tableCacheContainer) iterCount() int64 {
274 1 : return int64(c.dbOpts.iterCount.Load())
275 1 : }
276 :
277 : // TableCache is a shareable cache for open sstables.
278 : type TableCache struct {
279 : refs atomic.Int64
280 :
281 : cache *Cache
282 : shards []*tableCacheShard
283 : }
284 :
285 : // Ref adds a reference to the table cache. Once tableCache.init returns,
286 : // the table cache only remains valid if there is at least one reference
287 : // to it.
288 1 : func (c *TableCache) Ref() {
289 1 : v := c.refs.Add(1)
290 1 : // We don't want the reference count to ever go from 0 -> 1,
291 1 : // because a reference count of 0 implies that we've closed the cache.
292 1 : if v <= 1 {
293 0 : panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
294 : }
295 : }
296 :
297 : // Unref removes a reference to the table cache.
298 1 : func (c *TableCache) Unref() error {
299 1 : v := c.refs.Add(-1)
300 1 : switch {
301 1 : case v < 0:
302 1 : panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
303 1 : case v == 0:
304 1 : var err error
305 1 : for i := range c.shards {
306 1 : // The cache shard is not allocated yet, nothing to close
307 1 : if c.shards[i] == nil {
308 0 : continue
309 : }
310 1 : err = firstError(err, c.shards[i].Close())
311 : }
312 :
313 : // Unref the cache which we create a reference to when the tableCache
314 : // is first instantiated.
315 1 : c.cache.Unref()
316 1 : return err
317 : }
318 1 : return nil
319 : }
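// An illustrative lifecycle (not part of the original source; cache is assumed
// to be an existing *Cache with a live reference): NewTableCache creates the
// initial reference, each DB that shares the cache adds one via Ref (done by
// newTableCacheContainer), and every reference is paired with an Unref. When
// the count reaches zero the shards are closed and the block cache reference
// is dropped.
//
//	tc := NewTableCache(cache, 8, 1000) // refs == 1
//	tc.Ref()                            // a second user attaches; refs == 2
//	_ = tc.Unref()                      // refs == 1
//	_ = tc.Unref()                      // refs == 0: shards closed, cache.Unref called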
320 :
321 : // NewTableCache will create a reference to the table cache. It is the caller's
322 : // responsibility to call tableCache.Unref when it no longer needs the reference.
323 1 : func NewTableCache(cache *Cache, numShards int, size int) *TableCache {
324 1 : if size == 0 {
325 0 : panic("pebble: cannot create a table cache of size 0")
326 1 : } else if numShards == 0 {
327 0 : panic("pebble: cannot create a table cache with 0 shards")
328 : }
329 :
330 1 : c := &TableCache{}
331 1 : c.cache = cache
332 1 : c.cache.Ref()
333 1 :
334 1 : c.shards = make([]*tableCacheShard, numShards)
335 1 : for i := range c.shards {
336 1 : c.shards[i] = &tableCacheShard{}
337 1 : c.shards[i].init(size / len(c.shards))
338 1 : }
339 :
340 : // Hold a ref to the cache here.
341 1 : c.refs.Store(1)
342 1 :
343 1 : return c
344 : }
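// A sharing sketch (not from the original source; error handling and the
// eventual db.Close calls are elided, and dir1/dir2 are placeholder paths; it
// assumes the exported pebble.Options.TableCache field). Two DBs may share one
// TableCache provided they also share the same block Cache; otherwise
// newTableCacheContainer panics.
//
//	cache := pebble.NewCache(128 << 20)
//	tc := pebble.NewTableCache(cache, 16, 1000)
//	db1, _ := pebble.Open(dir1, &pebble.Options{Cache: cache, TableCache: tc})
//	db2, _ := pebble.Open(dir2, &pebble.Options{Cache: cache, TableCache: tc})
//	// Each Open takes its own refs, so the constructor refs can be dropped now.
//	_ = tc.Unref()
//	cache.Unref()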
345 :
346 1 : func (c *TableCache) getShard(fileNum base.DiskFileNum) *tableCacheShard {
347 1 : return c.shards[uint64(fileNum)%uint64(len(c.shards))]
348 1 : }
349 :
350 : type tableCacheKey struct {
351 : cacheID uint64
352 : fileNum base.DiskFileNum
353 : }
354 :
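// tableCacheShard is a single shard of the table cache. Its eviction policy is
// CLOCK-Pro style: nodes are classified as hot, cold, or test (recently
// evicted) and kept on a circular list swept by the three clock hands
// (handHot, handCold, handTest), while coldTarget adapts how much of the shard
// is devoted to cold entries.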
355 : type tableCacheShard struct {
356 : hits atomic.Int64
357 : misses atomic.Int64
358 : iterCount atomic.Int32
359 :
360 : size int
361 :
362 : mu struct {
363 : sync.RWMutex
364 : nodes map[tableCacheKey]*tableCacheNode
365 : // The iters map is only created and populated in race builds.
366 : iters map[io.Closer][]byte
367 :
368 : handHot *tableCacheNode
369 : handCold *tableCacheNode
370 : handTest *tableCacheNode
371 :
372 : coldTarget int
373 : sizeHot int
374 : sizeCold int
375 : sizeTest int
376 : }
377 : releasing sync.WaitGroup
378 : releasingCh chan *tableCacheValue
379 : releaseLoopExit sync.WaitGroup
380 : }
381 :
382 1 : func (c *tableCacheShard) init(size int) {
383 1 : c.size = size
384 1 :
385 1 : c.mu.nodes = make(map[tableCacheKey]*tableCacheNode)
386 1 : c.mu.coldTarget = size
387 1 : c.releasingCh = make(chan *tableCacheValue, 100)
388 1 : c.releaseLoopExit.Add(1)
389 1 : go c.releaseLoop()
390 1 :
391 1 : if invariants.RaceEnabled {
392 0 : c.mu.iters = make(map[io.Closer][]byte)
393 0 : }
394 : }
395 :
396 1 : func (c *tableCacheShard) releaseLoop() {
397 1 : pprof.Do(context.Background(), tableCacheLabels, func(context.Context) {
398 1 : defer c.releaseLoopExit.Done()
399 1 : for v := range c.releasingCh {
400 1 : v.release(c)
401 1 : }
402 : })
403 : }
404 :
405 : // checkAndIntersectFilters checks the specific table and block property filters
406 : // for intersection with any available table and block-level properties. Returns
407 : // true for ok if this table should be read by this iterator.
408 : func (c *tableCacheShard) checkAndIntersectFilters(
409 : v *tableCacheValue,
410 : blockPropertyFilters []BlockPropertyFilter,
411 : boundLimitedFilter sstable.BoundLimitedBlockPropertyFilter,
412 : syntheticSuffix sstable.SyntheticSuffix,
413 1 : ) (ok bool, filterer *sstable.BlockPropertiesFilterer, err error) {
414 1 : if boundLimitedFilter != nil || len(blockPropertyFilters) > 0 {
415 1 : filterer, err = sstable.IntersectsTable(
416 1 : blockPropertyFilters,
417 1 : boundLimitedFilter,
418 1 : v.reader.Properties.UserProperties,
419 1 : syntheticSuffix,
420 1 : )
421 1 : // NB: IntersectsTable will return a nil filterer if the table-level
422 1 : // properties indicate there's no intersection with the provided filters.
423 1 : if filterer == nil || err != nil {
424 1 : return false, nil, err
425 1 : }
426 : }
427 1 : return true, filterer, nil
428 : }
429 :
430 : func (c *tableCacheShard) newIters(
431 : ctx context.Context,
432 : file *manifest.FileMetadata,
433 : opts *IterOptions,
434 : internalOpts internalIterOpts,
435 : dbOpts *tableCacheOpts,
436 : kinds iterKinds,
437 1 : ) (iterSet, error) {
438 1 : // TODO(sumeer): constructing the Reader should also use a plumbed context,
439 1 : // since parts of the sstable are read during the construction. The Reader
440 1 : // should not remember that context since the Reader can be long-lived.
441 1 :
442 1 : // Calling findNode gives us the responsibility of decrementing v's
443 1 : // refCount. If opening the underlying table resulted in error, then we
444 1 : // decrement this straight away. Otherwise, we pass that responsibility to
445 1 : // the sstable iterator, which decrements when it is closed.
446 1 : v := c.findNode(ctx, file.FileBacking, dbOpts)
447 1 : if v.err != nil {
448 1 : defer c.unrefValue(v)
449 1 : return iterSet{}, v.err
450 1 : }
451 :
452 : // Note: This suffers an allocation for virtual sstables.
453 1 : cr := createCommonReader(v, file)
454 1 : var iters iterSet
455 1 : var err error
456 1 : if kinds.RangeKey() && file.HasRangeKeys {
457 1 : iters.rangeKey, err = c.newRangeKeyIter(ctx, v, file, cr, opts.SpanIterOptions())
458 1 : }
459 1 : if kinds.RangeDeletion() && file.HasPointKeys && err == nil {
460 1 : iters.rangeDeletion, err = c.newRangeDelIter(ctx, file, cr, dbOpts)
461 1 : }
462 1 : if kinds.Point() && err == nil {
463 1 : iters.point, err = c.newPointIter(ctx, v, file, cr, opts, internalOpts, dbOpts)
464 1 : }
465 1 : if err != nil {
466 1 : // NB: There's a subtlety here: Because the point iterator is the last
467 1 : // iterator we attempt to create, it's not possible for:
468 1 : // err != nil && iters.point != nil
469 1 : // If it were possible, we'd need to account for it to avoid double
470 1 : // unref-ing here, once during CloseAll and once during `unrefValue`.
471 1 : iters.CloseAll()
472 1 : c.unrefValue(v)
473 1 : return iterSet{}, err
474 1 : }
475 : // Only point iterators ever require the reader stay pinned in the cache. If
476 : // we're not returning a point iterator to the caller, we need to unref v.
477 1 : if iters.point == nil {
478 1 : c.unrefValue(v)
479 1 : }
480 1 : return iters, nil
481 : }
482 :
483 : // For flushable ingests, we decide whether to use the bloom filter based on
484 : // size.
485 : const filterBlockSizeLimitForFlushableIngests = 64 * 1024
486 :
487 : // newPointIter is an internal helper that constructs a point iterator over a
488 : // sstable. This function is for internal use only, and callers should use
489 : // newIters instead.
490 : func (c *tableCacheShard) newPointIter(
491 : ctx context.Context,
492 : v *tableCacheValue,
493 : file *manifest.FileMetadata,
494 : cr sstable.CommonReader,
495 : opts *IterOptions,
496 : internalOpts internalIterOpts,
497 : dbOpts *tableCacheOpts,
498 1 : ) (internalIterator, error) {
499 1 : var (
500 1 : hideObsoletePoints bool = false
501 1 : pointKeyFilters []BlockPropertyFilter
502 1 : filterer *sstable.BlockPropertiesFilterer
503 1 : )
504 1 : if opts != nil {
505 1 : // This code is appending (at most one filter) in-place to
506 1 : // opts.PointKeyFilters even though the slice is shared for iterators in
507 1 : // the same iterator tree. This is acceptable since all the following
508 1 : // properties are true:
509 1 : // - The iterator tree is single threaded, so the shared backing for the
510 1 : // slice is being mutated in a single threaded manner.
511 1 : // - Each shallow copy of the slice has its own notion of length.
512 1 : // - The appended element is always the obsoleteKeyBlockPropertyFilter
513 1 : // struct, which is stateless, so overwriting that struct when creating
514 1 : // one sstable iterator is harmless to other sstable iterators that are
515 1 : // relying on that struct.
516 1 : //
517 1 : // An alternative would be to have different slices for different sstable
518 1 : // iterators, but that requires more work to avoid allocations.
519 1 : //
520 1 : // TODO(bilal): for compaction reads of foreign sstables, we do hide
521 1 : // obsolete points (see sstable.Reader.newCompactionIter) but we don't
522 1 : // apply the obsolete block property filter. We could optimize this by
523 1 : // applying the filter.
524 1 : hideObsoletePoints, pointKeyFilters =
525 1 : v.reader.TryAddBlockPropertyFilterForHideObsoletePoints(
526 1 : opts.snapshotForHideObsoletePoints, file.LargestSeqNum, opts.PointKeyFilters)
527 1 :
528 1 : var ok bool
529 1 : var err error
530 1 : ok, filterer, err = c.checkAndIntersectFilters(v, pointKeyFilters,
531 1 : internalOpts.boundLimitedFilter, file.SyntheticSuffix)
532 1 : if err != nil {
533 0 : return nil, err
534 1 : } else if !ok {
535 1 : // No point keys within the table match the filters.
536 1 : return nil, nil
537 1 : }
538 : }
539 :
540 1 : var iter sstable.Iterator
541 1 : filterBlockSizeLimit := sstable.AlwaysUseFilterBlock
542 1 : if opts != nil {
543 1 : // By default, we don't use block filters for L6 and restrict the size for
544 1 : // flushable ingests, as these blocks can be very big.
545 1 : if !opts.UseL6Filters {
546 1 : if opts.layer == manifest.Level(6) {
547 1 : filterBlockSizeLimit = sstable.NeverUseFilterBlock
548 1 : } else if opts.layer.IsFlushableIngests() {
549 1 : filterBlockSizeLimit = filterBlockSizeLimitForFlushableIngests
550 1 : }
551 : }
552 1 : if opts.layer.IsSet() && !opts.layer.IsFlushableIngests() {
553 1 : ctx = objiotracing.WithLevel(ctx, opts.layer.Level())
554 1 : }
555 : }
556 1 : tableFormat, err := v.reader.TableFormat()
557 1 : if err != nil {
558 0 : return nil, err
559 0 : }
560 1 : var rp sstable.ReaderProvider
561 1 : if tableFormat >= sstable.TableFormatPebblev3 && v.reader.Properties.NumValueBlocks > 0 {
562 1 : rp = &tableCacheShardReaderProvider{c: c, file: file, dbOpts: dbOpts}
563 1 : }
564 :
565 1 : if v.isShared && file.SyntheticSeqNum() != 0 {
566 1 : if tableFormat < sstable.TableFormatPebblev4 {
567 0 : return nil, errors.New("pebble: shared ingested sstable has a lower table format than expected")
568 0 : }
569 : // The table is shared and ingested.
570 1 : hideObsoletePoints = true
571 : }
572 1 : transforms := file.IterTransforms()
573 1 : transforms.HideObsoletePoints = hideObsoletePoints
574 1 : var categoryAndQoS sstable.CategoryAndQoS
575 1 : if opts != nil {
576 1 : categoryAndQoS = opts.CategoryAndQoS
577 1 : }
578 1 : if internalOpts.compaction {
579 1 : iter, err = cr.NewCompactionIter(
580 1 : transforms, categoryAndQoS, dbOpts.sstStatsCollector, rp,
581 1 : internalOpts.bufferPool)
582 1 : } else {
583 1 : iter, err = cr.NewPointIter(
584 1 : ctx, transforms, opts.GetLowerBound(), opts.GetUpperBound(), filterer, filterBlockSizeLimit,
585 1 : internalOpts.stats, categoryAndQoS, dbOpts.sstStatsCollector, rp)
586 1 : }
587 1 : if err != nil {
588 1 : return nil, err
589 1 : }
590 : // NB: v.closeHook takes responsibility for calling unrefValue(v) here. Take
591 : // care not to introduce an allocation here by adding a closure.
592 1 : iter.SetCloseHook(v.closeHook)
593 1 : c.iterCount.Add(1)
594 1 : dbOpts.iterCount.Add(1)
595 1 : if invariants.RaceEnabled {
596 0 : c.mu.Lock()
597 0 : c.mu.iters[iter] = debug.Stack()
598 0 : c.mu.Unlock()
599 0 : }
600 1 : return iter, nil
601 : }
602 :
603 : // newRangeDelIter is an internal helper that constructs an iterator over a
604 : // sstable's range deletions. This function is for table-cache internal use
605 : // only, and callers should use newIters instead.
606 : func (c *tableCacheShard) newRangeDelIter(
607 : ctx context.Context, file *manifest.FileMetadata, cr sstable.CommonReader, dbOpts *tableCacheOpts,
608 1 : ) (keyspan.FragmentIterator, error) {
609 1 : // NB: range-del iterator does not maintain a reference to the table, nor
610 1 : // does it need to read from it after creation.
611 1 : rangeDelIter, err := cr.NewRawRangeDelIter(ctx, file.FragmentIterTransforms())
612 1 : if err != nil {
613 1 : return nil, err
614 1 : }
615 : // Assert expected bounds in tests.
616 1 : if invariants.Sometimes(50) && rangeDelIter != nil {
617 1 : cmp := base.DefaultComparer.Compare
618 1 : if dbOpts.readerOpts.Comparer != nil {
619 1 : cmp = dbOpts.readerOpts.Comparer.Compare
620 1 : }
621 1 : rangeDelIter = keyspan.AssertBounds(
622 1 : rangeDelIter, file.SmallestPointKey, file.LargestPointKey.UserKey, cmp,
623 1 : )
624 : }
625 1 : return rangeDelIter, nil
626 : }
627 :
628 : // newRangeKeyIter is an internal helper that constructs an iterator over a
629 : // sstable's range keys. This function is for table-cache internal use only, and
630 : // callers should use newIters instead.
631 : func (c *tableCacheShard) newRangeKeyIter(
632 : ctx context.Context,
633 : v *tableCacheValue,
634 : file *fileMetadata,
635 : cr sstable.CommonReader,
636 : opts keyspan.SpanIterOptions,
637 1 : ) (keyspan.FragmentIterator, error) {
638 1 : transforms := file.FragmentIterTransforms()
639 1 : // Don't filter a table's range keys if the file contains RANGEKEYDELs.
640 1 : // The RANGEKEYDELs may delete range keys in other levels. Skipping the
641 1 : // file's range key blocks may surface deleted range keys below. This is
642 1 : // done here, rather than deferring to the block-property collector in order
643 1 : // to maintain parity with point keys and the treatment of RANGEDELs.
644 1 : if v.reader.Properties.NumRangeKeyDels == 0 && len(opts.RangeKeyFilters) > 0 {
645 0 : ok, _, err := c.checkAndIntersectFilters(v, opts.RangeKeyFilters, nil, transforms.SyntheticSuffix)
646 0 : if err != nil {
647 0 : return nil, err
648 0 : } else if !ok {
649 0 : return nil, nil
650 0 : }
651 : }
652 : // TODO(radu): wrap in an AssertBounds.
653 1 : return cr.NewRawRangeKeyIter(ctx, transforms)
654 : }
655 :
656 : type tableCacheShardReaderProvider struct {
657 : c *tableCacheShard
658 : file *manifest.FileMetadata
659 : dbOpts *tableCacheOpts
660 : v *tableCacheValue
661 : }
662 :
663 : var _ sstable.ReaderProvider = &tableCacheShardReaderProvider{}
664 :
665 : // GetReader implements sstable.ReaderProvider. Note that it is not the
666 : // responsibility of tableCacheShardReaderProvider to ensure that the file
667 : // continues to exist. The ReaderProvider is used in iterators where the
668 : // top-level iterator is pinning the read state and preventing the files from
669 : // being deleted.
670 : //
671 : // The caller must call tableCacheShardReaderProvider.Close.
672 : //
673 : // Note that currently the Reader returned here is only used to read value
674 : // blocks. This reader shouldn't be used for other purposes like reading keys
675 : // outside of virtual sstable bounds.
676 : //
677 : // TODO(bananabrick): We could return a wrapper over the Reader to ensure
678 : // that the reader isn't used for other purposes.
679 1 : func (rp *tableCacheShardReaderProvider) GetReader(ctx context.Context) (*sstable.Reader, error) {
680 1 : // Calling findNode gives us the responsibility of decrementing v's
681 1 : // refCount.
682 1 : v := rp.c.findNode(ctx, rp.file.FileBacking, rp.dbOpts)
683 1 : if v.err != nil {
684 0 : defer rp.c.unrefValue(v)
685 0 : return nil, v.err
686 0 : }
687 1 : rp.v = v
688 1 : return v.reader, nil
689 : }
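// A pairing sketch (not part of the original source): every successful
// GetReader call must be matched by a Close, which drops the table cache
// reference taken by findNode above.
//
//	r, err := rp.GetReader(ctx)
//	if err != nil {
//		return err
//	}
//	defer rp.Close()
//	// r may now be used to read value blocks.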
690 :
691 : // Close implements sstable.ReaderProvider.
692 1 : func (rp *tableCacheShardReaderProvider) Close() {
693 1 : rp.c.unrefValue(rp.v)
694 1 : rp.v = nil
695 1 : }
696 :
697 : // getTableProperties returns the sstable properties for the target file.
698 : func (c *tableCacheShard) getTableProperties(
699 : file *fileMetadata, dbOpts *tableCacheOpts,
700 1 : ) (*sstable.Properties, error) {
701 1 : // Calling findNode gives us the responsibility of decrementing v's refCount here
702 1 : v := c.findNode(context.TODO(), file.FileBacking, dbOpts)
703 1 : defer c.unrefValue(v)
704 1 :
705 1 : if v.err != nil {
706 0 : return nil, v.err
707 0 : }
708 1 : return &v.reader.Properties, nil
709 : }
710 :
711 : // releaseNode releases a node from the tableCacheShard.
712 : //
713 : // c.mu must be held when calling this.
714 1 : func (c *tableCacheShard) releaseNode(n *tableCacheNode) {
715 1 : c.unlinkNode(n)
716 1 : c.clearNode(n)
717 1 : }
718 :
719 : // unlinkNode removes a node from the tableCacheShard, leaving the shard
720 : // reference in place.
721 : //
722 : // c.mu must be held when calling this.
723 1 : func (c *tableCacheShard) unlinkNode(n *tableCacheNode) {
724 1 : key := tableCacheKey{n.cacheID, n.fileNum}
725 1 : delete(c.mu.nodes, key)
726 1 :
727 1 : switch n.ptype {
728 1 : case tableCacheNodeHot:
729 1 : c.mu.sizeHot--
730 1 : case tableCacheNodeCold:
731 1 : c.mu.sizeCold--
732 1 : case tableCacheNodeTest:
733 1 : c.mu.sizeTest--
734 : }
735 :
736 1 : if n == c.mu.handHot {
737 1 : c.mu.handHot = c.mu.handHot.prev()
738 1 : }
739 1 : if n == c.mu.handCold {
740 1 : c.mu.handCold = c.mu.handCold.prev()
741 1 : }
742 1 : if n == c.mu.handTest {
743 1 : c.mu.handTest = c.mu.handTest.prev()
744 1 : }
745 :
746 1 : if n.unlink() == n {
747 1 : // This was the last entry in the cache.
748 1 : c.mu.handHot = nil
749 1 : c.mu.handCold = nil
750 1 : c.mu.handTest = nil
751 1 : }
752 :
753 1 : n.links.prev = nil
754 1 : n.links.next = nil
755 : }
756 :
757 1 : func (c *tableCacheShard) clearNode(n *tableCacheNode) {
758 1 : if v := n.value; v != nil {
759 1 : n.value = nil
760 1 : c.unrefValue(v)
761 1 : }
762 : }
763 :
764 : // unrefValue decrements the reference count for the specified value, releasing
765 : // it if the reference count fell to 0. Note that the value has a reference if
766 : // it is present in tableCacheShard.mu.nodes, so a reference count of 0 means
767 : // the node has already been removed from that map.
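//
// Releases happen asynchronously: the value is handed to the shard's
// releaseLoop goroutine via releasingCh so that closing the sstable reader
// stays off the caller's path. evict instead performs the release
// synchronously, when the file must be closed before evict returns.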
768 1 : func (c *tableCacheShard) unrefValue(v *tableCacheValue) {
769 1 : if v.refCount.Add(-1) == 0 {
770 1 : c.releasing.Add(1)
771 1 : c.releasingCh <- v
772 1 : }
773 : }
774 :
775 : // findNode returns the value for the table with the given file number,
776 : // creating a cache node for it if one didn't already exist. The caller is
777 : // responsible for decrementing the returned value's refCount.
778 : func (c *tableCacheShard) findNode(
779 : ctx context.Context, b *fileBacking, dbOpts *tableCacheOpts,
780 1 : ) *tableCacheValue {
781 1 : // The backing must have a positive refcount (otherwise it could be deleted at any time).
782 1 : b.MustHaveRefs()
783 1 : // Caution! Here fileMetadata can be a physical or virtual table. Table cache
784 1 : // readers are associated with the physical backings. All virtual tables with
785 1 : // the same backing will use the same reader from the cache; so no information
786 1 : // that can differ among these virtual tables can be plumbed into loadInfo.
787 1 : info := loadInfo{
788 1 : backingFileNum: b.DiskFileNum,
789 1 : }
790 1 :
791 1 : return c.findNodeInternal(ctx, info, dbOpts)
792 1 : }
793 :
794 : func (c *tableCacheShard) findNodeInternal(
795 : ctx context.Context, loadInfo loadInfo, dbOpts *tableCacheOpts,
796 1 : ) *tableCacheValue {
797 1 : // Fast-path for a hit in the cache.
798 1 : c.mu.RLock()
799 1 : key := tableCacheKey{dbOpts.cacheID, loadInfo.backingFileNum}
800 1 : if n := c.mu.nodes[key]; n != nil && n.value != nil {
801 1 : // Fast-path hit.
802 1 : //
803 1 : // The caller is responsible for decrementing the refCount.
804 1 : v := n.value
805 1 : v.refCount.Add(1)
806 1 : c.mu.RUnlock()
807 1 : n.referenced.Store(true)
808 1 : c.hits.Add(1)
809 1 : <-v.loaded
810 1 : return v
811 1 : }
812 1 : c.mu.RUnlock()
813 1 :
814 1 : c.mu.Lock()
815 1 :
816 1 : n := c.mu.nodes[key]
817 1 : switch {
818 1 : case n == nil:
819 1 : // Slow-path miss of a non-existent node.
820 1 : n = &tableCacheNode{
821 1 : fileNum: loadInfo.backingFileNum,
822 1 : ptype: tableCacheNodeCold,
823 1 : }
824 1 : c.addNode(n, dbOpts)
825 1 : c.mu.sizeCold++
826 :
827 1 : case n.value != nil:
828 1 : // Slow-path hit of a hot or cold node.
829 1 : //
830 1 : // The caller is responsible for decrementing the refCount.
831 1 : v := n.value
832 1 : v.refCount.Add(1)
833 1 : n.referenced.Store(true)
834 1 : c.hits.Add(1)
835 1 : c.mu.Unlock()
836 1 : <-v.loaded
837 1 : return v
838 :
839 1 : default:
840 1 : // Slow-path miss of a test node.
841 1 : c.unlinkNode(n)
842 1 : c.mu.coldTarget++
843 1 : if c.mu.coldTarget > c.size {
844 1 : c.mu.coldTarget = c.size
845 1 : }
846 :
847 1 : n.referenced.Store(false)
848 1 : n.ptype = tableCacheNodeHot
849 1 : c.addNode(n, dbOpts)
850 1 : c.mu.sizeHot++
851 : }
852 :
853 1 : c.misses.Add(1)
854 1 :
855 1 : v := &tableCacheValue{
856 1 : loaded: make(chan struct{}),
857 1 : }
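// NB: one reference is held by the cache node (dropped when the node is
// cleared or evicted) and the other belongs to the caller of findNode.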
858 1 : v.refCount.Store(2)
859 1 : // Cache the closure invoked when an iterator is closed. This avoids an
860 1 : // allocation on every call to newIters.
861 1 : v.closeHook = func(i sstable.Iterator) error {
862 1 : if invariants.RaceEnabled {
863 0 : c.mu.Lock()
864 0 : delete(c.mu.iters, i)
865 0 : c.mu.Unlock()
866 0 : }
867 1 : c.unrefValue(v)
868 1 : c.iterCount.Add(-1)
869 1 : dbOpts.iterCount.Add(-1)
870 1 : return nil
871 : }
872 1 : n.value = v
873 1 :
874 1 : c.mu.Unlock()
875 1 :
876 1 : // Note that adding to the cache lists must complete before we begin loading
877 1 : // the table, as a failure during load will result in the node being unlinked.
878 1 : pprof.Do(context.Background(), tableCacheLabels, func(context.Context) {
879 1 : v.load(ctx, loadInfo, c, dbOpts)
880 1 : })
881 1 : return v
882 : }
883 :
884 1 : func (c *tableCacheShard) addNode(n *tableCacheNode, dbOpts *tableCacheOpts) {
885 1 : c.evictNodes()
886 1 : n.cacheID = dbOpts.cacheID
887 1 : key := tableCacheKey{n.cacheID, n.fileNum}
888 1 : c.mu.nodes[key] = n
889 1 :
890 1 : n.links.next = n
891 1 : n.links.prev = n
892 1 : if c.mu.handHot == nil {
893 1 : // First element.
894 1 : c.mu.handHot = n
895 1 : c.mu.handCold = n
896 1 : c.mu.handTest = n
897 1 : } else {
898 1 : c.mu.handHot.link(n)
899 1 : }
900 :
901 1 : if c.mu.handCold == c.mu.handHot {
902 1 : c.mu.handCold = c.mu.handCold.prev()
903 1 : }
904 : }
905 :
906 1 : func (c *tableCacheShard) evictNodes() {
907 1 : for c.size <= c.mu.sizeHot+c.mu.sizeCold && c.mu.handCold != nil {
908 1 : c.runHandCold()
909 1 : }
910 : }
911 :
912 1 : func (c *tableCacheShard) runHandCold() {
913 1 : n := c.mu.handCold
914 1 : if n.ptype == tableCacheNodeCold {
915 1 : if n.referenced.Load() {
916 1 : n.referenced.Store(false)
917 1 : n.ptype = tableCacheNodeHot
918 1 : c.mu.sizeCold--
919 1 : c.mu.sizeHot++
920 1 : } else {
921 1 : c.clearNode(n)
922 1 : n.ptype = tableCacheNodeTest
923 1 : c.mu.sizeCold--
924 1 : c.mu.sizeTest++
925 1 : for c.size < c.mu.sizeTest && c.mu.handTest != nil {
926 1 : c.runHandTest()
927 1 : }
928 : }
929 : }
930 :
931 1 : c.mu.handCold = c.mu.handCold.next()
932 1 :
933 1 : for c.size-c.mu.coldTarget <= c.mu.sizeHot && c.mu.handHot != nil {
934 1 : c.runHandHot()
935 1 : }
936 : }
937 :
938 1 : func (c *tableCacheShard) runHandHot() {
939 1 : if c.mu.handHot == c.mu.handTest && c.mu.handTest != nil {
940 1 : c.runHandTest()
941 1 : if c.mu.handHot == nil {
942 0 : return
943 0 : }
944 : }
945 :
946 1 : n := c.mu.handHot
947 1 : if n.ptype == tableCacheNodeHot {
948 1 : if n.referenced.Load() {
949 1 : n.referenced.Store(false)
950 1 : } else {
951 1 : n.ptype = tableCacheNodeCold
952 1 : c.mu.sizeHot--
953 1 : c.mu.sizeCold++
954 1 : }
955 : }
956 :
957 1 : c.mu.handHot = c.mu.handHot.next()
958 : }
959 :
960 1 : func (c *tableCacheShard) runHandTest() {
961 1 : if c.mu.sizeCold > 0 && c.mu.handTest == c.mu.handCold && c.mu.handCold != nil {
962 1 : c.runHandCold()
963 1 : if c.mu.handTest == nil {
964 0 : return
965 0 : }
966 : }
967 :
968 1 : n := c.mu.handTest
969 1 : if n.ptype == tableCacheNodeTest {
970 1 : c.mu.coldTarget--
971 1 : if c.mu.coldTarget < 0 {
972 1 : c.mu.coldTarget = 0
973 1 : }
974 1 : c.unlinkNode(n)
975 1 : c.clearNode(n)
976 : }
977 :
978 1 : c.mu.handTest = c.mu.handTest.next()
979 : }
980 :
981 1 : func (c *tableCacheShard) evict(fileNum base.DiskFileNum, dbOpts *tableCacheOpts, allowLeak bool) {
982 1 : c.mu.Lock()
983 1 : key := tableCacheKey{dbOpts.cacheID, fileNum}
984 1 : n := c.mu.nodes[key]
985 1 : var v *tableCacheValue
986 1 : if n != nil {
987 1 : // NB: This is equivalent to tableCacheShard.releaseNode(), but we perform
988 1 : // the tableCacheValue.release() call synchronously below to ensure the
989 1 : // sstable file descriptor is closed before returning. Note that
990 1 : // tableCacheShard.releasing needs to be incremented while holding
991 1 : // tableCacheShard.mu in order to avoid a race with Close()
992 1 : c.unlinkNode(n)
993 1 : v = n.value
994 1 : if v != nil {
995 1 : if !allowLeak {
996 1 : if t := v.refCount.Add(-1); t != 0 {
997 0 : dbOpts.loggerAndTracer.Fatalf("sstable %s: refcount is not zero: %d\n%s", fileNum, t, debug.Stack())
998 0 : }
999 : }
1000 1 : c.releasing.Add(1)
1001 : }
1002 : }
1003 :
1004 1 : c.mu.Unlock()
1005 1 :
1006 1 : if v != nil {
1007 1 : v.release(c)
1008 1 : }
1009 :
1010 1 : dbOpts.cache.EvictFile(dbOpts.cacheID, fileNum)
1011 : }
1012 :
1013 : // removeDB evicts any nodes which have a reference to the DB
1014 : // associated with dbOpts.cacheID. Make sure that there will
1015 : // be no more accesses to the files associated with the DB.
1016 1 : func (c *tableCacheShard) removeDB(dbOpts *tableCacheOpts) {
1017 1 : var fileNums []base.DiskFileNum
1018 1 :
1019 1 : c.mu.RLock()
1020 1 : // Collect the fileNums which need to be cleaned.
1021 1 : var firstNode *tableCacheNode
1022 1 : node := c.mu.handHot
1023 1 : for node != firstNode {
1024 1 : if firstNode == nil {
1025 1 : firstNode = node
1026 1 : }
1027 :
1028 1 : if node.cacheID == dbOpts.cacheID {
1029 1 : fileNums = append(fileNums, node.fileNum)
1030 1 : }
1031 1 : node = node.next()
1032 : }
1033 1 : c.mu.RUnlock()
1034 1 :
1035 1 : // Evict all the nodes associated with the DB.
1036 1 : // This should synchronously close all the files
1037 1 : // associated with the DB.
1038 1 : for _, fileNum := range fileNums {
1039 1 : c.evict(fileNum, dbOpts, true)
1040 1 : }
1041 : }
1042 :
1043 1 : func (c *tableCacheShard) Close() error {
1044 1 : c.mu.Lock()
1045 1 : defer c.mu.Unlock()
1046 1 :
1047 1 : // Check for leaked iterators. Note that we'll still perform cleanup below in
1048 1 : // the case that there are leaked iterators.
1049 1 : var err error
1050 1 : if v := c.iterCount.Load(); v > 0 {
1051 1 : if !invariants.RaceEnabled {
1052 1 : err = errors.Errorf("leaked iterators: %d", errors.Safe(v))
1053 1 : } else {
1054 0 : var buf bytes.Buffer
1055 0 : for _, stack := range c.mu.iters {
1056 0 : fmt.Fprintf(&buf, "%s\n", stack)
1057 0 : }
1058 0 : err = errors.Errorf("leaked iterators: %d\n%s", errors.Safe(v), buf.String())
1059 : }
1060 : }
1061 :
1062 1 : for c.mu.handHot != nil {
1063 0 : n := c.mu.handHot
1064 0 : if n.value != nil {
1065 0 : if n.value.refCount.Add(-1) == 0 {
1066 0 : c.releasing.Add(1)
1067 0 : c.releasingCh <- n.value
1068 0 : }
1069 : }
1070 0 : c.unlinkNode(n)
1071 : }
1072 1 : c.mu.nodes = nil
1073 1 : c.mu.handHot = nil
1074 1 : c.mu.handCold = nil
1075 1 : c.mu.handTest = nil
1076 1 :
1077 1 : // Only shut down the releasing goroutine if there were no leaked
1078 1 : // iterators. If there were leaked iterators, we leave the goroutine running
1079 1 : // and the releasingCh open so that a subsequent iterator close can
1080 1 : // complete. This behavior is used by iterator leak tests. Leaking the
1081 1 : // goroutine for these tests is less bad than not closing the iterator, which
1082 1 : // triggers other warnings about block cache handles not being released.
1083 1 : if err != nil {
1084 1 : c.releasing.Wait()
1085 1 : return err
1086 1 : }
1087 :
1088 1 : close(c.releasingCh)
1089 1 : c.releasing.Wait()
1090 1 : c.releaseLoopExit.Wait()
1091 1 : return err
1092 : }
1093 :
1094 : type tableCacheValue struct {
1095 : closeHook func(i sstable.Iterator) error
1096 : reader *sstable.Reader
1097 : err error
1098 : isShared bool
1099 : loaded chan struct{}
1100 : // Reference count for the value. The reader is closed when the reference
1101 : // count drops to zero.
1102 : refCount atomic.Int32
1103 : }
1104 :
1105 : // loadInfo contains the information needed to populate a new cache entry.
1106 : type loadInfo struct {
1107 : backingFileNum base.DiskFileNum
1108 : }
1109 :
1110 : func (v *tableCacheValue) load(
1111 : ctx context.Context, loadInfo loadInfo, c *tableCacheShard, dbOpts *tableCacheOpts,
1112 1 : ) {
1113 1 : // Try opening the file first.
1114 1 : var f objstorage.Readable
1115 1 : var err error
1116 1 : f, err = dbOpts.objProvider.OpenForReading(
1117 1 : ctx, fileTypeTable, loadInfo.backingFileNum, objstorage.OpenOptions{MustExist: true},
1118 1 : )
1119 1 : if err == nil {
1120 1 : o := dbOpts.readerOpts
1121 1 : o.SetInternalCacheOpts(sstableinternal.CacheOptions{
1122 1 : Cache: dbOpts.cache,
1123 1 : CacheID: dbOpts.cacheID,
1124 1 : FileNum: loadInfo.backingFileNum,
1125 1 : })
1126 1 : v.reader, err = sstable.NewReader(ctx, f, o)
1127 1 : }
1128 1 : if err == nil {
1129 1 : var objMeta objstorage.ObjectMetadata
1130 1 : objMeta, err = dbOpts.objProvider.Lookup(fileTypeTable, loadInfo.backingFileNum)
1131 1 : v.isShared = objMeta.IsShared()
1132 1 : }
1133 1 : if err != nil {
1134 1 : v.err = errors.Wrapf(
1135 1 : err, "pebble: backing file %s error", loadInfo.backingFileNum)
1136 1 : }
1137 1 : if v.err != nil {
1138 1 : c.mu.Lock()
1139 1 : defer c.mu.Unlock()
1140 1 : // Lookup the node in the cache again as it might have already been
1141 1 : // removed.
1142 1 : key := tableCacheKey{dbOpts.cacheID, loadInfo.backingFileNum}
1143 1 : n := c.mu.nodes[key]
1144 1 : if n != nil && n.value == v {
1145 1 : c.releaseNode(n)
1146 1 : }
1147 : }
1148 1 : close(v.loaded)
1149 : }
1150 :
1151 1 : func (v *tableCacheValue) release(c *tableCacheShard) {
1152 1 : <-v.loaded
1153 1 : // Nothing to be done about an error at this point. Close the reader if it is
1154 1 : // open.
1155 1 : if v.reader != nil {
1156 1 : _ = v.reader.Close()
1157 1 : }
1158 1 : c.releasing.Done()
1159 : }
1160 :
1161 : type tableCacheNodeType int8
1162 :
1163 : const (
1164 : tableCacheNodeTest tableCacheNodeType = iota
1165 : tableCacheNodeCold
1166 : tableCacheNodeHot
1167 : )
1168 :
1169 0 : func (p tableCacheNodeType) String() string {
1170 0 : switch p {
1171 0 : case tableCacheNodeTest:
1172 0 : return "test"
1173 0 : case tableCacheNodeCold:
1174 0 : return "cold"
1175 0 : case tableCacheNodeHot:
1176 0 : return "hot"
1177 : }
1178 0 : return "unknown"
1179 : }
1180 :
1181 : type tableCacheNode struct {
1182 : fileNum base.DiskFileNum
1183 : value *tableCacheValue
1184 :
1185 : links struct {
1186 : next *tableCacheNode
1187 : prev *tableCacheNode
1188 : }
1189 : ptype tableCacheNodeType
1190 : // referenced is atomically set to indicate that this entry has been accessed
1191 : // since the last time one of the clock hands swept it.
1192 : referenced atomic.Bool
1193 :
1194 : // Storing the cache id associated with the DB instance here
1195 : // avoids the need to thread the dbOpts struct through many functions.
1196 : cacheID uint64
1197 : }
1198 :
1199 1 : func (n *tableCacheNode) next() *tableCacheNode {
1200 1 : if n == nil {
1201 0 : return nil
1202 0 : }
1203 1 : return n.links.next
1204 : }
1205 :
1206 1 : func (n *tableCacheNode) prev() *tableCacheNode {
1207 1 : if n == nil {
1208 0 : return nil
1209 0 : }
1210 1 : return n.links.prev
1211 : }
1212 :
1213 1 : func (n *tableCacheNode) link(s *tableCacheNode) {
1214 1 : s.links.prev = n.links.prev
1215 1 : s.links.prev.links.next = s
1216 1 : s.links.next = n
1217 1 : s.links.next.links.prev = s
1218 1 : }
1219 :
1220 1 : func (n *tableCacheNode) unlink() *tableCacheNode {
1221 1 : next := n.links.next
1222 1 : n.links.prev.links.next = n.links.next
1223 1 : n.links.next.links.prev = n.links.prev
1224 1 : n.links.prev = n
1225 1 : n.links.next = n
1226 1 : return next
1227 1 : }
1228 :
1229 : // iterSet holds a set of iterators of various key kinds, all constructed over
1230 : // the same data structure (e.g., an sstable). A subset of the fields may be
1231 : // populated depending on the `iterKinds` passed to newIters.
1232 : type iterSet struct {
1233 : point internalIterator
1234 : rangeDeletion keyspan.FragmentIterator
1235 : rangeKey keyspan.FragmentIterator
1236 : }
1237 :
1238 : // TODO(jackson): Consider adding methods for fast paths that check whether an
1239 : // iterator of a particular kind is nil, so that these call sites don't need to
1240 : // reach into the struct's fields directly.
1241 :
1242 : // Point returns the contained point iterator. If there is no point iterator,
1243 : // Point returns a non-nil empty point iterator.
1244 1 : func (s *iterSet) Point() internalIterator {
1245 1 : if s.point == nil {
1246 1 : return emptyIter
1247 1 : }
1248 1 : return s.point
1249 : }
1250 :
1251 : // RangeDeletion returns the contained range deletion iterator. If there is no
1252 : // range deletion iterator, RangeDeletion returns a non-nil empty keyspan
1253 : // iterator.
1254 1 : func (s *iterSet) RangeDeletion() keyspan.FragmentIterator {
1255 1 : if s.rangeDeletion == nil {
1256 1 : return emptyKeyspanIter
1257 1 : }
1258 1 : return s.rangeDeletion
1259 : }
1260 :
1261 : // RangeKey returns the contained range key iterator. If there is no range key
1262 : // iterator, RangeKey returns a non-nil empty keyspan iterator.
1263 1 : func (s *iterSet) RangeKey() keyspan.FragmentIterator {
1264 1 : if s.rangeKey == nil {
1265 0 : return emptyKeyspanIter
1266 0 : }
1267 1 : return s.rangeKey
1268 : }
1269 :
1270 : // CloseAll closes all of the held iterators. If CloseAll is called, then Close
1271 : // must not be called on the constituent iterators.
1272 1 : func (s *iterSet) CloseAll() error {
1273 1 : var err error
1274 1 : if s.point != nil {
1275 1 : err = s.point.Close()
1276 1 : s.point = nil
1277 1 : }
1278 1 : if s.rangeDeletion != nil {
1279 1 : s.rangeDeletion.Close()
1280 1 : s.rangeDeletion = nil
1281 1 : }
1282 1 : if s.rangeKey != nil {
1283 1 : s.rangeKey.Close()
1284 1 : s.rangeKey = nil
1285 1 : }
1286 1 : return err
1287 : }
1288 :
1289 : // iterKinds is a bitmap indicating a set of kinds of iterators. Callers may
1290 : // bitwise-OR iterPointKeys, iterRangeDeletions and/or iterRangeKeys together to
1291 : // represent a set of desired iterator kinds.
1292 : type iterKinds uint8
1293 :
1294 1 : func (t iterKinds) Point() bool { return (t & iterPointKeys) != 0 }
1295 1 : func (t iterKinds) RangeDeletion() bool { return (t & iterRangeDeletions) != 0 }
1296 1 : func (t iterKinds) RangeKey() bool { return (t & iterRangeKeys) != 0 }
1297 :
1298 : const (
1299 : iterPointKeys iterKinds = 1 << iota
1300 : iterRangeDeletions
1301 : iterRangeKeys
1302 : )
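// For illustration (not part of the original source), kinds compose with
// bitwise OR and are queried with the accessors above:
//
//	kinds := iterPointKeys | iterRangeKeys
//	_ = kinds.Point()         // true
//	_ = kinds.RangeDeletion() // false
//	_ = kinds.RangeKey()      // true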