Line data Source code
1 : // Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "bytes"
9 : "context"
10 : "fmt"
11 : "io"
12 : "runtime/debug"
13 : "runtime/pprof"
14 : "sync"
15 : "sync/atomic"
16 : "unsafe"
17 :
18 : "github.com/cockroachdb/errors"
19 : "github.com/cockroachdb/pebble/internal/base"
20 : "github.com/cockroachdb/pebble/internal/invariants"
21 : "github.com/cockroachdb/pebble/internal/keyspan"
22 : "github.com/cockroachdb/pebble/internal/manifest"
23 : "github.com/cockroachdb/pebble/internal/private"
24 : "github.com/cockroachdb/pebble/objstorage"
25 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
26 : "github.com/cockroachdb/pebble/sstable"
27 : )
28 :
29 : var emptyIter = &errorIter{err: nil}
30 : var emptyKeyspanIter = &errorKeyspanIter{err: nil}
31 :
32 : // filteredAll is a singleton internalIterator implementation used when an
33 : // sstable does contain point keys, but all the keys are filtered by the active
34 : // PointKeyFilters set in the iterator's IterOptions.
35 : //
36 : // filteredAll implements filteredIter, ensuring the level iterator recognizes
37 : // when it may need to return file boundaries to keep the rangeDelIter open
38 : // during mergingIter operation.
39 : var filteredAll = &filteredAllKeysIter{errorIter: errorIter{err: nil}}
40 :
41 : var _ filteredIter = filteredAll
42 :
43 : type filteredAllKeysIter struct {
44 : errorIter
45 : }
46 :
47 0 : func (s *filteredAllKeysIter) MaybeFilteredKeys() bool {
48 0 : return true
49 0 : }
50 :
51 : // tableNewIters creates new iterators (point, range deletion and/or range key)
52 : // for the given file metadata. Which of the various iterator kinds the user is
53 : // requesting is specified with the iterKinds bitmap.
54 : //
55 : // On success, the requested subset of iters.{point,rangeDel,rangeKey} are
56 : // populated with iterators.
57 : //
58 : // If a point iterator is requested and the operation was successful,
59 : // iters.point is guaranteed to be non-nil and must be closed when the caller is
60 : // finished.
61 : //
62 : // If a range deletion or range key iterator is requested, the corresponding
63 : // iterator may be nil if the table does not contain any keys of the
64 : // corresponding kind. The returned iterSet type provides RangeDeletion() and
65 : // RangeKey() convenience methods that return non-nil empty iterators that may
66 : // be used if the caller requires a non-nil iterator.
67 : //
68 : // On error, all iterators are nil.
69 : //
70 : // The only (non-test) implementation of tableNewIters is
71 : // tableCacheContainer.newIters().
72 : type tableNewIters func(
73 : ctx context.Context,
74 : file *manifest.FileMetadata,
75 : opts *IterOptions,
76 : internalOpts internalIterOpts,
77 : kinds iterKinds,
78 : ) (iterSet, error)
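// A minimal illustrative sketch of how a caller might use a tableNewIters
// implementation, requesting only the iterator kinds it needs and relying on
// iterSet's convenience accessors. The helper name readPointsAndRangeDels is
// hypothetical and not part of this file.
func readPointsAndRangeDels(
	ctx context.Context, newIters tableNewIters, file *manifest.FileMetadata, opts *IterOptions,
) error {
	iters, err := newIters(ctx, file, opts, internalIterOpts{}, iterPointKeys|iterRangeDeletions)
	if err != nil {
		return err
	}
	// The point iterator is guaranteed non-nil because point keys were
	// requested and the call succeeded; the range deletion iterator may be
	// nil, so use the accessor that substitutes a non-nil empty iterator.
	defer iters.CloseAll()
	pointIter := iters.Point()
	rangeDelIter := iters.RangeDeletion()
	_, _ = pointIter, rangeDelIter
	return nil
}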
79 :
80 : // TODO implements the old tableNewIters interface that always attempted to open
81 : // a point iterator and a range deletion iterator. All call sites should be
82 : // updated to use `f` directly.
83 : func (f tableNewIters) TODO(
84 : ctx context.Context,
85 : file *manifest.FileMetadata,
86 : opts *IterOptions,
87 : internalOpts internalIterOpts,
88 1 : ) (internalIterator, keyspan.FragmentIterator, error) {
89 1 : iters, err := f(ctx, file, opts, internalOpts, iterPointKeys|iterRangeDeletions)
90 1 : if err != nil {
91 1 : return nil, nil, err
92 1 : }
93 1 : return iters.point, iters.rangeDeletion, nil
94 : }
95 :
96 : // tableNewRangeDelIter takes a tableNewIters and returns a TableNewSpanIter
97 : // for the rangedel iterator returned by tableNewIters.
98 1 : func tableNewRangeDelIter(ctx context.Context, newIters tableNewIters) keyspan.TableNewSpanIter {
99 1 : return func(file *manifest.FileMetadata, iterOptions keyspan.SpanIterOptions) (keyspan.FragmentIterator, error) {
100 1 : iters, err := newIters(ctx, file, nil, internalIterOpts{}, iterRangeDeletions)
101 1 : if err != nil {
102 0 : return nil, err
103 0 : }
104 1 : return iters.RangeDeletion(), nil
105 : }
106 : }
107 :
108 : // tableNewRangeKeyIter takes a tableNewIters and returns a TableNewSpanIter
109 : // for the range key iterator returned by tableNewIters.
110 1 : func tableNewRangeKeyIter(ctx context.Context, newIters tableNewIters) keyspan.TableNewSpanIter {
111 1 : return func(file *manifest.FileMetadata, iterOptions keyspan.SpanIterOptions) (keyspan.FragmentIterator, error) {
112 1 : iters, err := newIters(ctx, file, nil, internalIterOpts{}, iterRangeKeys)
113 1 : if err != nil {
114 1 : return nil, err
115 1 : }
116 1 : return iters.RangeKey(), nil
117 : }
118 : }
119 :
120 : var tableCacheLabels = pprof.Labels("pebble", "table-cache")
121 :
122 : // tableCacheOpts contains the DB-specific fields
123 : // of a table cache. This is stored in the tableCacheContainer
124 : // along with the table cache.
125 : // NB: It is important to make sure that the fields in this
126 : // struct are read-only. Since the fields here are shared
127 : // by every single tableCacheShard, mutating them would cause
128 : // unnecessary evictions of these fields (and the surrounding
129 : // fields) from the CPU caches.
130 : type tableCacheOpts struct {
131 : // iterCount keeps track of how many iterators are open. It is used to keep
132 : // track of leaked iterators on a per-db level.
133 : iterCount *atomic.Int32
134 :
135 : loggerAndTracer LoggerAndTracer
136 : cacheID uint64
137 : objProvider objstorage.Provider
138 : opts sstable.ReaderOptions
139 : filterMetrics *sstable.FilterMetricsTracker
140 : sstStatsCollector *sstable.CategoryStatsCollector
141 : }
142 :
143 : // tableCacheContainer contains the table cache and
144 : // fields which are unique to the DB.
145 : type tableCacheContainer struct {
146 : tableCache *TableCache
147 :
148 : // dbOpts contains fields relevant to the table cache
149 : // which are unique to each DB.
150 : dbOpts tableCacheOpts
151 : }
152 :
153 : // newTableCacheContainer will panic if the underlying cache in the table cache
154 : // doesn't match Options.Cache.
155 : func newTableCacheContainer(
156 : tc *TableCache,
157 : cacheID uint64,
158 : objProvider objstorage.Provider,
159 : opts *Options,
160 : size int,
161 : sstStatsCollector *sstable.CategoryStatsCollector,
162 1 : ) *tableCacheContainer {
163 1 : // We will release a ref to the table cache acquired here when tableCacheContainer.close is called.
164 1 : if tc != nil {
165 1 : if tc.cache != opts.Cache {
166 0 : panic("pebble: underlying cache for the table cache and db are different")
167 : }
168 1 : tc.Ref()
169 1 : } else {
170 1 : // NewTableCache should create a ref to tc which the container should
171 1 : // drop whenever it is closed.
172 1 : tc = NewTableCache(opts.Cache, opts.Experimental.TableCacheShards, size)
173 1 : }
174 :
175 1 : t := &tableCacheContainer{}
176 1 : t.tableCache = tc
177 1 : t.dbOpts.loggerAndTracer = opts.LoggerAndTracer
178 1 : t.dbOpts.cacheID = cacheID
179 1 : t.dbOpts.objProvider = objProvider
180 1 : t.dbOpts.opts = opts.MakeReaderOptions()
181 1 : t.dbOpts.filterMetrics = &sstable.FilterMetricsTracker{}
182 1 : t.dbOpts.iterCount = new(atomic.Int32)
183 1 : t.dbOpts.sstStatsCollector = sstStatsCollector
184 1 : return t
185 : }
186 :
187 : // Before calling close, make sure that there will be no further need
188 : // to access any of the files associated with the store.
189 1 : func (c *tableCacheContainer) close() error {
190 1 : // We want to do some cleanup work here. Check for leaked iterators
191 1 : // by the DB using this container. Note that we'll still perform cleanup
192 1 : // below in the case that there are leaked iterators.
193 1 : var err error
194 1 : if v := c.dbOpts.iterCount.Load(); v > 0 {
195 1 : err = errors.Errorf("leaked iterators: %d", errors.Safe(v))
196 1 : }
197 :
198 : // Release nodes here.
199 1 : for _, shard := range c.tableCache.shards {
200 1 : if shard != nil {
201 1 : shard.removeDB(&c.dbOpts)
202 1 : }
203 : }
204 1 : return firstError(err, c.tableCache.Unref())
205 : }
206 :
207 : func (c *tableCacheContainer) newIters(
208 : ctx context.Context,
209 : file *manifest.FileMetadata,
210 : opts *IterOptions,
211 : internalOpts internalIterOpts,
212 : kinds iterKinds,
213 1 : ) (iterSet, error) {
214 1 : return c.tableCache.getShard(file.FileBacking.DiskFileNum).newIters(ctx, file, opts, internalOpts, &c.dbOpts, kinds)
215 1 : }
216 :
217 : // getTableProperties returns the properties associated with the backing physical
218 : // table if the input metadata belongs to a virtual sstable.
219 1 : func (c *tableCacheContainer) getTableProperties(file *fileMetadata) (*sstable.Properties, error) {
220 1 : return c.tableCache.getShard(file.FileBacking.DiskFileNum).getTableProperties(file, &c.dbOpts)
221 1 : }
222 :
223 1 : func (c *tableCacheContainer) evict(fileNum base.DiskFileNum) {
224 1 : c.tableCache.getShard(fileNum).evict(fileNum, &c.dbOpts, false)
225 1 : }
226 :
227 1 : func (c *tableCacheContainer) metrics() (CacheMetrics, FilterMetrics) {
228 1 : var m CacheMetrics
229 1 : for i := range c.tableCache.shards {
230 1 : s := c.tableCache.shards[i]
231 1 : s.mu.RLock()
232 1 : m.Count += int64(len(s.mu.nodes))
233 1 : s.mu.RUnlock()
234 1 : m.Hits += s.hits.Load()
235 1 : m.Misses += s.misses.Load()
236 1 : }
237 1 : m.Size = m.Count * int64(unsafe.Sizeof(sstable.Reader{}))
238 1 : f := c.dbOpts.filterMetrics.Load()
239 1 : return m, f
240 : }
241 :
242 : func (c *tableCacheContainer) estimateSize(
243 : meta *fileMetadata, lower, upper []byte,
244 1 : ) (size uint64, err error) {
245 1 : if meta.Virtual {
246 1 : err = c.withVirtualReader(
247 1 : meta.VirtualMeta(),
248 1 : func(r sstable.VirtualReader) (err error) {
249 1 : size, err = r.EstimateDiskUsage(lower, upper)
250 1 : return err
251 1 : },
252 : )
253 1 : } else {
254 1 : err = c.withReader(
255 1 : meta.PhysicalMeta(),
256 1 : func(r *sstable.Reader) (err error) {
257 1 : size, err = r.EstimateDiskUsage(lower, upper)
258 1 : return err
259 1 : },
260 : )
261 : }
262 1 : if err != nil {
263 0 : return 0, err
264 0 : }
265 1 : return size, nil
266 : }
267 :
268 : // createCommonReader creates a Reader for this file.
269 1 : func createCommonReader(v *tableCacheValue, file *fileMetadata) sstable.CommonReader {
270 1 : // TODO(bananabrick): We suffer an allocation if file is a virtual sstable.
271 1 : var cr sstable.CommonReader = v.reader
272 1 : if file.Virtual {
273 1 : virtualReader := sstable.MakeVirtualReader(
274 1 : v.reader, file.VirtualMeta(), v.isShared,
275 1 : )
276 1 : cr = &virtualReader
277 1 : }
278 1 : return cr
279 : }
280 :
281 : func (c *tableCacheContainer) withCommonReader(
282 : meta *fileMetadata, fn func(sstable.CommonReader) error,
283 1 : ) error {
284 1 : s := c.tableCache.getShard(meta.FileBacking.DiskFileNum)
285 1 : v := s.findNode(meta, &c.dbOpts)
286 1 : defer s.unrefValue(v)
287 1 : if v.err != nil {
288 0 : return v.err
289 0 : }
290 1 : return fn(createCommonReader(v, meta))
291 : }
292 :
293 1 : func (c *tableCacheContainer) withReader(meta physicalMeta, fn func(*sstable.Reader) error) error {
294 1 : s := c.tableCache.getShard(meta.FileBacking.DiskFileNum)
295 1 : v := s.findNode(meta.FileMetadata, &c.dbOpts)
296 1 : defer s.unrefValue(v)
297 1 : if v.err != nil {
298 1 : return v.err
299 1 : }
300 1 : return fn(v.reader)
301 : }
302 :
303 : // withVirtualReader fetches a VirtualReader associated with a virtual sstable.
304 : func (c *tableCacheContainer) withVirtualReader(
305 : meta virtualMeta, fn func(sstable.VirtualReader) error,
306 1 : ) error {
307 1 : s := c.tableCache.getShard(meta.FileBacking.DiskFileNum)
308 1 : v := s.findNode(meta.FileMetadata, &c.dbOpts)
309 1 : defer s.unrefValue(v)
310 1 : if v.err != nil {
311 0 : return v.err
312 0 : }
313 1 : provider := c.dbOpts.objProvider
314 1 : objMeta, err := provider.Lookup(fileTypeTable, meta.FileBacking.DiskFileNum)
315 1 : if err != nil {
316 0 : return err
317 0 : }
318 1 : return fn(sstable.MakeVirtualReader(v.reader, meta, objMeta.IsShared()))
319 : }
320 :
321 1 : func (c *tableCacheContainer) iterCount() int64 {
322 1 : return int64(c.dbOpts.iterCount.Load())
323 1 : }
324 :
325 : // TableCache is a shareable cache for open sstables.
326 : type TableCache struct {
327 : refs atomic.Int64
328 :
329 : cache *Cache
330 : shards []*tableCacheShard
331 : }
332 :
333 : // Ref adds a reference to the table cache. Once tableCache.init returns,
334 : // the table cache only remains valid if there is at least one reference
335 : // to it.
336 1 : func (c *TableCache) Ref() {
337 1 : v := c.refs.Add(1)
338 1 : // We don't want the reference count to ever go from 0 -> 1,
339 1 : // because a reference count of 0 implies that we've closed the cache.
340 1 : if v <= 1 {
341 0 : panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
342 : }
343 : }
344 :
345 : // Unref removes a reference to the table cache.
346 1 : func (c *TableCache) Unref() error {
347 1 : v := c.refs.Add(-1)
348 1 : switch {
349 1 : case v < 0:
350 1 : panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
351 1 : case v == 0:
352 1 : var err error
353 1 : for i := range c.shards {
354 1 : // The cache shard has not been allocated yet; nothing to close.
355 1 : if c.shards[i] == nil {
356 0 : continue
357 : }
358 1 : err = firstError(err, c.shards[i].Close())
359 : }
360 :
361 : // Unref the cache which we create a reference to when the tableCache
362 : // is first instantiated.
363 1 : c.cache.Unref()
364 1 : return err
365 : }
366 1 : return nil
367 : }
368 :
369 : // NewTableCache will create a reference to the table cache. It is the caller's
370 : // responsibility to call TableCache.Unref when they no longer hold a reference to the table cache.
371 1 : func NewTableCache(cache *Cache, numShards int, size int) *TableCache {
372 1 : if size == 0 {
373 0 : panic("pebble: cannot create a table cache of size 0")
374 1 : } else if numShards == 0 {
375 0 : panic("pebble: cannot create a table cache with 0 shards")
376 : }
377 :
378 1 : c := &TableCache{}
379 1 : c.cache = cache
380 1 : c.cache.Ref()
381 1 :
382 1 : c.shards = make([]*tableCacheShard, numShards)
383 1 : for i := range c.shards {
384 1 : c.shards[i] = &tableCacheShard{}
385 1 : c.shards[i].init(size / len(c.shards))
386 1 : }
387 :
388 : // Hold a ref to the cache here.
389 1 : c.refs.Store(1)
390 1 :
391 1 : return c
392 : }
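// A minimal sketch of the intended lifecycle when sharing a TableCache across
// DBs: the creator owns one reference and must drop it with Unref once the
// table cache is no longer needed. The function name and the sizes used here
// are hypothetical, for illustration only.
func exampleSharedTableCache() error {
	blockCache := NewCache(128 << 20) // shared block cache; NewTableCache takes its own ref
	defer blockCache.Unref()
	tc := NewTableCache(blockCache, 16 /* numShards */, 1024 /* size */)
	// Pass tc to one or more DBs (each DB acquires and later drops its own
	// ref), then release the creator's reference when finished.
	return tc.Unref()
}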
393 :
394 1 : func (c *TableCache) getShard(fileNum base.DiskFileNum) *tableCacheShard {
395 1 : return c.shards[uint64(fileNum)%uint64(len(c.shards))]
396 1 : }
397 :
398 : type tableCacheKey struct {
399 : cacheID uint64
400 : fileNum base.DiskFileNum
401 : }
402 :
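// NB: the handHot/handCold/handTest hands and coldTarget below implement a
// CLOCK-Pro-style replacement policy: cold entries that are re-referenced are
// promoted to hot (runHandCold), evicted cold entries leave behind "test"
// entries, and a miss that lands on a test entry grows coldTarget
// (findNodeInternal), letting the cold partition adapt to the workload.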
403 : type tableCacheShard struct {
404 : hits atomic.Int64
405 : misses atomic.Int64
406 : iterCount atomic.Int32
407 :
408 : size int
409 :
410 : mu struct {
411 : sync.RWMutex
412 : nodes map[tableCacheKey]*tableCacheNode
413 : // The iters map is only created and populated in race builds.
414 : iters map[io.Closer][]byte
415 :
416 : handHot *tableCacheNode
417 : handCold *tableCacheNode
418 : handTest *tableCacheNode
419 :
420 : coldTarget int
421 : sizeHot int
422 : sizeCold int
423 : sizeTest int
424 : }
425 : releasing sync.WaitGroup
426 : releasingCh chan *tableCacheValue
427 : releaseLoopExit sync.WaitGroup
428 : }
429 :
430 1 : func (c *tableCacheShard) init(size int) {
431 1 : c.size = size
432 1 :
433 1 : c.mu.nodes = make(map[tableCacheKey]*tableCacheNode)
434 1 : c.mu.coldTarget = size
435 1 : c.releasingCh = make(chan *tableCacheValue, 100)
436 1 : c.releaseLoopExit.Add(1)
437 1 : go c.releaseLoop()
438 1 :
439 1 : if invariants.RaceEnabled {
440 0 : c.mu.iters = make(map[io.Closer][]byte)
441 0 : }
442 : }
443 :
444 1 : func (c *tableCacheShard) releaseLoop() {
445 1 : pprof.Do(context.Background(), tableCacheLabels, func(context.Context) {
446 1 : defer c.releaseLoopExit.Done()
447 1 : for v := range c.releasingCh {
448 1 : v.release(c)
449 1 : }
450 : })
451 : }
452 :
453 : // checkAndIntersectFilters checks the specific table and block property filters
454 : // for intersection with any available table and block-level properties. Returns
455 : // true for ok if this table should be read by this iterator.
456 : func (c *tableCacheShard) checkAndIntersectFilters(
457 : v *tableCacheValue,
458 : tableFilter func(userProps map[string]string) bool,
459 : blockPropertyFilters []BlockPropertyFilter,
460 : boundLimitedFilter sstable.BoundLimitedBlockPropertyFilter,
461 1 : ) (ok bool, filterer *sstable.BlockPropertiesFilterer, err error) {
462 1 : if tableFilter != nil &&
463 1 : !tableFilter(v.reader.Properties.UserProperties) {
464 0 : return false, nil, nil
465 0 : }
466 :
467 1 : if boundLimitedFilter != nil || len(blockPropertyFilters) > 0 {
468 1 : filterer, err = sstable.IntersectsTable(
469 1 : blockPropertyFilters,
470 1 : boundLimitedFilter,
471 1 : v.reader.Properties.UserProperties,
472 1 : )
473 1 : // NB: IntersectsTable will return a nil filterer if the table-level
474 1 : // properties indicate there's no intersection with the provided filters.
475 1 : if filterer == nil || err != nil {
476 1 : return false, nil, err
477 1 : }
478 : }
479 1 : return true, filterer, nil
480 : }
481 :
482 : func (c *tableCacheShard) newIters(
483 : ctx context.Context,
484 : file *manifest.FileMetadata,
485 : opts *IterOptions,
486 : internalOpts internalIterOpts,
487 : dbOpts *tableCacheOpts,
488 : kinds iterKinds,
489 1 : ) (iterSet, error) {
490 1 : // TODO(sumeer): constructing the Reader should also use a plumbed context,
491 1 : // since parts of the sstable are read during the construction. The Reader
492 1 : // should not remember that context since the Reader can be long-lived.
493 1 :
494 1 : // Calling findNode gives us the responsibility of decrementing v's
495 1 : // refCount. If opening the underlying table resulted in error, then we
496 1 : // decrement this straight away. Otherwise, we pass that responsibility to
497 1 : // the sstable iterator, which decrements when it is closed.
498 1 : v := c.findNode(file, dbOpts)
499 1 : if v.err != nil {
500 1 : defer c.unrefValue(v)
501 1 : return iterSet{}, v.err
502 1 : }
503 :
504 : // Note: This suffers an allocation for virtual sstables.
505 1 : cr := createCommonReader(v, file)
506 1 : var iters iterSet
507 1 : var err error
508 1 : if kinds.RangeKey() && file.HasRangeKeys {
509 1 : iters.rangeKey, err = c.newRangeKeyIter(v, cr, opts.SpanIterOptions())
510 1 : }
511 1 : if kinds.RangeDeletion() && file.HasPointKeys && err == nil {
512 1 : iters.rangeDeletion, err = c.newRangeDelIter(ctx, file, cr, dbOpts)
513 1 : }
514 1 : if kinds.Point() && err == nil {
515 1 : iters.point, err = c.newPointIter(ctx, v, file, cr, opts, internalOpts, dbOpts)
516 1 : }
517 1 : if err != nil {
518 1 : // NB: There's a subtlety here: Because the point iterator is the last
519 1 : // iterator we attempt to create, it's not possible for:
520 1 : // err != nil && iters.point != nil
521 1 : // If it were possible, we'd need to account for it to avoid double
522 1 : // unref-ing here, once during CloseAll and once during `unrefValue`.
523 1 : iters.CloseAll()
524 1 : c.unrefValue(v)
525 1 : return iterSet{}, err
526 1 : }
527 : // Only point iterators ever require the reader to stay pinned in the cache.
528 : // If we're not returning a point iterator to the caller, we need to unref v.
529 : // There's an added subtlety: iters.point may be non-nil but not be a true
530 : // iterator that has ref'd the underlying reader, if block filters excluded
531 : // the entirety of the table.
532 : //
533 : // TODO(jackson): This `filteredAll` subtlety can be removed after the
534 : // planned #2863 refactor, when there will be no need to return a special
535 : // empty iterator type to signify that point keys were filtered.
536 1 : if iters.point == nil || iters.point == filteredAll {
537 1 : c.unrefValue(v)
538 1 : }
539 1 : return iters, nil
540 : }
541 :
542 : // newPointIter is an internal helper that constructs a point iterator over a
543 : // sstable. This function is for internal use only, and callers should use
544 : // newIters instead.
545 : func (c *tableCacheShard) newPointIter(
546 : ctx context.Context,
547 : v *tableCacheValue,
548 : file *manifest.FileMetadata,
549 : cr sstable.CommonReader,
550 : opts *IterOptions,
551 : internalOpts internalIterOpts,
552 : dbOpts *tableCacheOpts,
553 1 : ) (internalIterator, error) {
554 1 : var (
555 1 : hideObsoletePoints bool = false
556 1 : pointKeyFilters []BlockPropertyFilter
557 1 : filterer *sstable.BlockPropertiesFilterer
558 1 : )
559 1 : if opts != nil {
560 1 : // This code is appending (at most one filter) in-place to
561 1 : // opts.PointKeyFilters even though the slice is shared for iterators in
562 1 : // the same iterator tree. This is acceptable since all the following
563 1 : // properties are true:
564 1 : // - The iterator tree is single threaded, so the shared backing for the
565 1 : // slice is being mutated in a single threaded manner.
566 1 : // - Each shallow copy of the slice has its own notion of length.
567 1 : // - The appended element is always the obsoleteKeyBlockPropertyFilter
568 1 : // struct, which is stateless, so overwriting that struct when creating
569 1 : // one sstable iterator is harmless to other sstable iterators that are
570 1 : // relying on that struct.
571 1 : //
572 1 : // An alternative would be to have different slices for different sstable
573 1 : // iterators, but that requires more work to avoid allocations.
574 1 : //
575 1 : // TODO(bilal): for compaction reads of foreign sstables, we do hide
576 1 : // obsolete points (see sstable.Reader.newCompactionIter) but we don't
577 1 : // apply the obsolete block property filter. We could optimize this by
578 1 : // applying the filter.
579 1 : hideObsoletePoints, pointKeyFilters =
580 1 : v.reader.TryAddBlockPropertyFilterForHideObsoletePoints(
581 1 : opts.snapshotForHideObsoletePoints, file.LargestSeqNum, opts.PointKeyFilters)
582 1 :
583 1 : var ok bool
584 1 : var err error
585 1 : ok, filterer, err = c.checkAndIntersectFilters(v, opts.TableFilter,
586 1 : pointKeyFilters, internalOpts.boundLimitedFilter)
587 1 : if err != nil {
588 0 : return nil, err
589 1 : } else if !ok {
590 1 : // Return an empty iterator. This iterator has no mutable state, so
591 1 : // using a singleton is fine. We must return `filteredAll` instead
592 1 : // of nil so that the returned iterator returns MaybeFilteredKeys()
593 1 : // = true.
594 1 : //
595 1 : // TODO(jackson): This `filteredAll` subtlety can be removed after the
596 1 : // planned #2863 refactor, when there will be no need to return a special
597 1 : // empty iterator type to signify that point keys were filtered.
598 1 : return filteredAll, err
599 1 : }
600 : }
601 :
602 1 : var iter sstable.Iterator
603 1 : useFilter := true
604 1 : if opts != nil {
605 1 : useFilter = manifest.LevelToInt(opts.level) != 6 || opts.UseL6Filters
606 1 : ctx = objiotracing.WithLevel(ctx, manifest.LevelToInt(opts.level))
607 1 : }
608 1 : tableFormat, err := v.reader.TableFormat()
609 1 : if err != nil {
610 0 : return nil, err
611 0 : }
612 1 : var rp sstable.ReaderProvider
613 1 : if tableFormat >= sstable.TableFormatPebblev3 && v.reader.Properties.NumValueBlocks > 0 {
614 1 : rp = &tableCacheShardReaderProvider{c: c, file: file, dbOpts: dbOpts}
615 1 : }
616 :
617 1 : if v.isShared && v.reader.Properties.GlobalSeqNum != 0 {
618 1 : if tableFormat < sstable.TableFormatPebblev4 {
619 0 : return nil, errors.New("pebble: shared ingested sstable has a lower table format than expected")
620 0 : }
621 : // The table is shared and ingested.
622 1 : hideObsoletePoints = true
623 : }
624 1 : var categoryAndQoS sstable.CategoryAndQoS
625 1 : if opts != nil {
626 1 : categoryAndQoS = opts.CategoryAndQoS
627 1 : }
628 1 : if internalOpts.bytesIterated != nil {
629 1 : iter, err = cr.NewCompactionIter(
630 1 : internalOpts.bytesIterated, categoryAndQoS, dbOpts.sstStatsCollector, rp,
631 1 : internalOpts.bufferPool)
632 1 : } else {
633 1 : iter, err = cr.NewIterWithBlockPropertyFiltersAndContextEtc(
634 1 : ctx, opts.GetLowerBound(), opts.GetUpperBound(), filterer, hideObsoletePoints, useFilter,
635 1 : internalOpts.stats, categoryAndQoS, dbOpts.sstStatsCollector, rp)
636 1 : }
637 1 : if err != nil {
638 1 : return nil, err
639 1 : }
640 : // NB: v.closeHook takes responsibility for calling unrefValue(v) here. Take
641 : // care to avoid introducing an allocation here by adding a closure.
642 1 : iter.SetCloseHook(v.closeHook)
643 1 : c.iterCount.Add(1)
644 1 : dbOpts.iterCount.Add(1)
645 1 : if invariants.RaceEnabled {
646 0 : c.mu.Lock()
647 0 : c.mu.iters[iter] = debug.Stack()
648 0 : c.mu.Unlock()
649 0 : }
650 1 : return iter, nil
651 : }
652 :
653 : // newRangeDelIter is an internal helper that constructs an iterator over a
654 : // sstable's range deletions. This function is for table-cache internal use
655 : // only, and callers should use newIters instead.
656 : func (c *tableCacheShard) newRangeDelIter(
657 : ctx context.Context, file *manifest.FileMetadata, cr sstable.CommonReader, dbOpts *tableCacheOpts,
658 1 : ) (keyspan.FragmentIterator, error) {
659 1 : // NB: range-del iterator does not maintain a reference to the table, nor
660 1 : // does it need to read from it after creation.
661 1 : rangeDelIter, err := cr.NewRawRangeDelIter()
662 1 : if err != nil {
663 1 : return nil, err
664 1 : }
665 : // Assert expected bounds in tests.
666 1 : if invariants.Enabled && rangeDelIter != nil {
667 1 : cmp := base.DefaultComparer.Compare
668 1 : if dbOpts.opts.Comparer != nil {
669 1 : cmp = dbOpts.opts.Comparer.Compare
670 1 : }
671 : // TODO(radu): we should be using AssertBounds, but it currently fails in
672 : // some cases (#3167).
673 1 : rangeDelIter = keyspan.AssertUserKeyBounds(
674 1 : rangeDelIter, file.SmallestPointKey.UserKey, file.LargestPointKey.UserKey, cmp,
675 1 : )
676 : }
677 1 : return rangeDelIter, nil
678 : }
679 :
680 : // newRangeKeyIter is an internal helper that constructs an iterator over a
681 : // sstable's range keys. This function is for table-cache internal use only, and
682 : // callers should use newIters instead.
683 : func (c *tableCacheShard) newRangeKeyIter(
684 : v *tableCacheValue, cr sstable.CommonReader, opts keyspan.SpanIterOptions,
685 1 : ) (keyspan.FragmentIterator, error) {
686 1 : // Don't filter a table's range keys if the file contains RANGEKEYDELs.
687 1 : // The RANGEKEYDELs may delete range keys in other levels. Skipping the
688 1 : // file's range key blocks may surface deleted range keys below. This is
689 1 : // done here, rather than deferring to the block-property collector in order
690 1 : // to maintain parity with point keys and the treatment of RANGEDELs.
691 1 : if v.reader.Properties.NumRangeKeyDels == 0 && len(opts.RangeKeyFilters) > 0 {
692 0 : ok, _, err := c.checkAndIntersectFilters(v, nil, opts.RangeKeyFilters, nil)
693 0 : if err != nil {
694 0 : return nil, err
695 0 : } else if !ok {
696 0 : return nil, nil
697 0 : }
698 : }
699 1 : return cr.NewRawRangeKeyIter()
700 : }
701 :
702 : type tableCacheShardReaderProvider struct {
703 : c *tableCacheShard
704 : file *manifest.FileMetadata
705 : dbOpts *tableCacheOpts
706 : v *tableCacheValue
707 : }
708 :
709 : var _ sstable.ReaderProvider = &tableCacheShardReaderProvider{}
710 :
711 : // GetReader implements sstable.ReaderProvider. Note that it is not the
712 : // responsibility of tableCacheShardReaderProvider to ensure that the file
713 : // continues to exist. The ReaderProvider is used in iterators where the
714 : // top-level iterator is pinning the read state and preventing the files from
715 : // being deleted.
716 : //
717 : // The caller must call tableCacheShardReaderProvider.Close.
718 : //
719 : // Note that currently the Reader returned here is only used to read value
720 : // blocks. This reader shouldn't be used for other purposes like reading keys
721 : // outside of virtual sstable bounds.
722 : //
723 : // TODO(bananabrick): We could return a wrapper over the Reader to ensure
724 : // that the reader isn't used for other purposes.
725 1 : func (rp *tableCacheShardReaderProvider) GetReader() (*sstable.Reader, error) {
726 1 : // Calling findNode gives us the responsibility of decrementing v's
727 1 : // refCount.
728 1 : v := rp.c.findNode(rp.file, rp.dbOpts)
729 1 : if v.err != nil {
730 0 : defer rp.c.unrefValue(v)
731 0 : return nil, v.err
732 0 : }
733 1 : rp.v = v
734 1 : return v.reader, nil
735 : }
736 :
737 : // Close implements sstable.ReaderProvider.
738 1 : func (rp *tableCacheShardReaderProvider) Close() {
739 1 : rp.c.unrefValue(rp.v)
740 1 : rp.v = nil
741 1 : }
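// A small usage sketch: GetReader and Close must be paired, since GetReader
// acquires a table cache reference on the caller's behalf and Close releases
// it. The function name readValueBlocksSketch is hypothetical.
func readValueBlocksSketch(rp *tableCacheShardReaderProvider) error {
	r, err := rp.GetReader()
	if err != nil {
		return err
	}
	defer rp.Close()
	_ = r // read value blocks through r while the reference is held
	return nil
}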
742 :
743 : // getTableProperties returns the sstable properties for the target file.
744 : func (c *tableCacheShard) getTableProperties(
745 : file *fileMetadata, dbOpts *tableCacheOpts,
746 1 : ) (*sstable.Properties, error) {
747 1 : // Calling findNode gives us the responsibility of decrementing v's refCount here
748 1 : v := c.findNode(file, dbOpts)
749 1 : defer c.unrefValue(v)
750 1 :
751 1 : if v.err != nil {
752 0 : return nil, v.err
753 0 : }
754 1 : return &v.reader.Properties, nil
755 : }
756 :
757 : // releaseNode releases a node from the tableCacheShard.
758 : //
759 : // c.mu must be held when calling this.
760 1 : func (c *tableCacheShard) releaseNode(n *tableCacheNode) {
761 1 : c.unlinkNode(n)
762 1 : c.clearNode(n)
763 1 : }
764 :
765 : // unlinkNode removes a node from the tableCacheShard, leaving the shard
766 : // reference in place.
767 : //
768 : // c.mu must be held when calling this.
769 1 : func (c *tableCacheShard) unlinkNode(n *tableCacheNode) {
770 1 : key := tableCacheKey{n.cacheID, n.fileNum}
771 1 : delete(c.mu.nodes, key)
772 1 :
773 1 : switch n.ptype {
774 1 : case tableCacheNodeHot:
775 1 : c.mu.sizeHot--
776 1 : case tableCacheNodeCold:
777 1 : c.mu.sizeCold--
778 1 : case tableCacheNodeTest:
779 1 : c.mu.sizeTest--
780 : }
781 :
782 1 : if n == c.mu.handHot {
783 1 : c.mu.handHot = c.mu.handHot.prev()
784 1 : }
785 1 : if n == c.mu.handCold {
786 1 : c.mu.handCold = c.mu.handCold.prev()
787 1 : }
788 1 : if n == c.mu.handTest {
789 1 : c.mu.handTest = c.mu.handTest.prev()
790 1 : }
791 :
792 1 : if n.unlink() == n {
793 1 : // This was the last entry in the cache.
794 1 : c.mu.handHot = nil
795 1 : c.mu.handCold = nil
796 1 : c.mu.handTest = nil
797 1 : }
798 :
799 1 : n.links.prev = nil
800 1 : n.links.next = nil
801 : }
802 :
803 1 : func (c *tableCacheShard) clearNode(n *tableCacheNode) {
804 1 : if v := n.value; v != nil {
805 1 : n.value = nil
806 1 : c.unrefValue(v)
807 1 : }
808 : }
809 :
810 : // unrefValue decrements the reference count for the specified value, releasing
811 : // it if the reference count fell to 0. Note that the value has a reference if
812 : // it is present in tableCacheShard.mu.nodes, so a reference count of 0 means
813 : // the node has already been removed from that map.
814 1 : func (c *tableCacheShard) unrefValue(v *tableCacheValue) {
815 1 : if v.refCount.Add(-1) == 0 {
816 1 : c.releasing.Add(1)
817 1 : c.releasingCh <- v
818 1 : }
819 : }
820 :
821 : // findNode returns the node for the table with the given file number, creating
822 : // that node if it didn't already exist. The caller is responsible for
823 : // decrementing the returned node's refCount.
824 1 : func (c *tableCacheShard) findNode(meta *fileMetadata, dbOpts *tableCacheOpts) *tableCacheValue {
825 1 : v := c.findNodeInternal(meta, dbOpts)
826 1 :
827 1 : // Loading a file before its global sequence number is known (eg,
828 1 : // during ingest before entering the commit pipeline) can pollute
829 1 : // the cache with incorrect state. In invariant builds, verify
830 1 : // that the global sequence number of the returned reader matches.
831 1 : if invariants.Enabled {
832 1 : if v.reader != nil && meta.LargestSeqNum == meta.SmallestSeqNum &&
833 1 : v.reader.Properties.GlobalSeqNum != meta.SmallestSeqNum {
834 0 : panic(errors.AssertionFailedf("file %s loaded from table cache with the wrong global sequence number %d",
835 0 : meta, v.reader.Properties.GlobalSeqNum))
836 : }
837 : }
838 1 : return v
839 : }
840 :
841 : func (c *tableCacheShard) findNodeInternal(
842 : meta *fileMetadata, dbOpts *tableCacheOpts,
843 1 : ) *tableCacheValue {
844 1 : if refs := meta.Refs(); refs <= 0 {
845 0 : panic(errors.AssertionFailedf("attempting to load file %s with refs=%d from table cache",
846 0 : meta, refs))
847 : }
848 : // Fast-path for a hit in the cache.
849 1 : c.mu.RLock()
850 1 : key := tableCacheKey{dbOpts.cacheID, meta.FileBacking.DiskFileNum}
851 1 : if n := c.mu.nodes[key]; n != nil && n.value != nil {
852 1 : // Fast-path hit.
853 1 : //
854 1 : // The caller is responsible for decrementing the refCount.
855 1 : v := n.value
856 1 : v.refCount.Add(1)
857 1 : c.mu.RUnlock()
858 1 : n.referenced.Store(true)
859 1 : c.hits.Add(1)
860 1 : <-v.loaded
861 1 : return v
862 1 : }
863 1 : c.mu.RUnlock()
864 1 :
865 1 : c.mu.Lock()
866 1 :
867 1 : n := c.mu.nodes[key]
868 1 : switch {
869 1 : case n == nil:
870 1 : // Slow-path miss of a non-existent node.
871 1 : n = &tableCacheNode{
872 1 : fileNum: meta.FileBacking.DiskFileNum,
873 1 : ptype: tableCacheNodeCold,
874 1 : }
875 1 : c.addNode(n, dbOpts)
876 1 : c.mu.sizeCold++
877 :
878 1 : case n.value != nil:
879 1 : // Slow-path hit of a hot or cold node.
880 1 : //
881 1 : // The caller is responsible for decrementing the refCount.
882 1 : v := n.value
883 1 : v.refCount.Add(1)
884 1 : n.referenced.Store(true)
885 1 : c.hits.Add(1)
886 1 : c.mu.Unlock()
887 1 : <-v.loaded
888 1 : return v
889 :
890 1 : default:
891 1 : // Slow-path miss of a test node.
892 1 : c.unlinkNode(n)
893 1 : c.mu.coldTarget++
894 1 : if c.mu.coldTarget > c.size {
895 1 : c.mu.coldTarget = c.size
896 1 : }
897 :
898 1 : n.referenced.Store(false)
899 1 : n.ptype = tableCacheNodeHot
900 1 : c.addNode(n, dbOpts)
901 1 : c.mu.sizeHot++
902 : }
903 :
904 1 : c.misses.Add(1)
905 1 :
906 1 : v := &tableCacheValue{
907 1 : loaded: make(chan struct{}),
908 1 : }
909 1 : v.refCount.Store(2)
910 1 : // Cache the closure invoked when an iterator is closed. This avoids an
911 1 : // allocation on every call to newIters.
912 1 : v.closeHook = func(i sstable.Iterator) error {
913 1 : if invariants.RaceEnabled {
914 0 : c.mu.Lock()
915 0 : delete(c.mu.iters, i)
916 0 : c.mu.Unlock()
917 0 : }
918 1 : c.unrefValue(v)
919 1 : c.iterCount.Add(-1)
920 1 : dbOpts.iterCount.Add(-1)
921 1 : return nil
922 : }
923 1 : n.value = v
924 1 :
925 1 : c.mu.Unlock()
926 1 :
927 1 : // Note adding to the cache lists must complete before we begin loading the
928 1 : // table as a failure during load will result in the node being unlinked.
929 1 : pprof.Do(context.Background(), tableCacheLabels, func(context.Context) {
930 1 : v.load(
931 1 : loadInfo{
932 1 : backingFileNum: meta.FileBacking.DiskFileNum,
933 1 : smallestSeqNum: meta.SmallestSeqNum,
934 1 : largestSeqNum: meta.LargestSeqNum,
935 1 : }, c, dbOpts)
936 1 : })
937 1 : return v
938 : }
939 :
940 1 : func (c *tableCacheShard) addNode(n *tableCacheNode, dbOpts *tableCacheOpts) {
941 1 : c.evictNodes()
942 1 : n.cacheID = dbOpts.cacheID
943 1 : key := tableCacheKey{n.cacheID, n.fileNum}
944 1 : c.mu.nodes[key] = n
945 1 :
946 1 : n.links.next = n
947 1 : n.links.prev = n
948 1 : if c.mu.handHot == nil {
949 1 : // First element.
950 1 : c.mu.handHot = n
951 1 : c.mu.handCold = n
952 1 : c.mu.handTest = n
953 1 : } else {
954 1 : c.mu.handHot.link(n)
955 1 : }
956 :
957 1 : if c.mu.handCold == c.mu.handHot {
958 1 : c.mu.handCold = c.mu.handCold.prev()
959 1 : }
960 : }
961 :
962 1 : func (c *tableCacheShard) evictNodes() {
963 1 : for c.size <= c.mu.sizeHot+c.mu.sizeCold && c.mu.handCold != nil {
964 1 : c.runHandCold()
965 1 : }
966 : }
967 :
968 1 : func (c *tableCacheShard) runHandCold() {
969 1 : n := c.mu.handCold
970 1 : if n.ptype == tableCacheNodeCold {
971 1 : if n.referenced.Load() {
972 1 : n.referenced.Store(false)
973 1 : n.ptype = tableCacheNodeHot
974 1 : c.mu.sizeCold--
975 1 : c.mu.sizeHot++
976 1 : } else {
977 1 : c.clearNode(n)
978 1 : n.ptype = tableCacheNodeTest
979 1 : c.mu.sizeCold--
980 1 : c.mu.sizeTest++
981 1 : for c.size < c.mu.sizeTest && c.mu.handTest != nil {
982 1 : c.runHandTest()
983 1 : }
984 : }
985 : }
986 :
987 1 : c.mu.handCold = c.mu.handCold.next()
988 1 :
989 1 : for c.size-c.mu.coldTarget <= c.mu.sizeHot && c.mu.handHot != nil {
990 1 : c.runHandHot()
991 1 : }
992 : }
993 :
994 1 : func (c *tableCacheShard) runHandHot() {
995 1 : if c.mu.handHot == c.mu.handTest && c.mu.handTest != nil {
996 1 : c.runHandTest()
997 1 : if c.mu.handHot == nil {
998 0 : return
999 0 : }
1000 : }
1001 :
1002 1 : n := c.mu.handHot
1003 1 : if n.ptype == tableCacheNodeHot {
1004 1 : if n.referenced.Load() {
1005 1 : n.referenced.Store(false)
1006 1 : } else {
1007 1 : n.ptype = tableCacheNodeCold
1008 1 : c.mu.sizeHot--
1009 1 : c.mu.sizeCold++
1010 1 : }
1011 : }
1012 :
1013 1 : c.mu.handHot = c.mu.handHot.next()
1014 : }
1015 :
1016 1 : func (c *tableCacheShard) runHandTest() {
1017 1 : if c.mu.sizeCold > 0 && c.mu.handTest == c.mu.handCold && c.mu.handCold != nil {
1018 1 : c.runHandCold()
1019 1 : if c.mu.handTest == nil {
1020 0 : return
1021 0 : }
1022 : }
1023 :
1024 1 : n := c.mu.handTest
1025 1 : if n.ptype == tableCacheNodeTest {
1026 1 : c.mu.coldTarget--
1027 1 : if c.mu.coldTarget < 0 {
1028 1 : c.mu.coldTarget = 0
1029 1 : }
1030 1 : c.unlinkNode(n)
1031 1 : c.clearNode(n)
1032 : }
1033 :
1034 1 : c.mu.handTest = c.mu.handTest.next()
1035 : }
1036 :
1037 1 : func (c *tableCacheShard) evict(fileNum base.DiskFileNum, dbOpts *tableCacheOpts, allowLeak bool) {
1038 1 : c.mu.Lock()
1039 1 : key := tableCacheKey{dbOpts.cacheID, fileNum}
1040 1 : n := c.mu.nodes[key]
1041 1 : var v *tableCacheValue
1042 1 : if n != nil {
1043 1 : // NB: This is equivalent to tableCacheShard.releaseNode(), but we perform
1044 1 : // the tableCacheNode.release() call synchronously below to ensure the
1045 1 : // sstable file descriptor is closed before returning. Note that
1046 1 : // tableCacheShard.releasing needs to be incremented while holding
1047 1 : // tableCacheShard.mu in order to avoid a race with Close()
1048 1 : c.unlinkNode(n)
1049 1 : v = n.value
1050 1 : if v != nil {
1051 1 : if !allowLeak {
1052 1 : if t := v.refCount.Add(-1); t != 0 {
1053 0 : dbOpts.loggerAndTracer.Fatalf("sstable %s: refcount is not zero: %d\n%s", fileNum, t, debug.Stack())
1054 0 : }
1055 : }
1056 1 : c.releasing.Add(1)
1057 : }
1058 : }
1059 :
1060 1 : c.mu.Unlock()
1061 1 :
1062 1 : if v != nil {
1063 1 : v.release(c)
1064 1 : }
1065 :
1066 1 : dbOpts.opts.Cache.EvictFile(dbOpts.cacheID, fileNum)
1067 : }
1068 :
1069 : // removeDB evicts any nodes which have a reference to the DB
1070 : // associated with dbOpts.cacheID. Make sure that there will
1071 : // be no more accesses to the files associated with the DB.
1072 1 : func (c *tableCacheShard) removeDB(dbOpts *tableCacheOpts) {
1073 1 : var fileNums []base.DiskFileNum
1074 1 :
1075 1 : c.mu.RLock()
1076 1 : // Collect the fileNums which need to be cleaned.
1077 1 : var firstNode *tableCacheNode
1078 1 : node := c.mu.handHot
1079 1 : for node != firstNode {
1080 1 : if firstNode == nil {
1081 1 : firstNode = node
1082 1 : }
1083 :
1084 1 : if node.cacheID == dbOpts.cacheID {
1085 1 : fileNums = append(fileNums, node.fileNum)
1086 1 : }
1087 1 : node = node.next()
1088 : }
1089 1 : c.mu.RUnlock()
1090 1 :
1091 1 : // Evict all the nodes associated with the DB.
1092 1 : // This should synchronously close all the files
1093 1 : // associated with the DB.
1094 1 : for _, fileNum := range fileNums {
1095 1 : c.evict(fileNum, dbOpts, true)
1096 1 : }
1097 : }
1098 :
1099 1 : func (c *tableCacheShard) Close() error {
1100 1 : c.mu.Lock()
1101 1 : defer c.mu.Unlock()
1102 1 :
1103 1 : // Check for leaked iterators. Note that we'll still perform cleanup below in
1104 1 : // the case that there are leaked iterators.
1105 1 : var err error
1106 1 : if v := c.iterCount.Load(); v > 0 {
1107 1 : if !invariants.RaceEnabled {
1108 1 : err = errors.Errorf("leaked iterators: %d", errors.Safe(v))
1109 1 : } else {
1110 0 : var buf bytes.Buffer
1111 0 : for _, stack := range c.mu.iters {
1112 0 : fmt.Fprintf(&buf, "%s\n", stack)
1113 0 : }
1114 0 : err = errors.Errorf("leaked iterators: %d\n%s", errors.Safe(v), buf.String())
1115 : }
1116 : }
1117 :
1118 1 : for c.mu.handHot != nil {
1119 0 : n := c.mu.handHot
1120 0 : if n.value != nil {
1121 0 : if n.value.refCount.Add(-1) == 0 {
1122 0 : c.releasing.Add(1)
1123 0 : c.releasingCh <- n.value
1124 0 : }
1125 : }
1126 0 : c.unlinkNode(n)
1127 : }
1128 1 : c.mu.nodes = nil
1129 1 : c.mu.handHot = nil
1130 1 : c.mu.handCold = nil
1131 1 : c.mu.handTest = nil
1132 1 :
1133 1 : // Only shut down the releasing goroutine if there were no leaked
1134 1 : // iterators. If there were leaked iterators, we leave the goroutine running
1135 1 : // and the releasingCh open so that a subsequent iterator close can
1136 1 : // complete. This behavior is used by iterator leak tests. Leaking the
1137 1 : // goroutine for these tests is less bad than not closing the iterator, which
1138 1 : // triggers other warnings about block cache handles not being released.
1139 1 : if err != nil {
1140 1 : c.releasing.Wait()
1141 1 : return err
1142 1 : }
1143 :
1144 1 : close(c.releasingCh)
1145 1 : c.releasing.Wait()
1146 1 : c.releaseLoopExit.Wait()
1147 1 : return err
1148 : }
1149 :
1150 : type tableCacheValue struct {
1151 : closeHook func(i sstable.Iterator) error
1152 : reader *sstable.Reader
1153 : err error
1154 : isShared bool
1155 : loaded chan struct{}
1156 : // Reference count for the value. The reader is closed when the reference
1157 : // count drops to zero.
1158 : refCount atomic.Int32
1159 : }
1160 :
1161 : type loadInfo struct {
1162 : backingFileNum base.DiskFileNum
1163 : largestSeqNum uint64
1164 : smallestSeqNum uint64
1165 : }
1166 :
1167 1 : func (v *tableCacheValue) load(loadInfo loadInfo, c *tableCacheShard, dbOpts *tableCacheOpts) {
1168 1 : // Try opening the file first.
1169 1 : var f objstorage.Readable
1170 1 : var err error
1171 1 : f, err = dbOpts.objProvider.OpenForReading(
1172 1 : context.TODO(), fileTypeTable, loadInfo.backingFileNum, objstorage.OpenOptions{MustExist: true},
1173 1 : )
1174 1 : if err == nil {
1175 1 : cacheOpts := private.SSTableCacheOpts(dbOpts.cacheID, loadInfo.backingFileNum).(sstable.ReaderOption)
1176 1 : v.reader, err = sstable.NewReader(f, dbOpts.opts, cacheOpts, dbOpts.filterMetrics)
1177 1 : }
1178 1 : if err == nil {
1179 1 : var objMeta objstorage.ObjectMetadata
1180 1 : objMeta, err = dbOpts.objProvider.Lookup(fileTypeTable, loadInfo.backingFileNum)
1181 1 : v.isShared = objMeta.IsShared()
1182 1 : }
1183 1 : if err != nil {
1184 1 : v.err = errors.Wrapf(
1185 1 : err, "pebble: backing file %s error", loadInfo.backingFileNum)
1186 1 : }
1187 1 : if v.err == nil && loadInfo.smallestSeqNum == loadInfo.largestSeqNum {
1188 1 : v.reader.Properties.GlobalSeqNum = loadInfo.largestSeqNum
1189 1 : }
1190 1 : if v.err != nil {
1191 1 : c.mu.Lock()
1192 1 : defer c.mu.Unlock()
1193 1 : // Lookup the node in the cache again as it might have already been
1194 1 : // removed.
1195 1 : key := tableCacheKey{dbOpts.cacheID, loadInfo.backingFileNum}
1196 1 : n := c.mu.nodes[key]
1197 1 : if n != nil && n.value == v {
1198 1 : c.releaseNode(n)
1199 1 : }
1200 : }
1201 1 : close(v.loaded)
1202 : }
1203 :
1204 1 : func (v *tableCacheValue) release(c *tableCacheShard) {
1205 1 : <-v.loaded
1206 1 : // Nothing to be done about an error at this point. Close the reader if it is
1207 1 : // open.
1208 1 : if v.reader != nil {
1209 1 : _ = v.reader.Close()
1210 1 : }
1211 1 : c.releasing.Done()
1212 : }
1213 :
1214 : type tableCacheNodeType int8
1215 :
1216 : const (
1217 : tableCacheNodeTest tableCacheNodeType = iota
1218 : tableCacheNodeCold
1219 : tableCacheNodeHot
1220 : )
1221 :
1222 0 : func (p tableCacheNodeType) String() string {
1223 0 : switch p {
1224 0 : case tableCacheNodeTest:
1225 0 : return "test"
1226 0 : case tableCacheNodeCold:
1227 0 : return "cold"
1228 0 : case tableCacheNodeHot:
1229 0 : return "hot"
1230 : }
1231 0 : return "unknown"
1232 : }
1233 :
1234 : type tableCacheNode struct {
1235 : fileNum base.DiskFileNum
1236 : value *tableCacheValue
1237 :
1238 : links struct {
1239 : next *tableCacheNode
1240 : prev *tableCacheNode
1241 : }
1242 : ptype tableCacheNodeType
1243 : // referenced is atomically set to indicate that this entry has been accessed
1244 : // since the last time one of the clock hands swept it.
1245 : referenced atomic.Bool
1246 :
1247 : // Storing the cache id associated with the DB instance here
1248 : // avoids the need to thread the dbOpts struct through many functions.
1249 : cacheID uint64
1250 : }
1251 :
1252 1 : func (n *tableCacheNode) next() *tableCacheNode {
1253 1 : if n == nil {
1254 0 : return nil
1255 0 : }
1256 1 : return n.links.next
1257 : }
1258 :
1259 1 : func (n *tableCacheNode) prev() *tableCacheNode {
1260 1 : if n == nil {
1261 0 : return nil
1262 0 : }
1263 1 : return n.links.prev
1264 : }
1265 :
1266 1 : func (n *tableCacheNode) link(s *tableCacheNode) {
1267 1 : s.links.prev = n.links.prev
1268 1 : s.links.prev.links.next = s
1269 1 : s.links.next = n
1270 1 : s.links.next.links.prev = s
1271 1 : }
1272 :
1273 1 : func (n *tableCacheNode) unlink() *tableCacheNode {
1274 1 : next := n.links.next
1275 1 : n.links.prev.links.next = n.links.next
1276 1 : n.links.next.links.prev = n.links.prev
1277 1 : n.links.prev = n
1278 1 : n.links.next = n
1279 1 : return next
1280 1 : }
1281 :
1282 : // iterSet holds a set of iterators of various key kinds, all constructed over
1283 : // the same data structure (eg, an sstable). A subset of the fields may be
1284 : // populated depending on the `iterKinds` passed to newIters.
1285 : type iterSet struct {
1286 : point internalIterator
1287 : rangeDeletion keyspan.FragmentIterator
1288 : rangeKey keyspan.FragmentIterator
1289 : }
1290 :
1291 : // TODO(jackson): Consider adding methods for fast paths that check whether an
1292 : // iterator of a particular kind is nil, so that these call sites don't need to
1293 : // reach into the struct's fields directly.
1294 :
1295 : // Point returns the contained point iterator. If there is no point iterator,
1296 : // Point returns a non-nil empty point iterator.
1297 1 : func (s *iterSet) Point() internalIterator {
1298 1 : if s.point == nil {
1299 0 : return emptyIter
1300 0 : }
1301 1 : return s.point
1302 : }
1303 :
1304 : // RangeDeletion returns the contained range deletion iterator. If there is no
1305 : // range deletion iterator, RangeDeletion returns a non-nil empty keyspan
1306 : // iterator.
1307 1 : func (s *iterSet) RangeDeletion() keyspan.FragmentIterator {
1308 1 : if s.rangeDeletion == nil {
1309 1 : return emptyKeyspanIter
1310 1 : }
1311 1 : return s.rangeDeletion
1312 : }
1313 :
1314 : // RangeKey returns the contained range key iterator. If there is no range key
1315 : // iterator, RangeKey returns a non-nil empty keyspan iterator.
1316 1 : func (s *iterSet) RangeKey() keyspan.FragmentIterator {
1317 1 : if s.rangeKey == nil {
1318 1 : return emptyKeyspanIter
1319 1 : }
1320 1 : return s.rangeKey
1321 : }
1322 :
1323 : // CloseAll closes all of the held iterators. If CloseAll is called, then Close
1324 : // must not be called on the constituent iterators.
1325 1 : func (s *iterSet) CloseAll() error {
1326 1 : var err error
1327 1 : if s.point != nil {
1328 1 : err = s.point.Close()
1329 1 : }
1330 1 : if s.rangeDeletion != nil {
1331 1 : err = firstError(err, s.rangeDeletion.Close())
1332 1 : }
1333 1 : if s.rangeKey != nil {
1334 0 : err = firstError(err, s.rangeKey.Close())
1335 0 : }
1336 1 : return err
1337 : }
1338 :
1339 : // iterKinds is a bitmap indicating a set of kinds of iterators. Callers may
1340 : // bitwise-OR iterPointKeys, iterRangeDeletions and/or iterRangeKeys together to
1341 : // represent a set of desired iterator kinds.
1342 : type iterKinds uint8
1343 :
1344 1 : func (t iterKinds) Point() bool { return (t & iterPointKeys) != 0 }
1345 1 : func (t iterKinds) RangeDeletion() bool { return (t & iterRangeDeletions) != 0 }
1346 1 : func (t iterKinds) RangeKey() bool { return (t & iterRangeKeys) != 0 }
1347 :
1348 : const (
1349 : iterPointKeys iterKinds = 1 << iota
1350 : iterRangeDeletions
1351 : iterRangeKeys
1352 : )
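// An illustrative sketch of composing iterator kinds with bitwise-OR and
// querying them with the helpers above; exampleIterKinds is hypothetical.
func exampleIterKinds() {
	kinds := iterPointKeys | iterRangeKeys
	_ = kinds.Point()         // true
	_ = kinds.RangeKey()      // true
	_ = kinds.RangeDeletion() // false
}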