Line data Source code
1 : // Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "bytes"
9 : "context"
10 : "fmt"
11 : "io"
12 : "runtime/debug"
13 : "runtime/pprof"
14 : "sync"
15 : "sync/atomic"
16 : "unsafe"
17 :
18 : "github.com/cockroachdb/errors"
19 : "github.com/cockroachdb/pebble/internal/base"
20 : "github.com/cockroachdb/pebble/internal/cache"
21 : "github.com/cockroachdb/pebble/internal/invariants"
22 : "github.com/cockroachdb/pebble/internal/keyspan"
23 : "github.com/cockroachdb/pebble/internal/keyspan/keyspanimpl"
24 : "github.com/cockroachdb/pebble/internal/manifest"
25 : "github.com/cockroachdb/pebble/internal/sstableinternal"
26 : "github.com/cockroachdb/pebble/objstorage"
27 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
28 : "github.com/cockroachdb/pebble/sstable"
29 : )
30 :
31 : var emptyIter = &errorIter{err: nil}
32 : var emptyKeyspanIter = &errorKeyspanIter{err: nil}
33 :
34 : // tableNewIters creates new iterators (point, range deletion and/or range key)
35 : // for the given file metadata. Which of the various iterator kinds the user is
36 : // requesting is specified with the iterKinds bitmap.
37 : //
38 : // On success, the requested subset of iters.{point,rangeDel,rangeKey} are
39 : // populated with iterators.
40 : //
41 : // If a point iterator is requested and the operation was successful,
42 : // iters.point is guaranteed to be non-nil and must be closed when the caller is
43 : // finished.
44 : //
45 : // If a range deletion or range key iterator is requested, the corresponding
46 : // iterator may be nil if the table does not contain any keys of the
47 : // corresponding kind. The returned iterSet type provides RangeDeletion() and
48 : // RangeKey() convenience methods that return non-nil empty iterators that may
49 : // be used if the caller requires a non-nil iterator.
50 : //
51 : // On error, all iterators are nil.
52 : //
53 : // The only (non-test) implementation of tableNewIters is
54 : // tableCacheContainer.newIters().
55 : type tableNewIters func(
56 : ctx context.Context,
57 : file *manifest.FileMetadata,
58 : opts *IterOptions,
59 : internalOpts internalIterOpts,
60 : kinds iterKinds,
61 : ) (iterSet, error)
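// Illustrative sketch only (not part of the original file): how a caller
// might drive a tableNewIters implementation, requesting point and
// range-deletion iterators via the iterKinds bitmap and consuming the
// returned iterSet. The newIters and meta values are assumed to be supplied
// by surrounding code (e.g. by a tableCacheContainer).
func exampleUseNewIters(
	ctx context.Context, newIters tableNewIters, meta *manifest.FileMetadata,
) error {
	iters, err := newIters(ctx, meta, nil /* opts */, internalIterOpts{},
		iterPointKeys|iterRangeDeletions)
	if err != nil {
		return err
	}
	// A point iterator was requested and the call succeeded, so iters.point is
	// guaranteed to be non-nil. RangeDeletion() hides a possibly-nil range-del
	// iterator behind a non-nil empty iterator.
	defer iters.CloseAll()
	_ = iters.RangeDeletion()
	return nil
}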
62 :
63 : // tableNewRangeDelIter takes a tableNewIters and returns a TableNewSpanIter
64 : // for the rangedel iterator returned by tableNewIters.
65 1 : func tableNewRangeDelIter(newIters tableNewIters) keyspanimpl.TableNewSpanIter {
66 1 : return func(ctx context.Context, file *manifest.FileMetadata, iterOptions keyspan.SpanIterOptions) (keyspan.FragmentIterator, error) {
67 1 : iters, err := newIters(ctx, file, nil, internalIterOpts{}, iterRangeDeletions)
68 1 : if err != nil {
69 0 : return nil, err
70 0 : }
71 1 : return iters.RangeDeletion(), nil
72 : }
73 : }
74 :
75 : // tableNewRangeKeyIter takes a tableNewIters and returns a TableNewSpanIter
76 : // for the range key iterator returned by tableNewIters.
77 1 : func tableNewRangeKeyIter(newIters tableNewIters) keyspanimpl.TableNewSpanIter {
78 1 : return func(ctx context.Context, file *manifest.FileMetadata, iterOptions keyspan.SpanIterOptions) (keyspan.FragmentIterator, error) {
79 1 : iters, err := newIters(ctx, file, nil, internalIterOpts{}, iterRangeKeys)
80 1 : if err != nil {
81 1 : return nil, err
82 1 : }
83 1 : return iters.RangeKey(), nil
84 : }
85 : }
86 :
87 : var tableCacheLabels = pprof.Labels("pebble", "table-cache")
88 :
89 : // tableCacheOpts contains the DB-specific fields
90 : // of a table cache. This is stored in the tableCacheContainer
91 : // along with the table cache.
92 : // NB: It is important to make sure that the fields in this
93 : // struct are read-only. Since these fields are shared by
94 : // every tableCacheShard, updating a non-read-only field
95 : // would cause unnecessary evictions of those fields (and the
96 : // surrounding fields) from the CPU caches.
97 : type tableCacheOpts struct {
98 : // iterCount keeps track of how many iterators are open. It is used to keep
99 : // track of leaked iterators on a per-db level.
100 : iterCount *atomic.Int32
101 :
102 : loggerAndTracer LoggerAndTracer
103 : cache *cache.Cache
104 : cacheID cache.ID
105 : objProvider objstorage.Provider
106 : readerOpts sstable.ReaderOptions
107 : sstStatsCollector *sstable.CategoryStatsCollector
108 : }
109 :
110 : // tableCacheContainer contains the table cache and
111 : // fields which are unique to the DB.
112 : type tableCacheContainer struct {
113 : tableCache *TableCache
114 :
115 : // dbOpts contains fields relevant to the table cache
116 : // which are unique to each DB.
117 : dbOpts tableCacheOpts
118 : }
119 :
120 : // newTableCacheContainer will panic if the underlying cache in the table cache
121 : // doesn't match Options.Cache.
122 : func newTableCacheContainer(
123 : tc *TableCache,
124 : cacheID cache.ID,
125 : objProvider objstorage.Provider,
126 : opts *Options,
127 : size int,
128 : sstStatsCollector *sstable.CategoryStatsCollector,
129 1 : ) *tableCacheContainer {
130 1 : // We will release a ref to the table cache acquired here when tableCacheContainer.close is called.
131 1 : if tc != nil {
132 1 : if tc.cache != opts.Cache {
133 0 : panic("pebble: underlying cache for the table cache and db are different")
134 : }
135 1 : tc.Ref()
136 1 : } else {
137 1 : // NewTableCache should create a ref to tc which the container should
138 1 : // drop whenever it is closed.
139 1 : tc = NewTableCache(opts.Cache, opts.Experimental.TableCacheShards, size)
140 1 : }
141 :
142 1 : t := &tableCacheContainer{}
143 1 : t.tableCache = tc
144 1 : t.dbOpts.loggerAndTracer = opts.LoggerAndTracer
145 1 : t.dbOpts.cache = opts.Cache
146 1 : t.dbOpts.cacheID = cacheID
147 1 : t.dbOpts.objProvider = objProvider
148 1 : t.dbOpts.readerOpts = opts.MakeReaderOptions()
149 1 : t.dbOpts.readerOpts.FilterMetricsTracker = &sstable.FilterMetricsTracker{}
150 1 : t.dbOpts.iterCount = new(atomic.Int32)
151 1 : t.dbOpts.sstStatsCollector = sstStatsCollector
152 1 : return t
153 : }
154 :
155 : // Before calling close, make sure that there will be no further need
156 : // to access any of the files associated with the store.
157 1 : func (c *tableCacheContainer) close() error {
158 1 : // We want to do some cleanup work here. Check for leaked iterators
159 1 : // by the DB using this container. Note that we'll still perform cleanup
160 1 : // below in the case that there are leaked iterators.
161 1 : var err error
162 1 : if v := c.dbOpts.iterCount.Load(); v > 0 {
163 1 : err = errors.Errorf("leaked iterators: %d", errors.Safe(v))
164 1 : }
165 :
166 : // Release nodes here.
167 1 : for _, shard := range c.tableCache.shards {
168 1 : if shard != nil {
169 1 : shard.removeDB(&c.dbOpts)
170 1 : }
171 : }
172 1 : return firstError(err, c.tableCache.Unref())
173 : }
174 :
175 : func (c *tableCacheContainer) newIters(
176 : ctx context.Context,
177 : file *manifest.FileMetadata,
178 : opts *IterOptions,
179 : internalOpts internalIterOpts,
180 : kinds iterKinds,
181 1 : ) (iterSet, error) {
182 1 : return c.tableCache.getShard(file.FileBacking.DiskFileNum).newIters(ctx, file, opts, internalOpts, &c.dbOpts, kinds)
183 1 : }
184 :
185 : // getTableProperties returns the properties associated with the backing physical
186 : // table if the input metadata belongs to a virtual sstable.
187 1 : func (c *tableCacheContainer) getTableProperties(file *fileMetadata) (*sstable.Properties, error) {
188 1 : return c.tableCache.getShard(file.FileBacking.DiskFileNum).getTableProperties(file, &c.dbOpts)
189 1 : }
190 :
191 1 : func (c *tableCacheContainer) evict(fileNum base.DiskFileNum) {
192 1 : c.tableCache.getShard(fileNum).evict(fileNum, &c.dbOpts, false)
193 1 : }
194 :
195 1 : func (c *tableCacheContainer) metrics() (CacheMetrics, FilterMetrics) {
196 1 : var m CacheMetrics
197 1 : for i := range c.tableCache.shards {
198 1 : s := c.tableCache.shards[i]
199 1 : s.mu.RLock()
200 1 : m.Count += int64(len(s.mu.nodes))
201 1 : s.mu.RUnlock()
202 1 : m.Hits += s.hits.Load()
203 1 : m.Misses += s.misses.Load()
204 1 : }
205 1 : m.Size = m.Count * int64(unsafe.Sizeof(tableCacheNode{})+unsafe.Sizeof(tableCacheValue{})+unsafe.Sizeof(sstable.Reader{}))
206 1 : f := c.dbOpts.readerOpts.FilterMetricsTracker.Load()
207 1 : return m, f
208 : }
209 :
210 : func (c *tableCacheContainer) estimateSize(
211 : meta *fileMetadata, lower, upper []byte,
212 1 : ) (size uint64, err error) {
213 1 : c.withCommonReader(meta, func(cr sstable.CommonReader) error {
214 1 : size, err = cr.EstimateDiskUsage(lower, upper)
215 1 : return err
216 1 : })
217 1 : return size, err
218 : }
219 :
220 : // createCommonReader creates a Reader for this file.
221 1 : func createCommonReader(v *tableCacheValue, file *fileMetadata) sstable.CommonReader {
222 1 : // TODO(bananabrick): We suffer an allocation if file is a virtual sstable.
223 1 : var cr sstable.CommonReader = v.reader
224 1 : if file.Virtual {
225 1 : virtualReader := sstable.MakeVirtualReader(
226 1 : v.reader, file.VirtualMeta().VirtualReaderParams(v.isShared),
227 1 : )
228 1 : cr = &virtualReader
229 1 : }
230 1 : return cr
231 : }
232 :
233 : func (c *tableCacheContainer) withCommonReader(
234 : meta *fileMetadata, fn func(sstable.CommonReader) error,
235 1 : ) error {
236 1 : s := c.tableCache.getShard(meta.FileBacking.DiskFileNum)
237 1 : v := s.findNode(context.TODO(), meta.FileBacking, &c.dbOpts)
238 1 : defer s.unrefValue(v)
239 1 : if v.err != nil {
240 0 : return v.err
241 0 : }
242 1 : return fn(createCommonReader(v, meta))
243 : }
244 :
245 1 : func (c *tableCacheContainer) withReader(meta physicalMeta, fn func(*sstable.Reader) error) error {
246 1 : s := c.tableCache.getShard(meta.FileBacking.DiskFileNum)
247 1 : v := s.findNode(context.TODO(), meta.FileBacking, &c.dbOpts)
248 1 : defer s.unrefValue(v)
249 1 : if v.err != nil {
250 1 : return v.err
251 1 : }
252 1 : return fn(v.reader)
253 : }
254 :
255 : // withVirtualReader fetches a VirtualReader associated with a virtual sstable.
256 : func (c *tableCacheContainer) withVirtualReader(
257 : meta virtualMeta, fn func(sstable.VirtualReader) error,
258 1 : ) error {
259 1 : s := c.tableCache.getShard(meta.FileBacking.DiskFileNum)
260 1 : v := s.findNode(context.TODO(), meta.FileBacking, &c.dbOpts)
261 1 : defer s.unrefValue(v)
262 1 : if v.err != nil {
263 0 : return v.err
264 0 : }
265 1 : provider := c.dbOpts.objProvider
266 1 : objMeta, err := provider.Lookup(fileTypeTable, meta.FileBacking.DiskFileNum)
267 1 : if err != nil {
268 0 : return err
269 0 : }
270 1 : return fn(sstable.MakeVirtualReader(v.reader, meta.VirtualReaderParams(objMeta.IsShared())))
271 : }
272 :
273 1 : func (c *tableCacheContainer) iterCount() int64 {
274 1 : return int64(c.dbOpts.iterCount.Load())
275 1 : }
276 :
277 : // TableCache is a shareable cache for open sstables.
278 : type TableCache struct {
279 : refs atomic.Int64
280 :
281 : cache *Cache
282 : shards []*tableCacheShard
283 : }
284 :
285 : // Ref adds a reference to the table cache. Once tableCache.init returns,
286 : // the table cache only remains valid if there is at least one reference
287 : // to it.
288 1 : func (c *TableCache) Ref() {
289 1 : v := c.refs.Add(1)
290 1 : // We don't want the reference count to ever go from 0 -> 1,
291 1 : // because a reference count of 0 implies that we've closed the cache.
292 1 : if v <= 1 {
293 0 : panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
294 : }
295 : }
296 :
297 : // Unref removes a reference to the table cache.
298 1 : func (c *TableCache) Unref() error {
299 1 : v := c.refs.Add(-1)
300 1 : switch {
301 1 : case v < 0:
302 1 : panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
303 1 : case v == 0:
304 1 : var err error
305 1 : for i := range c.shards {
306 1 : // The cache shard is not allocated yet, nothing to close
307 1 : if c.shards[i] == nil {
308 0 : continue
309 : }
310 1 : err = firstError(err, c.shards[i].Close())
311 : }
312 :
313 : // Unref the cache which we create a reference to when the tableCache
314 : // is first instantiated.
315 1 : c.cache.Unref()
316 1 : return err
317 : }
318 1 : return nil
319 : }
320 :
321 : // NewTableCache will create a reference to the table cache. It is the caller's
322 : // responsibility to call tableCache.Unref once they no longer hold a reference to the table cache.
323 1 : func NewTableCache(cache *Cache, numShards int, size int) *TableCache {
324 1 : if size == 0 {
325 0 : panic("pebble: cannot create a table cache of size 0")
326 1 : } else if numShards == 0 {
327 0 : panic("pebble: cannot create a table cache with 0 shards")
328 : }
329 :
330 1 : c := &TableCache{}
331 1 : c.cache = cache
332 1 : c.cache.Ref()
333 1 :
334 1 : c.shards = make([]*tableCacheShard, numShards)
335 1 : for i := range c.shards {
336 1 : c.shards[i] = &tableCacheShard{}
337 1 : c.shards[i].init(size / len(c.shards))
338 1 : }
339 :
340 : // Hold a ref to the cache here.
341 1 : c.refs.Store(1)
342 1 :
343 1 : return c
344 : }
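// Illustrative sketch only: the reference-counting contract described above.
// A TableCache returned by NewTableCache starts with one reference; every
// additional holder calls Ref, and each holder (including the creator) must
// eventually call Unref. The shard count and size below are arbitrary.
func exampleShareTableCache(blockCache *Cache) error {
	tc := NewTableCache(blockCache, 8 /* numShards */, 1000 /* size */) // refs == 1
	tc.Ref() // a second holder (e.g. another DB) takes a reference
	if err := tc.Unref(); err != nil { // the second holder is done; refs == 1
		return err
	}
	return tc.Unref() // last reference dropped; shards are closed
}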
345 :
346 1 : func (c *TableCache) getShard(fileNum base.DiskFileNum) *tableCacheShard {
347 1 : return c.shards[uint64(fileNum)%uint64(len(c.shards))]
348 1 : }
349 :
350 : type tableCacheKey struct {
351 : cacheID cache.ID
352 : fileNum base.DiskFileNum
353 : }
354 :
355 : type tableCacheShard struct {
356 : hits atomic.Int64
357 : misses atomic.Int64
358 : iterCount atomic.Int32
359 :
360 : size int
361 :
362 : mu struct {
363 : sync.RWMutex
364 : nodes map[tableCacheKey]*tableCacheNode
365 : // The iters map is only created and populated in race builds.
366 : iters map[io.Closer][]byte
367 :
368 : handHot *tableCacheNode
369 : handCold *tableCacheNode
370 : handTest *tableCacheNode
371 :
372 : coldTarget int
373 : sizeHot int
374 : sizeCold int
375 : sizeTest int
376 : }
377 : releasing sync.WaitGroup
378 : releasingCh chan *tableCacheValue
379 : releaseLoopExit sync.WaitGroup
380 : }
381 :
382 1 : func (c *tableCacheShard) init(size int) {
383 1 : c.size = size
384 1 :
385 1 : c.mu.nodes = make(map[tableCacheKey]*tableCacheNode)
386 1 : c.mu.coldTarget = size
387 1 : c.releasingCh = make(chan *tableCacheValue, 100)
388 1 : c.releaseLoopExit.Add(1)
389 1 : go c.releaseLoop()
390 1 :
391 1 : if invariants.RaceEnabled {
392 0 : c.mu.iters = make(map[io.Closer][]byte)
393 0 : }
394 : }
395 :
396 1 : func (c *tableCacheShard) releaseLoop() {
397 1 : pprof.Do(context.Background(), tableCacheLabels, func(context.Context) {
398 1 : defer c.releaseLoopExit.Done()
399 1 : for v := range c.releasingCh {
400 1 : v.release(c)
401 1 : }
402 : })
403 : }
404 :
405 : // checkAndIntersectFilters checks the specific table and block property filters
406 : // for intersection with any available table and block-level properties. Returns
407 : // true for ok if this table should be read by this iterator.
408 : func (c *tableCacheShard) checkAndIntersectFilters(
409 : v *tableCacheValue,
410 : blockPropertyFilters []BlockPropertyFilter,
411 : boundLimitedFilter sstable.BoundLimitedBlockPropertyFilter,
412 : syntheticSuffix sstable.SyntheticSuffix,
413 1 : ) (ok bool, filterer *sstable.BlockPropertiesFilterer, err error) {
414 1 : if boundLimitedFilter != nil || len(blockPropertyFilters) > 0 {
415 1 : filterer, err = sstable.IntersectsTable(
416 1 : blockPropertyFilters,
417 1 : boundLimitedFilter,
418 1 : v.reader.Properties.UserProperties,
419 1 : syntheticSuffix,
420 1 : )
421 1 : // NB: IntersectsTable will return a nil filterer if the table-level
422 1 : // properties indicate there's no intersection with the provided filters.
423 1 : if filterer == nil || err != nil {
424 1 : return false, nil, err
425 1 : }
426 : }
427 1 : return true, filterer, nil
428 : }
429 :
430 : func (c *tableCacheShard) newIters(
431 : ctx context.Context,
432 : file *manifest.FileMetadata,
433 : opts *IterOptions,
434 : internalOpts internalIterOpts,
435 : dbOpts *tableCacheOpts,
436 : kinds iterKinds,
437 1 : ) (iterSet, error) {
438 1 : // TODO(sumeer): constructing the Reader should also use a plumbed context,
439 1 : // since parts of the sstable are read during the construction. The Reader
440 1 : // should not remember that context since the Reader can be long-lived.
441 1 :
442 1 : // Calling findNode gives us the responsibility of decrementing v's
443 1 : // refCount. If opening the underlying table resulted in error, then we
444 1 : // decrement this straight away. Otherwise, we pass that responsibility to
445 1 : // the sstable iterator, which decrements when it is closed.
446 1 : v := c.findNode(ctx, file.FileBacking, dbOpts)
447 1 : if v.err != nil {
448 1 : defer c.unrefValue(v)
449 1 : return iterSet{}, v.err
450 1 : }
451 :
452 : // Note: This suffers an allocation for virtual sstables.
453 1 : cr := createCommonReader(v, file)
454 1 : var iters iterSet
455 1 : var err error
456 1 : if kinds.RangeKey() && file.HasRangeKeys {
457 1 : iters.rangeKey, err = c.newRangeKeyIter(ctx, v, file, cr, opts.SpanIterOptions())
458 1 : }
459 1 : if kinds.RangeDeletion() && file.HasPointKeys && err == nil {
460 1 : iters.rangeDeletion, err = c.newRangeDelIter(ctx, file, cr, dbOpts)
461 1 : }
462 1 : if kinds.Point() && err == nil {
463 1 : iters.point, err = c.newPointIter(ctx, v, file, cr, opts, internalOpts, dbOpts)
464 1 : }
465 1 : if err != nil {
466 1 : // NB: There's a subtlety here: Because the point iterator is the last
467 1 : // iterator we attempt to create, it's not possible for:
468 1 : // err != nil && iters.point != nil
469 1 : // If it were possible, we'd need to account for it to avoid double
470 1 : // unref-ing here, once during CloseAll and once during `unrefValue`.
471 1 : iters.CloseAll()
472 1 : c.unrefValue(v)
473 1 : return iterSet{}, err
474 1 : }
475 : // Only point iterators ever require the reader stay pinned in the cache. If
476 : // we're not returning a point iterator to the caller, we need to unref v.
477 1 : if iters.point == nil {
478 1 : c.unrefValue(v)
479 1 : }
480 1 : return iters, nil
481 : }
482 :
483 : // For flushable ingests, we decide whether to use the bloom filter based on
484 : // the size of the filter block.
485 : const filterBlockSizeLimitForFlushableIngests = 64 * 1024
486 :
487 : // newPointIter is an internal helper that constructs a point iterator over an
488 : // sstable. This function is for internal use only, and callers should use
489 : // newIters instead.
490 : func (c *tableCacheShard) newPointIter(
491 : ctx context.Context,
492 : v *tableCacheValue,
493 : file *manifest.FileMetadata,
494 : cr sstable.CommonReader,
495 : opts *IterOptions,
496 : internalOpts internalIterOpts,
497 : dbOpts *tableCacheOpts,
498 1 : ) (internalIterator, error) {
499 1 : var (
500 1 : hideObsoletePoints bool = false
501 1 : pointKeyFilters []BlockPropertyFilter
502 1 : filterer *sstable.BlockPropertiesFilterer
503 1 : )
504 1 : if opts != nil {
505 1 : // This code is appending (at most one filter) in-place to
506 1 : // opts.PointKeyFilters even though the slice is shared for iterators in
507 1 : // the same iterator tree. This is acceptable since all the following
508 1 : // properties are true:
509 1 : // - The iterator tree is single threaded, so the shared backing for the
510 1 : // slice is being mutated in a single threaded manner.
511 1 : // - Each shallow copy of the slice has its own notion of length.
512 1 : // - The appended element is always the obsoleteKeyBlockPropertyFilter
513 1 : // struct, which is stateless, so overwriting that struct when creating
514 1 : // one sstable iterator is harmless to other sstable iterators that are
515 1 : // relying on that struct.
516 1 : //
517 1 : // An alternative would be to have different slices for different sstable
518 1 : // iterators, but that requires more work to avoid allocations.
519 1 : //
520 1 : // TODO(bilal): for compaction reads of foreign sstables, we do hide
521 1 : // obsolete points (see sstable.Reader.newCompactionIter) but we don't
522 1 : // apply the obsolete block property filter. We could optimize this by
523 1 : // applying the filter.
524 1 : hideObsoletePoints, pointKeyFilters =
525 1 : v.reader.TryAddBlockPropertyFilterForHideObsoletePoints(
526 1 : opts.snapshotForHideObsoletePoints, file.LargestSeqNum, opts.PointKeyFilters)
527 1 :
528 1 : var ok bool
529 1 : var err error
530 1 : ok, filterer, err = c.checkAndIntersectFilters(v, pointKeyFilters,
531 1 : internalOpts.boundLimitedFilter, file.SyntheticPrefixAndSuffix.Suffix())
532 1 : if err != nil {
533 0 : return nil, err
534 1 : } else if !ok {
535 1 : // No point keys within the table match the filters.
536 1 : return nil, nil
537 1 : }
538 : }
539 :
540 1 : var iter sstable.Iterator
541 1 : filterBlockSizeLimit := sstable.AlwaysUseFilterBlock
542 1 : if opts != nil {
543 1 : // By default, we don't use block filters for L6 and restrict the size for
544 1 : // flushable ingests, as these blocks can be very big.
545 1 : if !opts.UseL6Filters {
546 1 : if opts.layer == manifest.Level(6) {
547 1 : filterBlockSizeLimit = sstable.NeverUseFilterBlock
548 1 : } else if opts.layer.IsFlushableIngests() {
549 1 : filterBlockSizeLimit = filterBlockSizeLimitForFlushableIngests
550 1 : }
551 : }
552 1 : if opts.layer.IsSet() && !opts.layer.IsFlushableIngests() {
553 1 : ctx = objiotracing.WithLevel(ctx, opts.layer.Level())
554 1 : }
555 : }
556 1 : tableFormat, err := v.reader.TableFormat()
557 1 : if err != nil {
558 0 : return nil, err
559 0 : }
560 :
561 1 : if v.isShared && file.SyntheticSeqNum() != 0 {
562 1 : if tableFormat < sstable.TableFormatPebblev4 {
563 0 : return nil, errors.New("pebble: shared ingested sstable has a lower table format than expected")
564 0 : }
565 : // The table is shared and ingested.
566 1 : hideObsoletePoints = true
567 : }
568 1 : transforms := file.IterTransforms()
569 1 : transforms.HideObsoletePoints = hideObsoletePoints
570 1 : iterStatsAccum := internalOpts.iterStatsAccumulator
571 1 : if iterStatsAccum == nil && opts != nil && dbOpts.sstStatsCollector != nil {
572 1 : iterStatsAccum = dbOpts.sstStatsCollector.Accumulator(
573 1 : uint64(uintptr(unsafe.Pointer(v.reader))), opts.Category)
574 1 : }
575 1 : if internalOpts.compaction {
576 1 : iter, err = cr.NewCompactionIter(transforms, iterStatsAccum, &v.readerProvider, internalOpts.bufferPool)
577 1 : } else {
578 1 : iter, err = cr.NewPointIter(
579 1 : ctx, transforms, opts.GetLowerBound(), opts.GetUpperBound(), filterer, filterBlockSizeLimit,
580 1 : internalOpts.stats, iterStatsAccum, &v.readerProvider)
581 1 : }
582 1 : if err != nil {
583 1 : return nil, err
584 1 : }
585 : // NB: v.closeHook takes responsibility for calling unrefValue(v) here. Take
586 : // care to avoid introducing an allocation here by adding a closure.
587 1 : iter.SetCloseHook(v.closeHook)
588 1 : c.iterCount.Add(1)
589 1 : dbOpts.iterCount.Add(1)
590 1 : if invariants.RaceEnabled {
591 0 : c.mu.Lock()
592 0 : c.mu.iters[iter] = debug.Stack()
593 0 : c.mu.Unlock()
594 0 : }
595 1 : return iter, nil
596 : }
597 :
598 : // newRangeDelIter is an internal helper that constructs an iterator over an
599 : // sstable's range deletions. This function is for table-cache internal use
600 : // only, and callers should use newIters instead.
601 : func (c *tableCacheShard) newRangeDelIter(
602 : ctx context.Context, file *manifest.FileMetadata, cr sstable.CommonReader, dbOpts *tableCacheOpts,
603 1 : ) (keyspan.FragmentIterator, error) {
604 1 : // NB: range-del iterator does not maintain a reference to the table, nor
605 1 : // does it need to read from it after creation.
606 1 : rangeDelIter, err := cr.NewRawRangeDelIter(ctx, file.FragmentIterTransforms())
607 1 : if err != nil {
608 1 : return nil, err
609 1 : }
610 : // Assert expected bounds in tests.
611 1 : if invariants.Sometimes(50) && rangeDelIter != nil {
612 1 : cmp := base.DefaultComparer.Compare
613 1 : if dbOpts.readerOpts.Comparer != nil {
614 1 : cmp = dbOpts.readerOpts.Comparer.Compare
615 1 : }
616 1 : rangeDelIter = keyspan.AssertBounds(
617 1 : rangeDelIter, file.SmallestPointKey, file.LargestPointKey.UserKey, cmp,
618 1 : )
619 : }
620 1 : return rangeDelIter, nil
621 : }
622 :
623 : // newRangeKeyIter is an internal helper that constructs an iterator over an
624 : // sstable's range keys. This function is for table-cache internal use only, and
625 : // callers should use newIters instead.
626 : func (c *tableCacheShard) newRangeKeyIter(
627 : ctx context.Context,
628 : v *tableCacheValue,
629 : file *fileMetadata,
630 : cr sstable.CommonReader,
631 : opts keyspan.SpanIterOptions,
632 1 : ) (keyspan.FragmentIterator, error) {
633 1 : transforms := file.FragmentIterTransforms()
634 1 : // Don't filter a table's range keys if the file contains RANGEKEYDELs.
635 1 : // The RANGEKEYDELs may delete range keys in other levels. Skipping the
636 1 : // file's range key blocks may surface deleted range keys below. This is
637 1 : // done here, rather than deferring to the block-property collector in order
638 1 : // to maintain parity with point keys and the treatment of RANGEDELs.
639 1 : if v.reader.Properties.NumRangeKeyDels == 0 && len(opts.RangeKeyFilters) > 0 {
640 0 : ok, _, err := c.checkAndIntersectFilters(v, opts.RangeKeyFilters, nil, transforms.SyntheticSuffix())
641 0 : if err != nil {
642 0 : return nil, err
643 0 : } else if !ok {
644 0 : return nil, nil
645 0 : }
646 : }
647 : // TODO(radu): wrap in an AssertBounds.
648 1 : return cr.NewRawRangeKeyIter(ctx, transforms)
649 : }
650 :
651 : // tableCacheShardReaderProvider implements sstable.ReaderProvider for a
652 : // specific table.
653 : type tableCacheShardReaderProvider struct {
654 : c *tableCacheShard
655 : dbOpts *tableCacheOpts
656 : backingFileNum base.DiskFileNum
657 :
658 : mu struct {
659 : sync.Mutex
660 : // v is the result of findNode. Whenever it is non-nil, we hold a refcount
661 : // on the tableCacheValue.
662 : v *tableCacheValue
663 : // refCount is the number of GetReader() calls that have not received a
664 : // corresponding Close().
665 : refCount int
666 : }
667 : }
668 :
669 : var _ sstable.ReaderProvider = &tableCacheShardReaderProvider{}
670 :
671 : func (rp *tableCacheShardReaderProvider) init(
672 : c *tableCacheShard, dbOpts *tableCacheOpts, backingFileNum base.DiskFileNum,
673 1 : ) {
674 1 : rp.c = c
675 1 : rp.dbOpts = dbOpts
676 1 : rp.backingFileNum = backingFileNum
677 1 : rp.mu.v = nil
678 1 : rp.mu.refCount = 0
679 1 : }
680 :
681 : // GetReader implements sstable.ReaderProvider. Note that it is not the
682 : // responsibility of tableCacheShardReaderProvider to ensure that the file
683 : // continues to exist. The ReaderProvider is used in iterators where the
684 : // top-level iterator is pinning the read state and preventing the files from
685 : // being deleted.
686 : //
687 : // The caller must call tableCacheShardReaderProvider.Close.
688 : //
689 : // Note that currently the Reader returned here is only used to read value
690 : // blocks. This reader shouldn't be used for other purposes like reading keys
691 : // outside of virtual sstable bounds.
692 : //
693 : // TODO(bananabrick): We could return a wrapper over the Reader to ensure
694 : // that the reader isn't used for other purposes.
695 1 : func (rp *tableCacheShardReaderProvider) GetReader(ctx context.Context) (*sstable.Reader, error) {
696 1 : rp.mu.Lock()
697 1 : defer rp.mu.Unlock()
698 1 :
699 1 : if rp.mu.v != nil {
700 0 : rp.mu.refCount++
701 0 : return rp.mu.v.reader, nil
702 0 : }
703 :
704 : // Calling findNodeInternal gives us the responsibility of decrementing v's
705 : // refCount. Note that if the table is no longer in the cache,
706 : // findNodeInternal will need to do IO to initialize a new Reader. We hold
707 : // rp.mu during this time so that concurrent GetReader calls block until the
708 : // Reader is created.
709 1 : v := rp.c.findNodeInternal(ctx, rp.backingFileNum, rp.dbOpts)
710 1 : if v.err != nil {
711 0 : defer rp.c.unrefValue(v)
712 0 : return nil, v.err
713 0 : }
714 1 : rp.mu.v = v
715 1 : rp.mu.refCount = 1
716 1 : return v.reader, nil
717 : }
718 :
719 : // Close implements sstable.ReaderProvider.
720 1 : func (rp *tableCacheShardReaderProvider) Close() {
721 1 : rp.mu.Lock()
722 1 : defer rp.mu.Unlock()
723 1 : rp.mu.refCount--
724 1 : if rp.mu.refCount <= 0 {
725 1 : if rp.mu.refCount < 0 {
726 0 : panic("pebble: sstable.ReaderProvider misuse")
727 : }
728 1 : rp.c.unrefValue(rp.mu.v)
729 1 : rp.mu.v = nil
730 : }
731 : }
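// Illustrative sketch only: the GetReader/Close pairing that a consumer of an
// sstable.ReaderProvider (such as tableCacheShardReaderProvider) is expected
// to follow. The work done with the reader here, inspecting table properties,
// is just a stand-in; in practice the provider is used by iterators to read
// value blocks.
func exampleUseReaderProvider(ctx context.Context, rp sstable.ReaderProvider) error {
	r, err := rp.GetReader(ctx)
	if err != nil {
		return err
	}
	// Every successful GetReader must be paired with exactly one Close, which
	// releases the reference the provider took on the cached reader.
	defer rp.Close()
	_ = r.Properties
	return nil
}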
732 :
733 : // getTableProperties returns the sstable properties for the target file.
734 : func (c *tableCacheShard) getTableProperties(
735 : file *fileMetadata, dbOpts *tableCacheOpts,
736 1 : ) (*sstable.Properties, error) {
737 1 : // Calling findNode gives us the responsibility of decrementing v's refCount here
738 1 : v := c.findNode(context.TODO(), file.FileBacking, dbOpts)
739 1 : defer c.unrefValue(v)
740 1 :
741 1 : if v.err != nil {
742 0 : return nil, v.err
743 0 : }
744 1 : return &v.reader.Properties, nil
745 : }
746 :
747 : // releaseNode releases a node from the tableCacheShard.
748 : //
749 : // c.mu must be held when calling this.
750 1 : func (c *tableCacheShard) releaseNode(n *tableCacheNode) {
751 1 : c.unlinkNode(n)
752 1 : c.clearNode(n)
753 1 : }
754 :
755 : // unlinkNode removes a node from the tableCacheShard, leaving the shard
756 : // reference in place.
757 : //
758 : // c.mu must be held when calling this.
759 1 : func (c *tableCacheShard) unlinkNode(n *tableCacheNode) {
760 1 : key := tableCacheKey{n.cacheID, n.fileNum}
761 1 : delete(c.mu.nodes, key)
762 1 :
763 1 : switch n.ptype {
764 1 : case tableCacheNodeHot:
765 1 : c.mu.sizeHot--
766 1 : case tableCacheNodeCold:
767 1 : c.mu.sizeCold--
768 1 : case tableCacheNodeTest:
769 1 : c.mu.sizeTest--
770 : }
771 :
772 1 : if n == c.mu.handHot {
773 1 : c.mu.handHot = c.mu.handHot.prev()
774 1 : }
775 1 : if n == c.mu.handCold {
776 1 : c.mu.handCold = c.mu.handCold.prev()
777 1 : }
778 1 : if n == c.mu.handTest {
779 1 : c.mu.handTest = c.mu.handTest.prev()
780 1 : }
781 :
782 1 : if n.unlink() == n {
783 1 : // This was the last entry in the cache.
784 1 : c.mu.handHot = nil
785 1 : c.mu.handCold = nil
786 1 : c.mu.handTest = nil
787 1 : }
788 :
789 1 : n.links.prev = nil
790 1 : n.links.next = nil
791 : }
792 :
793 1 : func (c *tableCacheShard) clearNode(n *tableCacheNode) {
794 1 : if v := n.value; v != nil {
795 1 : n.value = nil
796 1 : c.unrefValue(v)
797 1 : }
798 : }
799 :
800 : // unrefValue decrements the reference count for the specified value, releasing
801 : // it if the reference count fell to 0. Note that the value has a reference if
802 : // it is present in tableCacheShard.mu.nodes, so a reference count of 0 means
803 : // the node has already been removed from that map.
804 1 : func (c *tableCacheShard) unrefValue(v *tableCacheValue) {
805 1 : if v.refCount.Add(-1) == 0 {
806 1 : c.releasing.Add(1)
807 1 : c.releasingCh <- v
808 1 : }
809 : }
810 :
811 : // findNode returns the value for the table with the given backing, creating
812 : // the corresponding cache node if it didn't already exist. The caller is
813 : // responsible for decrementing the returned value's refCount.
814 : func (c *tableCacheShard) findNode(
815 : ctx context.Context, b *fileBacking, dbOpts *tableCacheOpts,
816 1 : ) *tableCacheValue {
817 1 : // The backing must have a positive refcount (otherwise it could be deleted at any time).
818 1 : b.MustHaveRefs()
819 1 : // Caution! Here fileMetadata can be a physical or virtual table. Table cache
820 1 : // readers are associated with the physical backings. All virtual tables with
821 1 : // the same backing will use the same reader from the cache; so no information
822 1 : // that can differ among these virtual tables can be passed to findNodeInternal.
823 1 : backingFileNum := b.DiskFileNum
824 1 :
825 1 : return c.findNodeInternal(ctx, backingFileNum, dbOpts)
826 1 : }
827 :
828 : func (c *tableCacheShard) findNodeInternal(
829 : ctx context.Context, backingFileNum base.DiskFileNum, dbOpts *tableCacheOpts,
830 1 : ) *tableCacheValue {
831 1 : // Fast-path for a hit in the cache.
832 1 : c.mu.RLock()
833 1 : key := tableCacheKey{dbOpts.cacheID, backingFileNum}
834 1 : if n := c.mu.nodes[key]; n != nil && n.value != nil {
835 1 : // Fast-path hit.
836 1 : //
837 1 : // The caller is responsible for decrementing the refCount.
838 1 : v := n.value
839 1 : v.refCount.Add(1)
840 1 : c.mu.RUnlock()
841 1 : n.referenced.Store(true)
842 1 : c.hits.Add(1)
843 1 : <-v.loaded
844 1 : return v
845 1 : }
846 1 : c.mu.RUnlock()
847 1 :
848 1 : c.mu.Lock()
849 1 :
850 1 : n := c.mu.nodes[key]
851 1 : switch {
852 1 : case n == nil:
853 1 : // Slow-path miss of a non-existent node.
854 1 : n = &tableCacheNode{
855 1 : fileNum: backingFileNum,
856 1 : ptype: tableCacheNodeCold,
857 1 : }
858 1 : c.addNode(n, dbOpts)
859 1 : c.mu.sizeCold++
860 :
861 1 : case n.value != nil:
862 1 : // Slow-path hit of a hot or cold node.
863 1 : //
864 1 : // The caller is responsible for decrementing the refCount.
865 1 : v := n.value
866 1 : v.refCount.Add(1)
867 1 : n.referenced.Store(true)
868 1 : c.hits.Add(1)
869 1 : c.mu.Unlock()
870 1 : <-v.loaded
871 1 : return v
872 :
873 1 : default:
874 1 : // Slow-path miss of a test node.
875 1 : c.unlinkNode(n)
876 1 : c.mu.coldTarget++
877 1 : if c.mu.coldTarget > c.size {
878 1 : c.mu.coldTarget = c.size
879 1 : }
880 :
881 1 : n.referenced.Store(false)
882 1 : n.ptype = tableCacheNodeHot
883 1 : c.addNode(n, dbOpts)
884 1 : c.mu.sizeHot++
885 : }
886 :
887 1 : c.misses.Add(1)
888 1 :
889 1 : v := &tableCacheValue{
890 1 : loaded: make(chan struct{}),
891 1 : }
892 1 : v.readerProvider.init(c, dbOpts, backingFileNum)
893 1 : v.refCount.Store(2)
894 1 : // Cache the closure invoked when an iterator is closed. This avoids an
895 1 : // allocation on every call to newIters.
896 1 : v.closeHook = func(i sstable.Iterator) error {
897 1 : if invariants.RaceEnabled {
898 0 : c.mu.Lock()
899 0 : delete(c.mu.iters, i)
900 0 : c.mu.Unlock()
901 0 : }
902 1 : c.unrefValue(v)
903 1 : c.iterCount.Add(-1)
904 1 : dbOpts.iterCount.Add(-1)
905 1 : return nil
906 : }
907 1 : n.value = v
908 1 :
909 1 : c.mu.Unlock()
910 1 :
911 1 : // Note: adding to the cache lists must complete before we begin loading the
912 1 : // table, as a failure during load will result in the node being unlinked.
913 1 : pprof.Do(context.Background(), tableCacheLabels, func(context.Context) {
914 1 : v.load(ctx, backingFileNum, c, dbOpts)
915 1 : })
916 1 : return v
917 : }
918 :
919 1 : func (c *tableCacheShard) addNode(n *tableCacheNode, dbOpts *tableCacheOpts) {
920 1 : c.evictNodes()
921 1 : n.cacheID = dbOpts.cacheID
922 1 : key := tableCacheKey{n.cacheID, n.fileNum}
923 1 : c.mu.nodes[key] = n
924 1 :
925 1 : n.links.next = n
926 1 : n.links.prev = n
927 1 : if c.mu.handHot == nil {
928 1 : // First element.
929 1 : c.mu.handHot = n
930 1 : c.mu.handCold = n
931 1 : c.mu.handTest = n
932 1 : } else {
933 1 : c.mu.handHot.link(n)
934 1 : }
935 :
936 1 : if c.mu.handCold == c.mu.handHot {
937 1 : c.mu.handCold = c.mu.handCold.prev()
938 1 : }
939 : }
940 :
941 1 : func (c *tableCacheShard) evictNodes() {
942 1 : for c.size <= c.mu.sizeHot+c.mu.sizeCold && c.mu.handCold != nil {
943 1 : c.runHandCold()
944 1 : }
945 : }
946 :
947 1 : func (c *tableCacheShard) runHandCold() {
948 1 : n := c.mu.handCold
949 1 : if n.ptype == tableCacheNodeCold {
950 1 : if n.referenced.Load() {
951 1 : n.referenced.Store(false)
952 1 : n.ptype = tableCacheNodeHot
953 1 : c.mu.sizeCold--
954 1 : c.mu.sizeHot++
955 1 : } else {
956 1 : c.clearNode(n)
957 1 : n.ptype = tableCacheNodeTest
958 1 : c.mu.sizeCold--
959 1 : c.mu.sizeTest++
960 1 : for c.size < c.mu.sizeTest && c.mu.handTest != nil {
961 1 : c.runHandTest()
962 1 : }
963 : }
964 : }
965 :
966 1 : c.mu.handCold = c.mu.handCold.next()
967 1 :
968 1 : for c.size-c.mu.coldTarget <= c.mu.sizeHot && c.mu.handHot != nil {
969 1 : c.runHandHot()
970 1 : }
971 : }
972 :
973 1 : func (c *tableCacheShard) runHandHot() {
974 1 : if c.mu.handHot == c.mu.handTest && c.mu.handTest != nil {
975 1 : c.runHandTest()
976 1 : if c.mu.handHot == nil {
977 0 : return
978 0 : }
979 : }
980 :
981 1 : n := c.mu.handHot
982 1 : if n.ptype == tableCacheNodeHot {
983 1 : if n.referenced.Load() {
984 1 : n.referenced.Store(false)
985 1 : } else {
986 1 : n.ptype = tableCacheNodeCold
987 1 : c.mu.sizeHot--
988 1 : c.mu.sizeCold++
989 1 : }
990 : }
991 :
992 1 : c.mu.handHot = c.mu.handHot.next()
993 : }
994 :
995 1 : func (c *tableCacheShard) runHandTest() {
996 1 : if c.mu.sizeCold > 0 && c.mu.handTest == c.mu.handCold && c.mu.handCold != nil {
997 1 : c.runHandCold()
998 1 : if c.mu.handTest == nil {
999 0 : return
1000 0 : }
1001 : }
1002 :
1003 1 : n := c.mu.handTest
1004 1 : if n.ptype == tableCacheNodeTest {
1005 1 : c.mu.coldTarget--
1006 1 : if c.mu.coldTarget < 0 {
1007 1 : c.mu.coldTarget = 0
1008 1 : }
1009 1 : c.unlinkNode(n)
1010 1 : c.clearNode(n)
1011 : }
1012 :
1013 1 : c.mu.handTest = c.mu.handTest.next()
1014 : }
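// The three hands above (handHot, handCold, handTest) implement a CLOCK-Pro
// style replacement policy. As a rough mental model only, and not the actual
// implementation, a plain second-chance clock behaves as sketched below; the
// real shard additionally distinguishes hot/cold/test entries and adapts
// coldTarget, which this simplified sketch deliberately omits.
type exampleClockEntry struct {
	fileNum    base.DiskFileNum
	referenced bool
}

// exampleEvictOne advances a single clock hand around the ring, clearing the
// referenced bit (the "second chance") as it passes, until it finds an
// unreferenced entry, which becomes the eviction victim.
func exampleEvictOne(ring []exampleClockEntry, hand int) (victim, newHand int) {
	if len(ring) == 0 {
		return -1, 0
	}
	for {
		e := &ring[hand]
		if e.referenced {
			e.referenced = false
			hand = (hand + 1) % len(ring)
			continue
		}
		return hand, (hand + 1) % len(ring)
	}
}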
1015 :
1016 1 : func (c *tableCacheShard) evict(fileNum base.DiskFileNum, dbOpts *tableCacheOpts, allowLeak bool) {
1017 1 : c.mu.Lock()
1018 1 : key := tableCacheKey{dbOpts.cacheID, fileNum}
1019 1 : n := c.mu.nodes[key]
1020 1 : var v *tableCacheValue
1021 1 : if n != nil {
1022 1 : // NB: This is equivalent to tableCacheShard.releaseNode(), but we perform
1023 1 : // the tableCacheValue.release() call synchronously below to ensure the
1024 1 : // sstable file descriptor is closed before returning. Note that
1025 1 : // tableCacheShard.releasing needs to be incremented while holding
1026 1 : // tableCacheShard.mu in order to avoid a race with Close().
1027 1 : c.unlinkNode(n)
1028 1 : v = n.value
1029 1 : if v != nil {
1030 1 : if !allowLeak {
1031 1 : if t := v.refCount.Add(-1); t != 0 {
1032 0 : dbOpts.loggerAndTracer.Fatalf("sstable %s: refcount is not zero: %d\n%s", fileNum, t, debug.Stack())
1033 0 : }
1034 : }
1035 1 : c.releasing.Add(1)
1036 : }
1037 : }
1038 :
1039 1 : c.mu.Unlock()
1040 1 :
1041 1 : if v != nil {
1042 1 : v.release(c)
1043 1 : }
1044 :
1045 1 : dbOpts.cache.EvictFile(dbOpts.cacheID, fileNum)
1046 : }
1047 :
1048 : // removeDB evicts any nodes which have a reference to the DB
1049 : // associated with dbOpts.cacheID. Make sure that there will
1050 : // be no more accesses to the files associated with the DB.
1051 1 : func (c *tableCacheShard) removeDB(dbOpts *tableCacheOpts) {
1052 1 : var fileNums []base.DiskFileNum
1053 1 :
1054 1 : c.mu.RLock()
1055 1 : // Collect the fileNums which need to be cleaned.
1056 1 : var firstNode *tableCacheNode
1057 1 : node := c.mu.handHot
1058 1 : for node != firstNode {
1059 1 : if firstNode == nil {
1060 1 : firstNode = node
1061 1 : }
1062 :
1063 1 : if node.cacheID == dbOpts.cacheID {
1064 1 : fileNums = append(fileNums, node.fileNum)
1065 1 : }
1066 1 : node = node.next()
1067 : }
1068 1 : c.mu.RUnlock()
1069 1 :
1070 1 : // Evict all the nodes associated with the DB.
1071 1 : // This should synchronously close all the files
1072 1 : // associated with the DB.
1073 1 : for _, fileNum := range fileNums {
1074 1 : c.evict(fileNum, dbOpts, true)
1075 1 : }
1076 : }
1077 :
1078 1 : func (c *tableCacheShard) Close() error {
1079 1 : c.mu.Lock()
1080 1 : defer c.mu.Unlock()
1081 1 :
1082 1 : // Check for leaked iterators. Note that we'll still perform cleanup below in
1083 1 : // the case that there are leaked iterators.
1084 1 : var err error
1085 1 : if v := c.iterCount.Load(); v > 0 {
1086 1 : if !invariants.RaceEnabled {
1087 1 : err = errors.Errorf("leaked iterators: %d", errors.Safe(v))
1088 1 : } else {
1089 0 : var buf bytes.Buffer
1090 0 : for _, stack := range c.mu.iters {
1091 0 : fmt.Fprintf(&buf, "%s\n", stack)
1092 0 : }
1093 0 : err = errors.Errorf("leaked iterators: %d\n%s", errors.Safe(v), buf.String())
1094 : }
1095 : }
1096 :
1097 1 : for c.mu.handHot != nil {
1098 0 : n := c.mu.handHot
1099 0 : if n.value != nil {
1100 0 : if n.value.refCount.Add(-1) == 0 {
1101 0 : c.releasing.Add(1)
1102 0 : c.releasingCh <- n.value
1103 0 : }
1104 : }
1105 0 : c.unlinkNode(n)
1106 : }
1107 1 : c.mu.nodes = nil
1108 1 : c.mu.handHot = nil
1109 1 : c.mu.handCold = nil
1110 1 : c.mu.handTest = nil
1111 1 :
1112 1 : // Only shut down the releasing goroutine if there were no leaked
1113 1 : // iterators. If there were leaked iterators, we leave the goroutine running
1114 1 : // and the releasingCh open so that a subsequent iterator close can
1115 1 : // complete. This behavior is used by iterator leak tests. Leaking the
1116 1 : // goroutine for these tests is less bad than not closing the iterator, which
1117 1 : // would trigger other warnings about block cache handles not being released.
1118 1 : if err != nil {
1119 1 : c.releasing.Wait()
1120 1 : return err
1121 1 : }
1122 :
1123 1 : close(c.releasingCh)
1124 1 : c.releasing.Wait()
1125 1 : c.releaseLoopExit.Wait()
1126 1 : return err
1127 : }
1128 :
1129 : type tableCacheValue struct {
1130 : closeHook func(i sstable.Iterator) error
1131 : reader *sstable.Reader
1132 : err error
1133 : loaded chan struct{}
1134 : // Reference count for the value. The reader is closed when the reference
1135 : // count drops to zero.
1136 : refCount atomic.Int32
1137 : isShared bool
1138 :
1139 : // readerProvider is embedded here so that we only allocate it once as long as
1140 : // the table stays in the cache. Its state is not always logically tied to
1141 : // this specific tableCacheValue - if a table goes out of the cache and then
1142 : // comes back in, the readerProvider in a now-defunct tableCacheValue can
1143 : // still be used and will internally refer to the new tableCacheValue.
1144 : readerProvider tableCacheShardReaderProvider
1145 : }
1146 :
1147 : func (v *tableCacheValue) load(
1148 : ctx context.Context, backingFileNum base.DiskFileNum, c *tableCacheShard, dbOpts *tableCacheOpts,
1149 1 : ) {
1150 1 : // Try opening the file first.
1151 1 : var f objstorage.Readable
1152 1 : var err error
1153 1 : f, err = dbOpts.objProvider.OpenForReading(
1154 1 : ctx, fileTypeTable, backingFileNum, objstorage.OpenOptions{MustExist: true},
1155 1 : )
1156 1 : if err == nil {
1157 1 : o := dbOpts.readerOpts
1158 1 : o.SetInternalCacheOpts(sstableinternal.CacheOptions{
1159 1 : Cache: dbOpts.cache,
1160 1 : CacheID: dbOpts.cacheID,
1161 1 : FileNum: backingFileNum,
1162 1 : })
1163 1 : v.reader, err = sstable.NewReader(ctx, f, o)
1164 1 : }
1165 1 : if err == nil {
1166 1 : var objMeta objstorage.ObjectMetadata
1167 1 : objMeta, err = dbOpts.objProvider.Lookup(fileTypeTable, backingFileNum)
1168 1 : v.isShared = objMeta.IsShared()
1169 1 : }
1170 1 : if err != nil {
1171 1 : v.err = errors.Wrapf(
1172 1 : err, "pebble: backing file %s error", backingFileNum)
1173 1 : }
1174 1 : if v.err != nil {
1175 1 : c.mu.Lock()
1176 1 : defer c.mu.Unlock()
1177 1 : // Lookup the node in the cache again as it might have already been
1178 1 : // removed.
1179 1 : key := tableCacheKey{dbOpts.cacheID, backingFileNum}
1180 1 : n := c.mu.nodes[key]
1181 1 : if n != nil && n.value == v {
1182 1 : c.releaseNode(n)
1183 1 : }
1184 : }
1185 1 : close(v.loaded)
1186 : }
1187 :
1188 1 : func (v *tableCacheValue) release(c *tableCacheShard) {
1189 1 : <-v.loaded
1190 1 : // Nothing to be done about an error at this point. Close the reader if it is
1191 1 : // open.
1192 1 : if v.reader != nil {
1193 1 : _ = v.reader.Close()
1194 1 : }
1195 1 : c.releasing.Done()
1196 : }
1197 :
1198 : type tableCacheNodeType int8
1199 :
1200 : const (
1201 : tableCacheNodeTest tableCacheNodeType = iota
1202 : tableCacheNodeCold
1203 : tableCacheNodeHot
1204 : )
1205 :
1206 0 : func (p tableCacheNodeType) String() string {
1207 0 : switch p {
1208 0 : case tableCacheNodeTest:
1209 0 : return "test"
1210 0 : case tableCacheNodeCold:
1211 0 : return "cold"
1212 0 : case tableCacheNodeHot:
1213 0 : return "hot"
1214 : }
1215 0 : return "unknown"
1216 : }
1217 :
1218 : type tableCacheNode struct {
1219 : fileNum base.DiskFileNum
1220 : value *tableCacheValue
1221 :
1222 : links struct {
1223 : next *tableCacheNode
1224 : prev *tableCacheNode
1225 : }
1226 : ptype tableCacheNodeType
1227 : // referenced is atomically set to indicate that this entry has been accessed
1228 : // since the last time one of the clock hands swept it.
1229 : referenced atomic.Bool
1230 :
1231 : // Storing the cache id associated with the DB instance here
1232 : // avoids the need to thread the dbOpts struct through many functions.
1233 : cacheID cache.ID
1234 : }
1235 :
1236 1 : func (n *tableCacheNode) next() *tableCacheNode {
1237 1 : if n == nil {
1238 0 : return nil
1239 0 : }
1240 1 : return n.links.next
1241 : }
1242 :
1243 1 : func (n *tableCacheNode) prev() *tableCacheNode {
1244 1 : if n == nil {
1245 0 : return nil
1246 0 : }
1247 1 : return n.links.prev
1248 : }
1249 :
1250 1 : func (n *tableCacheNode) link(s *tableCacheNode) {
1251 1 : s.links.prev = n.links.prev
1252 1 : s.links.prev.links.next = s
1253 1 : s.links.next = n
1254 1 : s.links.next.links.prev = s
1255 1 : }
1256 :
1257 1 : func (n *tableCacheNode) unlink() *tableCacheNode {
1258 1 : next := n.links.next
1259 1 : n.links.prev.links.next = n.links.next
1260 1 : n.links.next.links.prev = n.links.prev
1261 1 : n.links.prev = n
1262 1 : n.links.next = n
1263 1 : return next
1264 1 : }
1265 :
1266 : // iterSet holds a set of iterators of various key kinds, all constructed over
1267 : // the same data structure (eg, an sstable). A subset of the fields may be
1268 : // populated depending on the `iterKinds` passed to newIters.
1269 : type iterSet struct {
1270 : point internalIterator
1271 : rangeDeletion keyspan.FragmentIterator
1272 : rangeKey keyspan.FragmentIterator
1273 : }
1274 :
1275 : // TODO(jackson): Consider adding methods for fast paths that check whether an
1276 : // iterator of a particular kind is nil, so that these call sites don't need to
1277 : // reach into the struct's fields directly.
1278 :
1279 : // Point returns the contained point iterator. If there is no point iterator,
1280 : // Point returns a non-nil empty point iterator.
1281 1 : func (s *iterSet) Point() internalIterator {
1282 1 : if s.point == nil {
1283 1 : return emptyIter
1284 1 : }
1285 1 : return s.point
1286 : }
1287 :
1288 : // RangeDeletion returns the contained range deletion iterator. If there is no
1289 : // range deletion iterator, RangeDeletion returns a non-nil empty keyspan
1290 : // iterator.
1291 1 : func (s *iterSet) RangeDeletion() keyspan.FragmentIterator {
1292 1 : if s.rangeDeletion == nil {
1293 1 : return emptyKeyspanIter
1294 1 : }
1295 1 : return s.rangeDeletion
1296 : }
1297 :
1298 : // RangeKey returns the contained range key iterator. If there is no range key
1299 : // iterator, RangeKey returns a non-nil empty keyspan iterator.
1300 1 : func (s *iterSet) RangeKey() keyspan.FragmentIterator {
1301 1 : if s.rangeKey == nil {
1302 0 : return emptyKeyspanIter
1303 0 : }
1304 1 : return s.rangeKey
1305 : }
1306 :
1307 : // CloseAll closes all of the held iterators. If CloseAll is called, then Close
1308 : // must not be called on the constituent iterators.
1309 1 : func (s *iterSet) CloseAll() error {
1310 1 : var err error
1311 1 : if s.point != nil {
1312 1 : err = s.point.Close()
1313 1 : s.point = nil
1314 1 : }
1315 1 : if s.rangeDeletion != nil {
1316 1 : s.rangeDeletion.Close()
1317 1 : s.rangeDeletion = nil
1318 1 : }
1319 1 : if s.rangeKey != nil {
1320 1 : s.rangeKey.Close()
1321 1 : s.rangeKey = nil
1322 1 : }
1323 1 : return err
1324 : }
1325 :
1326 : // iterKinds is a bitmap indicating a set of kinds of iterators. Callers may
1327 : // bitwise-OR iterPointKeys, iterRangeDeletions and/or iterRangeKeys together to
1328 : // represent a set of desired iterator kinds.
1329 : type iterKinds uint8
1330 :
1331 1 : func (t iterKinds) Point() bool { return (t & iterPointKeys) != 0 }
1332 1 : func (t iterKinds) RangeDeletion() bool { return (t & iterRangeDeletions) != 0 }
1333 1 : func (t iterKinds) RangeKey() bool { return (t & iterRangeKeys) != 0 }
1334 :
1335 : const (
1336 : iterPointKeys iterKinds = 1 << iota
1337 : iterRangeDeletions
1338 : iterRangeKeys
1339 : )
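// Illustrative sketch only: iterKinds values are combined with bitwise OR and
// queried with the accessor methods above.
func exampleIterKinds() {
	kinds := iterPointKeys | iterRangeKeys
	_ = kinds.Point()         // true: point iterators were requested
	_ = kinds.RangeDeletion() // false: range deletions were not requested
	_ = kinds.RangeKey()      // true: range keys were requested
}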
|