// Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.

package pebble

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"runtime/debug"
	"runtime/pprof"
	"sync"
	"sync/atomic"
	"unsafe"

	"github.com/cockroachdb/errors"
	"github.com/cockroachdb/pebble/internal/base"
	"github.com/cockroachdb/pebble/internal/invariants"
	"github.com/cockroachdb/pebble/internal/keyspan"
	"github.com/cockroachdb/pebble/internal/manifest"
	"github.com/cockroachdb/pebble/internal/private"
	"github.com/cockroachdb/pebble/objstorage"
	"github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
	"github.com/cockroachdb/pebble/sstable"
)

var emptyIter = &errorIter{err: nil}
var emptyKeyspanIter = &errorKeyspanIter{err: nil}

// filteredAll is a singleton internalIterator implementation used when an
// sstable does contain point keys, but all the keys are filtered by the active
// PointKeyFilters set in the iterator's IterOptions.
//
// filteredAll implements filteredIter, ensuring the level iterator recognizes
// when it may need to return file boundaries to keep the rangeDelIter open
// during mergingIter operation.
var filteredAll = &filteredAllKeysIter{errorIter: errorIter{err: nil}}

var _ filteredIter = filteredAll

type filteredAllKeysIter struct {
	errorIter
}

func (s *filteredAllKeysIter) MaybeFilteredKeys() bool {
	return true
}

var tableCacheLabels = pprof.Labels("pebble", "table-cache")

// tableCacheOpts contains the db-specific fields of a table cache. This is
// stored in the tableCacheContainer along with the table cache.
//
// NB: It is important that the fields in this struct remain read-only. The
// struct is shared by every tableCacheShard, so updating a mutable field
// would evict that field's cache line, and the surrounding fields, from the
// CPU caches, causing unnecessary cache misses.
type tableCacheOpts struct {
	// iterCount keeps track of how many iterators are open. It is used to keep
	// track of leaked iterators on a per-db level.
	iterCount *atomic.Int32

	loggerAndTracer LoggerAndTracer
	cacheID         uint64
	objProvider     objstorage.Provider
	opts            sstable.ReaderOptions
	filterMetrics   *sstable.FilterMetricsTracker
}

// tableCacheContainer contains the table cache and
// fields which are unique to the DB.
type tableCacheContainer struct {
	tableCache *TableCache

	// dbOpts contains fields relevant to the table cache
	// which are unique to each DB.
	dbOpts tableCacheOpts
}

// newTableCacheContainer will panic if the underlying cache in the table cache
// doesn't match Options.Cache.
func newTableCacheContainer(
	tc *TableCache, cacheID uint64, objProvider objstorage.Provider, opts *Options, size int,
) *tableCacheContainer {
	// We will release a ref to table cache acquired here when tableCacheContainer.close is called.
	if tc != nil {
		if tc.cache != opts.Cache {
			panic("pebble: underlying cache for the table cache and db are different")
		}
		tc.Ref()
	} else {
		// NewTableCache should create a ref to tc which the container should
		// drop whenever it is closed.
		tc = NewTableCache(opts.Cache, opts.Experimental.TableCacheShards, size)
	}

	t := &tableCacheContainer{}
	t.tableCache = tc
	t.dbOpts.loggerAndTracer = opts.LoggerAndTracer
	t.dbOpts.cacheID = cacheID
	t.dbOpts.objProvider = objProvider
	t.dbOpts.opts = opts.MakeReaderOptions()
	t.dbOpts.filterMetrics = &sstable.FilterMetricsTracker{}
	t.dbOpts.iterCount = new(atomic.Int32)
	return t
}

// Before calling close, make sure that there will be no further need
// to access any of the files associated with the store.
func (c *tableCacheContainer) close() error {
	// We want to do some cleanup work here. Check for leaked iterators
	// by the DB using this container. Note that we'll still perform cleanup
	// below in the case that there are leaked iterators.
	var err error
	if v := c.dbOpts.iterCount.Load(); v > 0 {
		err = errors.Errorf("leaked iterators: %d", errors.Safe(v))
	}

	// Release nodes here.
	for _, shard := range c.tableCache.shards {
		if shard != nil {
			shard.removeDB(&c.dbOpts)
		}
	}
	return firstError(err, c.tableCache.Unref())
}

func (c *tableCacheContainer) newIters(
	ctx context.Context,
	file *manifest.FileMetadata,
	opts *IterOptions,
	internalOpts internalIterOpts,
) (internalIterator, keyspan.FragmentIterator, error) {
	return c.tableCache.getShard(file.FileBacking.DiskFileNum).newIters(ctx, file, opts, internalOpts, &c.dbOpts)
}

func (c *tableCacheContainer) newRangeKeyIter(
	file *manifest.FileMetadata, opts keyspan.SpanIterOptions,
) (keyspan.FragmentIterator, error) {
	return c.tableCache.getShard(file.FileBacking.DiskFileNum).newRangeKeyIter(file, opts, &c.dbOpts)
}

// getTableProperties returns the properties associated with the backing physical
// table if the input metadata belongs to a virtual sstable.
func (c *tableCacheContainer) getTableProperties(file *fileMetadata) (*sstable.Properties, error) {
	return c.tableCache.getShard(file.FileBacking.DiskFileNum).getTableProperties(file, &c.dbOpts)
}

func (c *tableCacheContainer) evict(fileNum base.DiskFileNum) {
	c.tableCache.getShard(fileNum).evict(fileNum, &c.dbOpts, false)
}

func (c *tableCacheContainer) metrics() (CacheMetrics, FilterMetrics) {
	var m CacheMetrics
	for i := range c.tableCache.shards {
		s := c.tableCache.shards[i]
		s.mu.RLock()
		m.Count += int64(len(s.mu.nodes))
		s.mu.RUnlock()
		m.Hits += s.hits.Load()
		m.Misses += s.misses.Load()
	}
	m.Size = m.Count * int64(unsafe.Sizeof(sstable.Reader{}))
	f := c.dbOpts.filterMetrics.Load()
	return m, f
}

func (c *tableCacheContainer) estimateSize(
	meta *fileMetadata, lower, upper []byte,
) (size uint64, err error) {
	if meta.Virtual {
		err = c.withVirtualReader(
			meta.VirtualMeta(),
			func(r sstable.VirtualReader) (err error) {
				size, err = r.EstimateDiskUsage(lower, upper)
				return err
			},
		)
	} else {
		err = c.withReader(
			meta.PhysicalMeta(),
			func(r *sstable.Reader) (err error) {
				size, err = r.EstimateDiskUsage(lower, upper)
				return err
			},
		)
	}
	if err != nil {
		return 0, err
	}
	return size, nil
}

func createCommonReader(v *tableCacheValue, file *fileMetadata) sstable.CommonReader {
	// TODO(bananabrick): We suffer an allocation if file is a virtual sstable.
	var cr sstable.CommonReader = v.reader
	if file.Virtual {
		virtualReader := sstable.MakeVirtualReader(
			v.reader, file.VirtualMeta(),
		)
		cr = &virtualReader
	}
	return cr
}

func (c *tableCacheContainer) withCommonReader(
	meta *fileMetadata, fn func(sstable.CommonReader) error,
) error {
	s := c.tableCache.getShard(meta.FileBacking.DiskFileNum)
	v := s.findNode(meta, &c.dbOpts)
	defer s.unrefValue(v)
	if v.err != nil {
		return v.err
	}
	return fn(createCommonReader(v, meta))
}

func (c *tableCacheContainer) withReader(meta physicalMeta, fn func(*sstable.Reader) error) error {
	s := c.tableCache.getShard(meta.FileBacking.DiskFileNum)
	v := s.findNode(meta.FileMetadata, &c.dbOpts)
	defer s.unrefValue(v)
	if v.err != nil {
		return v.err
	}
	return fn(v.reader)
}

// withVirtualReader fetches a VirtualReader associated with a virtual sstable.
func (c *tableCacheContainer) withVirtualReader(
	meta virtualMeta, fn func(sstable.VirtualReader) error,
) error {
	s := c.tableCache.getShard(meta.FileBacking.DiskFileNum)
	v := s.findNode(meta.FileMetadata, &c.dbOpts)
	defer s.unrefValue(v)
	if v.err != nil {
		return v.err
	}
	return fn(sstable.MakeVirtualReader(v.reader, meta))
}

func (c *tableCacheContainer) iterCount() int64 {
	return int64(c.dbOpts.iterCount.Load())
}

// TableCache is a shareable cache for open sstables.
type TableCache struct {
	refs atomic.Int64

	cache  *Cache
	shards []*tableCacheShard
}

// Ref adds a reference to the table cache. Once tableCache.init returns,
// the table cache only remains valid if there is at least one reference
// to it.
func (c *TableCache) Ref() {
	v := c.refs.Add(1)
	// We don't want the reference count to ever go from 0 -> 1,
	// because a reference count of 0 implies that we've closed the cache.
	if v <= 1 {
		panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
	}
}

// Unref removes a reference to the table cache.
func (c *TableCache) Unref() error {
	v := c.refs.Add(-1)
	switch {
	case v < 0:
		panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
	case v == 0:
		var err error
		for i := range c.shards {
			// The cache shard is not allocated yet, nothing to close.
			if c.shards[i] == nil {
				continue
			}
			err = firstError(err, c.shards[i].Close())
		}

		// Unref the cache which we create a reference to when the tableCache
		// is first instantiated.
		c.cache.Unref()
		return err
	}
	return nil
}

// NewTableCache will create a reference to the table cache. It is the caller's
// responsibility to call tableCache.Unref if they will no longer hold a
// reference to the table cache.
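//
// A minimal usage sketch, sharing one table cache between two DBs (the values
// are hypothetical; it assumes the exported NewCache, Open, and the
// Options.Cache/Options.TableCache fields available to Pebble users):
//
//	cache := NewCache(128 << 20) // 128 MiB block cache
//	defer cache.Unref()
//	tc := NewTableCache(cache, 8 /* shards */, 1024 /* open tables */)
//	defer tc.Unref() // drop the reference created by NewTableCache
//	db1, _ := Open("db1", &Options{Cache: cache, TableCache: tc})
//	db2, _ := Open("db2", &Options{Cache: cache, TableCache: tc})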
func NewTableCache(cache *Cache, numShards int, size int) *TableCache {
	if size == 0 {
		panic("pebble: cannot create a table cache of size 0")
	} else if numShards == 0 {
		panic("pebble: cannot create a table cache with 0 shards")
	}

	c := &TableCache{}
	c.cache = cache
	c.cache.Ref()

	c.shards = make([]*tableCacheShard, numShards)
	for i := range c.shards {
		c.shards[i] = &tableCacheShard{}
		c.shards[i].init(size / len(c.shards))
	}

	// Hold a ref to the cache here.
	c.refs.Store(1)

	return c
}

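// getShard returns the shard responsible for the given file. Files are
// distributed across shards by file number, so concurrent lookups for
// different files tend to contend on different shard mutexes.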
func (c *TableCache) getShard(fileNum base.DiskFileNum) *tableCacheShard {
	return c.shards[uint64(fileNum.FileNum())%uint64(len(c.shards))]
}

type tableCacheKey struct {
	cacheID uint64
	fileNum base.DiskFileNum
}

type tableCacheShard struct {
	hits      atomic.Int64
	misses    atomic.Int64
	iterCount atomic.Int32

	size int

	mu struct {
		sync.RWMutex
		nodes map[tableCacheKey]*tableCacheNode
		// The iters map is only created and populated in race builds.
		iters map[io.Closer][]byte

		handHot  *tableCacheNode
		handCold *tableCacheNode
		handTest *tableCacheNode

		coldTarget int
		sizeHot    int
		sizeCold   int
		sizeTest   int
	}
	releasing       sync.WaitGroup
	releasingCh     chan *tableCacheValue
	releaseLoopExit sync.WaitGroup
}

func (c *tableCacheShard) init(size int) {
	c.size = size

	c.mu.nodes = make(map[tableCacheKey]*tableCacheNode)
	c.mu.coldTarget = size
	c.releasingCh = make(chan *tableCacheValue, 100)
	c.releaseLoopExit.Add(1)
	go c.releaseLoop()

	if invariants.RaceEnabled {
		c.mu.iters = make(map[io.Closer][]byte)
	}
}

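// releaseLoop runs in a dedicated goroutine (labeled for pprof), closing the
// sstable reader of each value sent on releasingCh once its reference count
// has dropped to zero. Releasing asynchronously keeps reader teardown off the
// unref caller's path.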
func (c *tableCacheShard) releaseLoop() {
	pprof.Do(context.Background(), tableCacheLabels, func(context.Context) {
		defer c.releaseLoopExit.Done()
		for v := range c.releasingCh {
			v.release(c)
		}
	})
}

// checkAndIntersectFilters checks the specific table and block property filters
// for intersection with any available table and block-level properties. Returns
// true for ok if this table should be read by this iterator.
func (c *tableCacheShard) checkAndIntersectFilters(
	v *tableCacheValue,
	tableFilter func(userProps map[string]string) bool,
	blockPropertyFilters []BlockPropertyFilter,
	boundLimitedFilter sstable.BoundLimitedBlockPropertyFilter,
) (ok bool, filterer *sstable.BlockPropertiesFilterer, err error) {
	if tableFilter != nil &&
		!tableFilter(v.reader.Properties.UserProperties) {
		return false, nil, nil
	}

	if boundLimitedFilter != nil || len(blockPropertyFilters) > 0 {
		filterer, err = sstable.IntersectsTable(
			blockPropertyFilters,
			boundLimitedFilter,
			v.reader.Properties.UserProperties,
		)
		// NB: IntersectsTable will return a nil filterer if the table-level
		// properties indicate there's no intersection with the provided filters.
		if filterer == nil || err != nil {
			return false, nil, err
		}
	}
	return true, filterer, nil
}

func (c *tableCacheShard) newIters(
	ctx context.Context,
	file *manifest.FileMetadata,
	opts *IterOptions,
	internalOpts internalIterOpts,
	dbOpts *tableCacheOpts,
) (internalIterator, keyspan.FragmentIterator, error) {
	// TODO(sumeer): constructing the Reader should also use a plumbed context,
	// since parts of the sstable are read during the construction. The Reader
	// should not remember that context since the Reader can be long-lived.

	// Calling findNode gives us the responsibility of decrementing v's
	// refCount. If opening the underlying table resulted in error, then we
	// decrement this straight away. Otherwise, we pass that responsibility to
	// the sstable iterator, which decrements when it is closed.
	v := c.findNode(file, dbOpts)
	if v.err != nil {
		defer c.unrefValue(v)
		return nil, nil, v.err
	}

	hideObsoletePoints := false
	var pointKeyFilters []BlockPropertyFilter
	if opts != nil {
		// This code is appending (at most one filter) in-place to
		// opts.PointKeyFilters even though the slice is shared for iterators in
		// the same iterator tree. This is acceptable since all the following
		// properties are true:
		// - The iterator tree is single threaded, so the shared backing for the
		//   slice is being mutated in a single threaded manner.
		// - Each shallow copy of the slice has its own notion of length.
		// - The appended element is always the obsoleteKeyBlockPropertyFilter
		//   struct, which is stateless, so overwriting that struct when creating
		//   one sstable iterator is harmless to other sstable iterators that are
		//   relying on that struct.
		//
		// An alternative would be to have different slices for different sstable
		// iterators, but that requires more work to avoid allocations.
		hideObsoletePoints, pointKeyFilters =
			v.reader.TryAddBlockPropertyFilterForHideObsoletePoints(
				opts.snapshotForHideObsoletePoints, file.LargestSeqNum, opts.PointKeyFilters)
	}
	ok := true
	var filterer *sstable.BlockPropertiesFilterer
	var err error
	if opts != nil {
		ok, filterer, err = c.checkAndIntersectFilters(v, opts.TableFilter,
			pointKeyFilters, internalOpts.boundLimitedFilter)
	}
	if err != nil {
		c.unrefValue(v)
		return nil, nil, err
	}

	// Note: This suffers an allocation for virtual sstables.
	cr := createCommonReader(v, file)

	provider := dbOpts.objProvider
	// Check if this file is a foreign file.
	objMeta, err := provider.Lookup(fileTypeTable, file.FileBacking.DiskFileNum)
	if err != nil {
		return nil, nil, err
	}

	// NB: range-del iterator does not maintain a reference to the table, nor
	// does it need to read from it after creation.
	rangeDelIter, err := cr.NewRawRangeDelIter()
	if err != nil {
		c.unrefValue(v)
		return nil, nil, err
	}

	if !ok {
		c.unrefValue(v)
		// Return an empty iterator. This iterator has no mutable state, so
		// using a singleton is fine.
		// NB: We still return the potentially non-empty rangeDelIter. This
		// ensures the iterator observes the file's range deletions even if the
		// block property filters exclude all the file's point keys. The range
		// deletions may still delete keys lower in the LSM in files that DO
		// match the active filters.
		//
		// The point iterator returned must implement the filteredIter
		// interface, so that the level iterator surfaces file boundaries when
		// range deletions are present.
		return filteredAll, rangeDelIter, err
	}

	var iter sstable.Iterator
	useFilter := true
	if opts != nil {
		useFilter = manifest.LevelToInt(opts.level) != 6 || opts.UseL6Filters
		ctx = objiotracing.WithLevel(ctx, manifest.LevelToInt(opts.level))
	}
	tableFormat, err := v.reader.TableFormat()
	if err != nil {
		return nil, nil, err
	}
	var rp sstable.ReaderProvider
	if tableFormat >= sstable.TableFormatPebblev3 && v.reader.Properties.NumValueBlocks > 0 {
		rp = &tableCacheShardReaderProvider{c: c, file: file, dbOpts: dbOpts}
	}

	if provider.IsSharedForeign(objMeta) {
		if tableFormat < sstable.TableFormatPebblev4 {
			return nil, nil, errors.New("pebble: shared foreign sstable has a lower table format than expected")
		}
		hideObsoletePoints = true
	}
	if internalOpts.bytesIterated != nil {
		iter, err = cr.NewCompactionIter(internalOpts.bytesIterated, rp, internalOpts.bufferPool)
	} else {
		iter, err = cr.NewIterWithBlockPropertyFiltersAndContextEtc(
			ctx, opts.GetLowerBound(), opts.GetUpperBound(), filterer, hideObsoletePoints, useFilter,
			internalOpts.stats, rp)
	}
	if err != nil {
		if rangeDelIter != nil {
			_ = rangeDelIter.Close()
		}
		c.unrefValue(v)
		return nil, nil, err
	}
	// NB: v.closeHook takes responsibility for calling unrefValue(v) here. Take
	// care to avoid introducing an allocation here by adding a closure.
	iter.SetCloseHook(v.closeHook)

	c.iterCount.Add(1)
	dbOpts.iterCount.Add(1)
	if invariants.RaceEnabled {
		c.mu.Lock()
		c.mu.iters[iter] = debug.Stack()
		c.mu.Unlock()
	}
	return iter, rangeDelIter, nil
}

func (c *tableCacheShard) newRangeKeyIter(
	file *manifest.FileMetadata, opts keyspan.SpanIterOptions, dbOpts *tableCacheOpts,
) (keyspan.FragmentIterator, error) {
	// Calling findNode gives us the responsibility of decrementing v's
	// refCount. If opening the underlying table resulted in error, then we
	// decrement this straight away. Otherwise, we pass that responsibility to
	// the sstable iterator, which decrements when it is closed.
	v := c.findNode(file, dbOpts)
	if v.err != nil {
		defer c.unrefValue(v)
		return nil, v.err
	}

	ok := true
	var err error
	// Don't filter a table's range keys if the file contains RANGEKEYDELs.
	// The RANGEKEYDELs may delete range keys in other levels. Skipping the
	// file's range key blocks may surface deleted range keys below. This is
	// done here, rather than deferring to the block-property collector in order
	// to maintain parity with point keys and the treatment of RANGEDELs.
	if v.reader.Properties.NumRangeKeyDels == 0 {
		ok, _, err = c.checkAndIntersectFilters(v, nil, opts.RangeKeyFilters, nil)
	}
	if err != nil {
		c.unrefValue(v)
		return nil, err
	}
	if !ok {
		c.unrefValue(v)
		// Return the empty iterator. This iterator has no mutable state, so
		// using a singleton is fine.
		return emptyKeyspanIter, err
	}

	var iter keyspan.FragmentIterator
	if file.Virtual {
		virtualReader := sstable.MakeVirtualReader(
			v.reader, file.VirtualMeta(),
		)
		iter, err = virtualReader.NewRawRangeKeyIter()
	} else {
		iter, err = v.reader.NewRawRangeKeyIter()
	}

	// iter is a block iter that holds the entire value of the block in memory.
	// No need to hold onto a ref of the cache value.
	c.unrefValue(v)

	if err != nil {
		return nil, err
	}

	if iter == nil {
		// NewRawRangeKeyIter can return nil even if there's no error. However,
		// the keyspan.LevelIter expects a non-nil iterator if err is nil.
		return emptyKeyspanIter, nil
	}

	return iter, nil
}

type tableCacheShardReaderProvider struct {
	c      *tableCacheShard
	file   *manifest.FileMetadata
	dbOpts *tableCacheOpts
	v      *tableCacheValue
}

var _ sstable.ReaderProvider = &tableCacheShardReaderProvider{}

// GetReader implements sstable.ReaderProvider. Note that it is not the
// responsibility of tableCacheShardReaderProvider to ensure that the file
// continues to exist. The ReaderProvider is used in iterators where the
// top-level iterator is pinning the read state and preventing the files from
// being deleted.
//
// The caller must call tableCacheShardReaderProvider.Close.
//
// Note that currently the Reader returned here is only used to read value
// blocks. This reader shouldn't be used for other purposes like reading keys
// outside of virtual sstable bounds.
//
// TODO(bananabrick): We could return a wrapper over the Reader to ensure
// that the reader isn't used for other purposes.
func (rp *tableCacheShardReaderProvider) GetReader() (*sstable.Reader, error) {
	// Calling findNode gives us the responsibility of decrementing v's
	// refCount.
	v := rp.c.findNode(rp.file, rp.dbOpts)
	if v.err != nil {
		defer rp.c.unrefValue(v)
		return nil, v.err
	}
	rp.v = v
	return v.reader, nil
}

// Close implements sstable.ReaderProvider.
func (rp *tableCacheShardReaderProvider) Close() {
	rp.c.unrefValue(rp.v)
	rp.v = nil
}

// getTableProperties returns the sstable properties for the target file.
func (c *tableCacheShard) getTableProperties(
	file *fileMetadata, dbOpts *tableCacheOpts,
) (*sstable.Properties, error) {
	// Calling findNode gives us the responsibility of decrementing v's refCount here.
	v := c.findNode(file, dbOpts)
	defer c.unrefValue(v)

	if v.err != nil {
		return nil, v.err
	}
	return &v.reader.Properties, nil
}

// releaseNode releases a node from the tableCacheShard.
//
// c.mu must be held when calling this.
func (c *tableCacheShard) releaseNode(n *tableCacheNode) {
	c.unlinkNode(n)
	c.clearNode(n)
}

// unlinkNode removes a node from the tableCacheShard, leaving the shard
// reference in place.
//
// c.mu must be held when calling this.
func (c *tableCacheShard) unlinkNode(n *tableCacheNode) {
	key := tableCacheKey{n.cacheID, n.fileNum}
	delete(c.mu.nodes, key)

	switch n.ptype {
	case tableCacheNodeHot:
		c.mu.sizeHot--
	case tableCacheNodeCold:
		c.mu.sizeCold--
	case tableCacheNodeTest:
		c.mu.sizeTest--
	}

	if n == c.mu.handHot {
		c.mu.handHot = c.mu.handHot.prev()
	}
	if n == c.mu.handCold {
		c.mu.handCold = c.mu.handCold.prev()
	}
	if n == c.mu.handTest {
		c.mu.handTest = c.mu.handTest.prev()
	}

	if n.unlink() == n {
		// This was the last entry in the cache.
		c.mu.handHot = nil
		c.mu.handCold = nil
		c.mu.handTest = nil
	}

	n.links.prev = nil
	n.links.next = nil
}

func (c *tableCacheShard) clearNode(n *tableCacheNode) {
	if v := n.value; v != nil {
		n.value = nil
		c.unrefValue(v)
	}
}

// unrefValue decrements the reference count for the specified value, releasing
// it if the reference count fell to 0. Note that the value has a reference if
// it is present in tableCacheShard.mu.nodes, so a reference count of 0 means
// the node has already been removed from that map.
func (c *tableCacheShard) unrefValue(v *tableCacheValue) {
	if v.refCount.Add(-1) == 0 {
		c.releasing.Add(1)
		c.releasingCh <- v
	}
}

// findNode returns the node for the table with the given file number, creating
// that node if it didn't already exist. The caller is responsible for
// decrementing the returned node's refCount.
func (c *tableCacheShard) findNode(
	meta *fileMetadata, dbOpts *tableCacheOpts,
) (v *tableCacheValue) {
	// Loading a file before its global sequence number is known (e.g.,
	// during ingest before entering the commit pipeline) can pollute
	// the cache with incorrect state. In invariant builds, verify
	// that the global sequence number of the returned reader matches.
	if invariants.Enabled {
		defer func() {
			if v.reader != nil && meta.LargestSeqNum == meta.SmallestSeqNum &&
				v.reader.Properties.GlobalSeqNum != meta.SmallestSeqNum {
				panic(errors.AssertionFailedf("file %s loaded from table cache with the wrong global sequence number %d",
					meta, v.reader.Properties.GlobalSeqNum))
			}
		}()
	}
	if refs := meta.Refs(); refs <= 0 {
		panic(errors.AssertionFailedf("attempting to load file %s with refs=%d from table cache",
			meta, refs))
	}

	// Fast-path for a hit in the cache.
	c.mu.RLock()
	key := tableCacheKey{dbOpts.cacheID, meta.FileBacking.DiskFileNum}
	if n := c.mu.nodes[key]; n != nil && n.value != nil {
		// Fast-path hit.
		//
		// The caller is responsible for decrementing the refCount.
		v = n.value
		v.refCount.Add(1)
		c.mu.RUnlock()
		n.referenced.Store(true)
		c.hits.Add(1)
		<-v.loaded
		return v
	}
	c.mu.RUnlock()

	c.mu.Lock()

	n := c.mu.nodes[key]
	switch {
	case n == nil:
		// Slow-path miss of a non-existent node.
		n = &tableCacheNode{
			fileNum: meta.FileBacking.DiskFileNum,
			ptype:   tableCacheNodeCold,
		}
		c.addNode(n, dbOpts)
		c.mu.sizeCold++

	case n.value != nil:
		// Slow-path hit of a hot or cold node.
		//
		// The caller is responsible for decrementing the refCount.
		v = n.value
		v.refCount.Add(1)
		n.referenced.Store(true)
		c.hits.Add(1)
		c.mu.Unlock()
		<-v.loaded
		return v

	default:
		// Slow-path miss of a test node.
		c.unlinkNode(n)
		c.mu.coldTarget++
		if c.mu.coldTarget > c.size {
			c.mu.coldTarget = c.size
		}

		n.referenced.Store(false)
		n.ptype = tableCacheNodeHot
		c.addNode(n, dbOpts)
		c.mu.sizeHot++
	}

	c.misses.Add(1)

	v = &tableCacheValue{
		loaded: make(chan struct{}),
	}
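	// Two references: one for the caller (released via closeHook or an
	// explicit unrefValue), and one for the node's presence in the cache
	// (released by clearNode when the node is evicted or invalidated).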
	v.refCount.Store(2)
	// Cache the closure invoked when an iterator is closed. This avoids an
	// allocation on every call to newIters.
	v.closeHook = func(i sstable.Iterator) error {
		if invariants.RaceEnabled {
			c.mu.Lock()
			delete(c.mu.iters, i)
			c.mu.Unlock()
		}
		c.unrefValue(v)
		c.iterCount.Add(-1)
		dbOpts.iterCount.Add(-1)
		return nil
	}
	n.value = v

	c.mu.Unlock()

	// Note that adding to the cache lists must complete before we begin loading
	// the table, as a failure during load will result in the node being unlinked.
	pprof.Do(context.Background(), tableCacheLabels, func(context.Context) {
		v.load(
			loadInfo{
				backingFileNum: meta.FileBacking.DiskFileNum,
				smallestSeqNum: meta.SmallestSeqNum,
				largestSeqNum:  meta.LargestSeqNum,
			}, c, dbOpts)
	})
	return v
}

func (c *tableCacheShard) addNode(n *tableCacheNode, dbOpts *tableCacheOpts) {
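	// Make room if necessary, then insert n into the node map and link it
	// into the clock ring adjacent to handHot. c.mu must be held.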
	c.evictNodes()
	n.cacheID = dbOpts.cacheID
	key := tableCacheKey{n.cacheID, n.fileNum}
	c.mu.nodes[key] = n

	n.links.next = n
	n.links.prev = n
	if c.mu.handHot == nil {
		// First element.
		c.mu.handHot = n
		c.mu.handCold = n
		c.mu.handTest = n
	} else {
		c.mu.handHot.link(n)
	}

	if c.mu.handCold == c.mu.handHot {
		c.mu.handCold = c.mu.handCold.prev()
	}
}

func (c *tableCacheShard) evictNodes() {
	for c.size <= c.mu.sizeHot+c.mu.sizeCold && c.mu.handCold != nil {
		c.runHandCold()
	}
}

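// The clock hands below implement a CLOCK-Pro-style replacement policy. In
// brief: handCold promotes re-referenced cold nodes to hot and demotes
// unreferenced cold nodes to test nodes (dropping their cached readers);
// handHot demotes unreferenced hot nodes to cold; handTest retires test
// nodes and shrinks coldTarget, while a miss on a test node (see findNode)
// grows coldTarget, adaptively balancing the hot and cold regions.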
func (c *tableCacheShard) runHandCold() {
	n := c.mu.handCold
	if n.ptype == tableCacheNodeCold {
		if n.referenced.Load() {
			n.referenced.Store(false)
			n.ptype = tableCacheNodeHot
			c.mu.sizeCold--
			c.mu.sizeHot++
		} else {
			c.clearNode(n)
			n.ptype = tableCacheNodeTest
			c.mu.sizeCold--
			c.mu.sizeTest++
			for c.size < c.mu.sizeTest && c.mu.handTest != nil {
				c.runHandTest()
			}
		}
	}

	c.mu.handCold = c.mu.handCold.next()

	for c.size-c.mu.coldTarget <= c.mu.sizeHot && c.mu.handHot != nil {
		c.runHandHot()
	}
}

func (c *tableCacheShard) runHandHot() {
	if c.mu.handHot == c.mu.handTest && c.mu.handTest != nil {
		c.runHandTest()
		if c.mu.handHot == nil {
			return
		}
	}

	n := c.mu.handHot
	if n.ptype == tableCacheNodeHot {
		if n.referenced.Load() {
			n.referenced.Store(false)
		} else {
			n.ptype = tableCacheNodeCold
			c.mu.sizeHot--
			c.mu.sizeCold++
		}
	}

	c.mu.handHot = c.mu.handHot.next()
}

func (c *tableCacheShard) runHandTest() {
	if c.mu.sizeCold > 0 && c.mu.handTest == c.mu.handCold && c.mu.handCold != nil {
		c.runHandCold()
		if c.mu.handTest == nil {
			return
		}
	}

	n := c.mu.handTest
	if n.ptype == tableCacheNodeTest {
		c.mu.coldTarget--
		if c.mu.coldTarget < 0 {
			c.mu.coldTarget = 0
		}
		c.unlinkNode(n)
		c.clearNode(n)
	}

	c.mu.handTest = c.mu.handTest.next()
}

func (c *tableCacheShard) evict(fileNum base.DiskFileNum, dbOpts *tableCacheOpts, allowLeak bool) {
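	// Remove the file's node (if any) from this shard, synchronously closing
	// its sstable reader, then evict the file's blocks from the block cache.
	// When allowLeak is true, the refcount check is skipped; removeDB uses
	// this because leaked iterators may still hold references.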
	c.mu.Lock()
	key := tableCacheKey{dbOpts.cacheID, fileNum}
	n := c.mu.nodes[key]
	var v *tableCacheValue
	if n != nil {
		// NB: This is equivalent to tableCacheShard.releaseNode(), but we perform
		// the tableCacheNode.release() call synchronously below to ensure the
		// sstable file descriptor is closed before returning. Note that
		// tableCacheShard.releasing needs to be incremented while holding
		// tableCacheShard.mu in order to avoid a race with Close()
		c.unlinkNode(n)
		v = n.value
		if v != nil {
			if !allowLeak {
				if t := v.refCount.Add(-1); t != 0 {
					dbOpts.loggerAndTracer.Fatalf("sstable %s: refcount is not zero: %d\n%s", fileNum, t, debug.Stack())
				}
			}
			c.releasing.Add(1)
		}
	}

	c.mu.Unlock()

	if v != nil {
		v.release(c)
	}

	dbOpts.opts.Cache.EvictFile(dbOpts.cacheID, fileNum)
}

// removeDB evicts any nodes which have a reference to the DB
// associated with dbOpts.cacheID. Make sure that there will
// be no more accesses to the files associated with the DB.
func (c *tableCacheShard) removeDB(dbOpts *tableCacheOpts) {
	var fileNums []base.DiskFileNum

	c.mu.RLock()
	// Collect the fileNums which need to be cleaned.
	var firstNode *tableCacheNode
	node := c.mu.handHot
	for node != firstNode {
		if firstNode == nil {
			firstNode = node
		}

		if node.cacheID == dbOpts.cacheID {
			fileNums = append(fileNums, node.fileNum)
		}
		node = node.next()
	}
	c.mu.RUnlock()

	// Evict all the nodes associated with the DB.
	// This should synchronously close all the files
	// associated with the DB.
	for _, fileNum := range fileNums {
		c.evict(fileNum, dbOpts, true)
	}
}

func (c *tableCacheShard) Close() error {
	c.mu.Lock()
	defer c.mu.Unlock()

	// Check for leaked iterators. Note that we'll still perform cleanup below in
	// the case that there are leaked iterators.
	var err error
	if v := c.iterCount.Load(); v > 0 {
		if !invariants.RaceEnabled {
			err = errors.Errorf("leaked iterators: %d", errors.Safe(v))
		} else {
			var buf bytes.Buffer
			for _, stack := range c.mu.iters {
				fmt.Fprintf(&buf, "%s\n", stack)
			}
			err = errors.Errorf("leaked iterators: %d\n%s", errors.Safe(v), buf.String())
		}
	}

	for c.mu.handHot != nil {
		n := c.mu.handHot
		if n.value != nil {
			if n.value.refCount.Add(-1) == 0 {
				c.releasing.Add(1)
				c.releasingCh <- n.value
			}
		}
		c.unlinkNode(n)
	}
	c.mu.nodes = nil
	c.mu.handHot = nil
	c.mu.handCold = nil
	c.mu.handTest = nil

	// Only shut down the releasing goroutine if there were no leaked
	// iterators. If there were leaked iterators, we leave the goroutine running
	// and the releasingCh open so that a subsequent iterator close can
	// complete. This behavior is used by iterator leak tests. Leaking the
	// goroutine for these tests is less bad than not closing the iterator,
	// which triggers other warnings about block cache handles not being
	// released.
	if err != nil {
		c.releasing.Wait()
		return err
	}

	close(c.releasingCh)
	c.releasing.Wait()
	c.releaseLoopExit.Wait()
	return err
}

type tableCacheValue struct {
	closeHook func(i sstable.Iterator) error
	reader    *sstable.Reader
	err       error
	loaded    chan struct{}
	// Reference count for the value. The reader is closed when the reference
	// count drops to zero.
	refCount atomic.Int32
}

type loadInfo struct {
	backingFileNum base.DiskFileNum
	largestSeqNum  uint64
	smallestSeqNum uint64
}

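// load opens the backing sstable file and constructs its Reader. On success,
// if the file's sequence numbers indicate an ingested table (smallest equals
// largest), the reader's global sequence number is populated. On failure, the
// error is recorded in v.err for callers of findNode and the node is removed
// from the cache if it is still present. In all cases v.loaded is closed to
// unblock waiters.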
func (v *tableCacheValue) load(loadInfo loadInfo, c *tableCacheShard, dbOpts *tableCacheOpts) {
	// Try opening the file first.
	var f objstorage.Readable
	var err error
	f, err = dbOpts.objProvider.OpenForReading(
		context.TODO(), fileTypeTable, loadInfo.backingFileNum, objstorage.OpenOptions{MustExist: true},
	)
	if err == nil {
		cacheOpts := private.SSTableCacheOpts(dbOpts.cacheID, loadInfo.backingFileNum).(sstable.ReaderOption)
		v.reader, err = sstable.NewReader(f, dbOpts.opts, cacheOpts, dbOpts.filterMetrics)
	}
	if err != nil {
		v.err = errors.Wrapf(
			err, "pebble: backing file %s error", errors.Safe(loadInfo.backingFileNum.FileNum()))
	}
	if v.err == nil && loadInfo.smallestSeqNum == loadInfo.largestSeqNum {
		v.reader.Properties.GlobalSeqNum = loadInfo.largestSeqNum
	}
	if v.err != nil {
		c.mu.Lock()
		defer c.mu.Unlock()
		// Lookup the node in the cache again as it might have already been
		// removed.
		key := tableCacheKey{dbOpts.cacheID, loadInfo.backingFileNum}
		n := c.mu.nodes[key]
		if n != nil && n.value == v {
			c.releaseNode(n)
		}
	}
	close(v.loaded)
}

func (v *tableCacheValue) release(c *tableCacheShard) {
	<-v.loaded
	// Nothing to be done about an error at this point. Close the reader if it is
	// open.
	if v.reader != nil {
		_ = v.reader.Close()
	}
	c.releasing.Done()
}

type tableCacheNodeType int8

const (
	tableCacheNodeTest tableCacheNodeType = iota
	tableCacheNodeCold
	tableCacheNodeHot
)

func (p tableCacheNodeType) String() string {
	switch p {
	case tableCacheNodeTest:
		return "test"
	case tableCacheNodeCold:
		return "cold"
	case tableCacheNodeHot:
		return "hot"
	}
	return "unknown"
}

type tableCacheNode struct {
	fileNum base.DiskFileNum
	value   *tableCacheValue

	links struct {
		next *tableCacheNode
		prev *tableCacheNode
	}
	ptype tableCacheNodeType
	// referenced is atomically set to indicate that this entry has been accessed
	// since the last time one of the clock hands swept it.
	referenced atomic.Bool

	// Storing the cache id associated with the DB instance here
	// avoids the need to thread the dbOpts struct through many functions.
	cacheID uint64
}

func (n *tableCacheNode) next() *tableCacheNode {
	if n == nil {
		return nil
	}
	return n.links.next
}

func (n *tableCacheNode) prev() *tableCacheNode {
	if n == nil {
		return nil
	}
	return n.links.prev
}

func (n *tableCacheNode) link(s *tableCacheNode) {
	s.links.prev = n.links.prev
	s.links.prev.links.next = s
	s.links.next = n
	s.links.next.links.prev = s
}

func (n *tableCacheNode) unlink() *tableCacheNode {
	next := n.links.next
	n.links.prev.links.next = n.links.next
	n.links.next.links.prev = n.links.prev
	n.links.prev = n
	n.links.next = n
	return next
}