Line data Source code
1 : // Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "bytes"
9 : "context"
10 : "fmt"
11 : "io"
12 : "runtime/debug"
13 : "runtime/pprof"
14 : "sync"
15 : "sync/atomic"
16 : "unsafe"
17 :
18 : "github.com/cockroachdb/errors"
19 : "github.com/cockroachdb/pebble/internal/base"
20 : "github.com/cockroachdb/pebble/internal/invariants"
21 : "github.com/cockroachdb/pebble/internal/keyspan"
22 : "github.com/cockroachdb/pebble/internal/manifest"
23 : "github.com/cockroachdb/pebble/internal/private"
24 : "github.com/cockroachdb/pebble/objstorage"
25 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
26 : "github.com/cockroachdb/pebble/sstable"
27 : )
28 :
29 : var emptyIter = &errorIter{err: nil}
30 : var emptyKeyspanIter = &errorKeyspanIter{err: nil}
31 :
32 : // filteredAll is a singleton internalIterator implementation used when an
33 : // sstable does contain point keys, but all the keys are filtered by the active
34 : // PointKeyFilters set in the iterator's IterOptions.
35 : //
36 : // filteredAll implements filteredIter, ensuring the level iterator recognizes
37 : // when it may need to return file boundaries to keep the rangeDelIter open
38 : // during mergingIter operation.
39 : var filteredAll = &filteredAllKeysIter{errorIter: errorIter{err: nil}}
40 :
41 : var _ filteredIter = filteredAll
42 :
43 : type filteredAllKeysIter struct {
44 : errorIter
45 : }
46 :
47 1 : func (s *filteredAllKeysIter) MaybeFilteredKeys() bool {
48 1 : return true
49 1 : }
50 :
51 : var tableCacheLabels = pprof.Labels("pebble", "table-cache")
52 :
53 : // tableCacheOpts contains the DB-specific fields
54 : // of a table cache. This is stored in the tableCacheContainer
55 : // along with the table cache.
56 : // NB: It is important to make sure that the fields in this
57 : // struct are read-only. Since the fields here are shared
58 : // by every single tableCacheShard, updating a mutable field
59 : // would cause unnecessary evictions of that field, and of the
60 : // surrounding fields, from the CPU caches.
61 : type tableCacheOpts struct {
62 : // iterCount keeps track of how many iterators are open. It is used to keep
63 : // track of leaked iterators on a per-db level.
64 : iterCount *atomic.Int32
65 :
66 : loggerAndTracer LoggerAndTracer
67 : cacheID uint64
68 : objProvider objstorage.Provider
69 : opts sstable.ReaderOptions
70 : filterMetrics *sstable.FilterMetricsTracker
71 : }
72 :
73 : // tableCacheContainer contains the table cache and
74 : // fields which are unique to the DB.
75 : type tableCacheContainer struct {
76 : tableCache *TableCache
77 :
78 : // dbOpts contains fields relevant to the table cache
79 : // which are unique to each DB.
80 : dbOpts tableCacheOpts
81 : }
82 :
83 : // newTableCacheContainer will panic if the underlying cache in the table cache
84 : // doesn't match Options.Cache.
85 : func newTableCacheContainer(
86 : tc *TableCache, cacheID uint64, objProvider objstorage.Provider, opts *Options, size int,
87 1 : ) *tableCacheContainer {
88 1 : // We will release the ref to the table cache acquired here when tableCacheContainer.close is called.
89 1 : if tc != nil {
90 0 : if tc.cache != opts.Cache {
91 0 : panic("pebble: underlying cache for the table cache and db are different")
92 : }
93 0 : tc.Ref()
94 1 : } else {
95 1 : // NewTableCache should create a ref to tc which the container should
96 1 : // drop whenever it is closed.
97 1 : tc = NewTableCache(opts.Cache, opts.Experimental.TableCacheShards, size)
98 1 : }
99 :
100 1 : t := &tableCacheContainer{}
101 1 : t.tableCache = tc
102 1 : t.dbOpts.loggerAndTracer = opts.LoggerAndTracer
103 1 : t.dbOpts.cacheID = cacheID
104 1 : t.dbOpts.objProvider = objProvider
105 1 : t.dbOpts.opts = opts.MakeReaderOptions()
106 1 : t.dbOpts.filterMetrics = &sstable.FilterMetricsTracker{}
107 1 : t.dbOpts.iterCount = new(atomic.Int32)
108 1 : return t
109 : }
110 :
111 : // Before calling close, make sure that there will be no further need
112 : // to access any of the files associated with the store.
113 1 : func (c *tableCacheContainer) close() error {
114 1 : // We want to do some cleanup work here. Check for leaked iterators
115 1 : // by the DB using this container. Note that we'll still perform cleanup
116 1 : // below in the case that there are leaked iterators.
117 1 : var err error
118 1 : if v := c.dbOpts.iterCount.Load(); v > 0 {
119 0 : err = errors.Errorf("leaked iterators: %d", errors.Safe(v))
120 0 : }
121 :
122 : // Release nodes here.
123 1 : for _, shard := range c.tableCache.shards {
124 1 : if shard != nil {
125 1 : shard.removeDB(&c.dbOpts)
126 1 : }
127 : }
128 1 : return firstError(err, c.tableCache.Unref())
129 : }
130 :
131 : func (c *tableCacheContainer) newIters(
132 : ctx context.Context,
133 : file *manifest.FileMetadata,
134 : opts *IterOptions,
135 : internalOpts internalIterOpts,
136 1 : ) (internalIterator, keyspan.FragmentIterator, error) {
137 1 : return c.tableCache.getShard(file.FileBacking.DiskFileNum).newIters(ctx, file, opts, internalOpts, &c.dbOpts)
138 1 : }
139 :
140 : func (c *tableCacheContainer) newRangeKeyIter(
141 : file *manifest.FileMetadata, opts keyspan.SpanIterOptions,
142 1 : ) (keyspan.FragmentIterator, error) {
143 1 : return c.tableCache.getShard(file.FileBacking.DiskFileNum).newRangeKeyIter(file, opts, &c.dbOpts)
144 1 : }
145 :
146 : // getTableProperties returns the properties associated with the backing physical
147 : // table if the input metadata belongs to a virtual sstable.
148 0 : func (c *tableCacheContainer) getTableProperties(file *fileMetadata) (*sstable.Properties, error) {
149 0 : return c.tableCache.getShard(file.FileBacking.DiskFileNum).getTableProperties(file, &c.dbOpts)
150 0 : }
151 :
152 1 : func (c *tableCacheContainer) evict(fileNum base.DiskFileNum) {
153 1 : c.tableCache.getShard(fileNum).evict(fileNum, &c.dbOpts, false)
154 1 : }
155 :
156 1 : func (c *tableCacheContainer) metrics() (CacheMetrics, FilterMetrics) {
157 1 : var m CacheMetrics
158 1 : for i := range c.tableCache.shards {
159 1 : s := c.tableCache.shards[i]
160 1 : s.mu.RLock()
161 1 : m.Count += int64(len(s.mu.nodes))
162 1 : s.mu.RUnlock()
163 1 : m.Hits += s.hits.Load()
164 1 : m.Misses += s.misses.Load()
165 1 : }
166 1 : m.Size = m.Count * int64(unsafe.Sizeof(sstable.Reader{}))
167 1 : f := c.dbOpts.filterMetrics.Load()
168 1 : return m, f
169 : }
170 :
171 : func (c *tableCacheContainer) estimateSize(
172 : meta *fileMetadata, lower, upper []byte,
173 1 : ) (size uint64, err error) {
174 1 : if meta.Virtual {
175 0 : err = c.withVirtualReader(
176 0 : meta.VirtualMeta(),
177 0 : func(r sstable.VirtualReader) (err error) {
178 0 : size, err = r.EstimateDiskUsage(lower, upper)
179 0 : return err
180 0 : },
181 : )
182 1 : } else {
183 1 : err = c.withReader(
184 1 : meta.PhysicalMeta(),
185 1 : func(r *sstable.Reader) (err error) {
186 1 : size, err = r.EstimateDiskUsage(lower, upper)
187 1 : return err
188 1 : },
189 : )
190 : }
191 1 : if err != nil {
192 0 : return 0, err
193 0 : }
194 1 : return size, nil
195 : }
196 :
197 1 : func createCommonReader(v *tableCacheValue, file *fileMetadata) sstable.CommonReader {
198 1 : // TODO(bananabrick): We suffer an allocation if file is a virtual sstable.
199 1 : var cr sstable.CommonReader = v.reader
200 1 : if file.Virtual {
201 1 : virtualReader := sstable.MakeVirtualReader(
202 1 : v.reader, file.VirtualMeta(),
203 1 : )
204 1 : cr = &virtualReader
205 1 : }
206 1 : return cr
207 : }
208 :
209 : func (c *tableCacheContainer) withCommonReader(
210 : meta *fileMetadata, fn func(sstable.CommonReader) error,
211 1 : ) error {
212 1 : s := c.tableCache.getShard(meta.FileBacking.DiskFileNum)
213 1 : v := s.findNode(meta, &c.dbOpts)
214 1 : defer s.unrefValue(v)
215 1 : if v.err != nil {
216 0 : return v.err
217 0 : }
218 1 : return fn(createCommonReader(v, meta))
219 : }
220 :
221 1 : func (c *tableCacheContainer) withReader(meta physicalMeta, fn func(*sstable.Reader) error) error {
222 1 : s := c.tableCache.getShard(meta.FileBacking.DiskFileNum)
223 1 : v := s.findNode(meta.FileMetadata, &c.dbOpts)
224 1 : defer s.unrefValue(v)
225 1 : if v.err != nil {
226 0 : return v.err
227 0 : }
228 1 : return fn(v.reader)
229 : }
230 :
231 : // withVirtualReader fetches a VirtualReader associated with a virtual sstable.
232 : func (c *tableCacheContainer) withVirtualReader(
233 : meta virtualMeta, fn func(sstable.VirtualReader) error,
234 1 : ) error {
235 1 : s := c.tableCache.getShard(meta.FileBacking.DiskFileNum)
236 1 : v := s.findNode(meta.FileMetadata, &c.dbOpts)
237 1 : defer s.unrefValue(v)
238 1 : if v.err != nil {
239 0 : return v.err
240 0 : }
241 1 : return fn(sstable.MakeVirtualReader(v.reader, meta))
242 : }
243 :
244 1 : func (c *tableCacheContainer) iterCount() int64 {
245 1 : return int64(c.dbOpts.iterCount.Load())
246 1 : }
247 :
248 : // TableCache is a shareable cache for open sstables.
249 : type TableCache struct {
250 : refs atomic.Int64
251 :
252 : cache *Cache
253 : shards []*tableCacheShard
254 : }
255 :
256 : // Ref adds a reference to the table cache. Once tableCache.init returns,
257 : // the table cache only remains valid if there is at least one reference
258 : // to it.
259 0 : func (c *TableCache) Ref() {
260 0 : v := c.refs.Add(1)
261 0 : // We don't want the reference count to ever go from 0 -> 1,
262 0 : // because a reference count of 0 implies that we've closed the cache.
263 0 : if v <= 1 {
264 0 : panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
265 : }
266 : }
267 :
268 : // Unref removes a reference to the table cache.
269 1 : func (c *TableCache) Unref() error {
270 1 : v := c.refs.Add(-1)
271 1 : switch {
272 0 : case v < 0:
273 0 : panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
274 1 : case v == 0:
275 1 : var err error
276 1 : for i := range c.shards {
277 1 : // The cache shard is not allocated yet; nothing to close.
278 1 : if c.shards[i] == nil {
279 0 : continue
280 : }
281 1 : err = firstError(err, c.shards[i].Close())
282 : }
283 :
284 : // Unref the cache, which we took a reference to when the tableCache
285 : // was first instantiated.
286 1 : c.cache.Unref()
287 1 : return err
288 : }
289 0 : return nil
290 : }
291 :
292 : // NewTableCache will create a reference to the table cache. It is the caller's responsibility
293 : // to call tableCache.Unref if they will no longer hold a reference to the table cache.
294 1 : func NewTableCache(cache *Cache, numShards int, size int) *TableCache {
295 1 : if size == 0 {
296 0 : panic("pebble: cannot create a table cache of size 0")
297 1 : } else if numShards == 0 {
298 0 : panic("pebble: cannot create a table cache with 0 shards")
299 : }
300 :
301 1 : c := &TableCache{}
302 1 : c.cache = cache
303 1 : c.cache.Ref()
304 1 :
305 1 : c.shards = make([]*tableCacheShard, numShards)
306 1 : for i := range c.shards {
307 1 : c.shards[i] = &tableCacheShard{}
308 1 : c.shards[i].init(size / len(c.shards))
309 1 : }
310 :
311 : // Hold a ref to the cache here.
312 1 : c.refs.Store(1)
313 1 :
314 1 : return c
315 : }
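// A minimal sketch, from a client's point of view, of sharing one TableCache
// between multiple DBs, assuming the exported Options.Cache and
// Options.TableCache fields are used for the wiring; the shard count,
// capacity, and directory names below are arbitrary:
//
//	blockCache := pebble.NewCache(128 << 20)
//	defer blockCache.Unref()
//	tc := pebble.NewTableCache(blockCache, 16, 1024)
//	defer tc.Unref() // drop the reference created by NewTableCache
//	db1, err := pebble.Open("db1", &pebble.Options{Cache: blockCache, TableCache: tc})
//	db2, err := pebble.Open("db2", &pebble.Options{Cache: blockCache, TableCache: tc})
//	// Both DBs now draw from one pool of open sstable readers. Each DB's
//	// tableCacheContainer takes its own reference and releases it when the
//	// DB is closed; Options.Cache must match the cache passed to
//	// NewTableCache, or newTableCacheContainer panics.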
316 :
317 1 : func (c *TableCache) getShard(fileNum base.DiskFileNum) *tableCacheShard {
318 1 : return c.shards[uint64(fileNum.FileNum())%uint64(len(c.shards))]
319 1 : }
320 :
321 : type tableCacheKey struct {
322 : cacheID uint64
323 : fileNum base.DiskFileNum
324 : }
325 :
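// tableCacheShard caches open sstable readers for the subset of files that
// hash to this shard. Eviction follows a CLOCK-Pro style policy: nodes are
// classified as hot, cold, or test (see tableCacheNodeType), the handHot,
// handCold and handTest hands sweep a circular list of nodes, and coldTarget
// adapts how much of the shard's capacity is devoted to cold entries. See
// runHandHot, runHandCold and runHandTest for the hand movements.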
326 : type tableCacheShard struct {
327 : hits atomic.Int64
328 : misses atomic.Int64
329 : iterCount atomic.Int32
330 :
331 : size int
332 :
333 : mu struct {
334 : sync.RWMutex
335 : nodes map[tableCacheKey]*tableCacheNode
336 : // The iters map is only created and populated in race builds.
337 : iters map[io.Closer][]byte
338 :
339 : handHot *tableCacheNode
340 : handCold *tableCacheNode
341 : handTest *tableCacheNode
342 :
343 : coldTarget int
344 : sizeHot int
345 : sizeCold int
346 : sizeTest int
347 : }
348 : releasing sync.WaitGroup
349 : releasingCh chan *tableCacheValue
350 : releaseLoopExit sync.WaitGroup
351 : }
352 :
353 1 : func (c *tableCacheShard) init(size int) {
354 1 : c.size = size
355 1 :
356 1 : c.mu.nodes = make(map[tableCacheKey]*tableCacheNode)
357 1 : c.mu.coldTarget = size
358 1 : c.releasingCh = make(chan *tableCacheValue, 100)
359 1 : c.releaseLoopExit.Add(1)
360 1 : go c.releaseLoop()
361 1 :
362 1 : if invariants.RaceEnabled {
363 0 : c.mu.iters = make(map[io.Closer][]byte)
364 0 : }
365 : }
366 :
367 1 : func (c *tableCacheShard) releaseLoop() {
368 1 : pprof.Do(context.Background(), tableCacheLabels, func(context.Context) {
369 1 : defer c.releaseLoopExit.Done()
370 1 : for v := range c.releasingCh {
371 0 : v.release(c)
372 0 : }
373 : })
374 : }
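// NB: releasing a tableCacheValue closes its sstable.Reader, which may block
// on I/O. unrefValue can be called with the shard mutex held (e.g. via
// clearNode during eviction), so the final release is handed off to the
// goroutine above through releasingCh rather than performed inline.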
375 :
376 : // checkAndIntersectFilters checks the specific table and block property filters
377 : // for intersection with any available table and block-level properties. Returns
378 : // true for ok if this table should be read by this iterator.
379 : func (c *tableCacheShard) checkAndIntersectFilters(
380 : v *tableCacheValue,
381 : tableFilter func(userProps map[string]string) bool,
382 : blockPropertyFilters []BlockPropertyFilter,
383 : boundLimitedFilter sstable.BoundLimitedBlockPropertyFilter,
384 1 : ) (ok bool, filterer *sstable.BlockPropertiesFilterer, err error) {
385 1 : if tableFilter != nil &&
386 1 : !tableFilter(v.reader.Properties.UserProperties) {
387 0 : return false, nil, nil
388 0 : }
389 :
390 1 : if boundLimitedFilter != nil || len(blockPropertyFilters) > 0 {
391 1 : filterer, err = sstable.IntersectsTable(
392 1 : blockPropertyFilters,
393 1 : boundLimitedFilter,
394 1 : v.reader.Properties.UserProperties,
395 1 : )
396 1 : // NB: IntersectsTable will return a nil filterer if the table-level
397 1 : // properties indicate there's no intersection with the provided filters.
398 1 : if filterer == nil || err != nil {
399 1 : return false, nil, err
400 1 : }
401 : }
402 1 : return true, filterer, nil
403 : }
404 :
405 : func (c *tableCacheShard) newIters(
406 : ctx context.Context,
407 : file *manifest.FileMetadata,
408 : opts *IterOptions,
409 : internalOpts internalIterOpts,
410 : dbOpts *tableCacheOpts,
411 1 : ) (internalIterator, keyspan.FragmentIterator, error) {
412 1 : // TODO(sumeer): constructing the Reader should also use a plumbed context,
413 1 : // since parts of the sstable are read during the construction. The Reader
414 1 : // should not remember that context since the Reader can be long-lived.
415 1 :
416 1 : // Calling findNode gives us the responsibility of decrementing v's
417 1 : // refCount. If opening the underlying table resulted in an error, then we
418 1 : // decrement this straight away. Otherwise, we pass that responsibility to
419 1 : // the sstable iterator, which decrements when it is closed.
420 1 : v := c.findNode(file, dbOpts)
421 1 : if v.err != nil {
422 0 : defer c.unrefValue(v)
423 0 : return nil, nil, v.err
424 0 : }
425 :
426 1 : hideObsoletePoints := false
427 1 : var pointKeyFilters []BlockPropertyFilter
428 1 : if opts != nil {
429 1 : // This code is appending (at most one filter) in-place to
430 1 : // opts.PointKeyFilters even though the slice is shared for iterators in
431 1 : // the same iterator tree. This is acceptable since all the following
432 1 : // properties are true:
433 1 : // - The iterator tree is single threaded, so the shared backing for the
434 1 : // slice is being mutated in a single threaded manner.
435 1 : // - Each shallow copy of the slice has its own notion of length.
436 1 : // - The appended element is always the obsoleteKeyBlockPropertyFilter
437 1 : // struct, which is stateless, so overwriting that struct when creating
438 1 : // one sstable iterator is harmless to other sstable iterators that are
439 1 : // relying on that struct.
440 1 : //
441 1 : // An alternative would be to have different slices for different sstable
442 1 : // iterators, but that requires more work to avoid allocations.
443 1 : hideObsoletePoints, pointKeyFilters =
444 1 : v.reader.TryAddBlockPropertyFilterForHideObsoletePoints(
445 1 : opts.snapshotForHideObsoletePoints, file.LargestSeqNum, opts.PointKeyFilters)
446 1 : }
447 1 : ok := true
448 1 : var filterer *sstable.BlockPropertiesFilterer
449 1 : var err error
450 1 : if opts != nil {
451 1 : ok, filterer, err = c.checkAndIntersectFilters(v, opts.TableFilter,
452 1 : pointKeyFilters, internalOpts.boundLimitedFilter)
453 1 : }
454 1 : if err != nil {
455 0 : c.unrefValue(v)
456 0 : return nil, nil, err
457 0 : }
458 :
459 : // Note: This suffers an allocation for virtual sstables.
460 1 : cr := createCommonReader(v, file)
461 1 :
462 1 : provider := dbOpts.objProvider
463 1 : // Check if this file is a foreign file.
464 1 : objMeta, err := provider.Lookup(fileTypeTable, file.FileBacking.DiskFileNum)
465 1 : if err != nil {
466 0 : return nil, nil, err
467 0 : }
468 :
469 : // NB: range-del iterator does not maintain a reference to the table, nor
470 : // does it need to read from it after creation.
471 1 : rangeDelIter, err := cr.NewRawRangeDelIter()
472 1 : if err != nil {
473 0 : c.unrefValue(v)
474 0 : return nil, nil, err
475 0 : }
476 :
477 1 : if !ok {
478 1 : c.unrefValue(v)
479 1 : // Return an empty iterator. This iterator has no mutable state, so
480 1 : // using a singleton is fine.
481 1 : // NB: We still return the potentially non-empty rangeDelIter. This
482 1 : // ensures the iterator observes the file's range deletions even if the
483 1 : // block property filters exclude all the file's point keys. The range
484 1 : // deletions may still delete keys lower in the LSM in files that DO
485 1 : // match the active filters.
486 1 : //
487 1 : // The point iterator returned must implement the filteredIter
488 1 : // interface, so that the level iterator surfaces file boundaries when
489 1 : // range deletions are present.
490 1 : return filteredAll, rangeDelIter, err
491 1 : }
492 :
493 1 : var iter sstable.Iterator
494 1 : useFilter := true
495 1 : if opts != nil {
496 1 : useFilter = manifest.LevelToInt(opts.level) != 6 || opts.UseL6Filters
497 1 : ctx = objiotracing.WithLevel(ctx, manifest.LevelToInt(opts.level))
498 1 : }
499 1 : tableFormat, err := v.reader.TableFormat()
500 1 : if err != nil {
501 0 : return nil, nil, err
502 0 : }
503 1 : var rp sstable.ReaderProvider
504 1 : if tableFormat >= sstable.TableFormatPebblev3 && v.reader.Properties.NumValueBlocks > 0 {
505 1 : rp = &tableCacheShardReaderProvider{c: c, file: file, dbOpts: dbOpts}
506 1 : }
507 :
508 1 : if provider.IsSharedForeign(objMeta) {
509 0 : if tableFormat < sstable.TableFormatPebblev4 {
510 0 : return nil, nil, errors.New("pebble: shared foreign sstable has a lower table format than expected")
511 0 : }
512 0 : hideObsoletePoints = true
513 : }
514 1 : if internalOpts.bytesIterated != nil {
515 1 : iter, err = cr.NewCompactionIter(internalOpts.bytesIterated, rp, internalOpts.bufferPool)
516 1 : } else {
517 1 : iter, err = cr.NewIterWithBlockPropertyFiltersAndContextEtc(
518 1 : ctx, opts.GetLowerBound(), opts.GetUpperBound(), filterer, hideObsoletePoints, useFilter,
519 1 : internalOpts.stats, rp)
520 1 : }
521 1 : if err != nil {
522 0 : if rangeDelIter != nil {
523 0 : _ = rangeDelIter.Close()
524 0 : }
525 0 : c.unrefValue(v)
526 0 : return nil, nil, err
527 : }
528 : // NB: v.closeHook takes responsibility for calling unrefValue(v) here. Take
529 : // care to avoid introducing an allocation here by adding a closure.
530 1 : iter.SetCloseHook(v.closeHook)
531 1 :
532 1 : c.iterCount.Add(1)
533 1 : dbOpts.iterCount.Add(1)
534 1 : if invariants.RaceEnabled {
535 0 : c.mu.Lock()
536 0 : c.mu.iters[iter] = debug.Stack()
537 0 : c.mu.Unlock()
538 0 : }
539 1 : return iter, rangeDelIter, nil
540 : }
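// NB: the point iterator returned above owns a reference to the cache node;
// closing it runs v.closeHook, which releases that reference and decrements
// the per-shard and per-DB iterator counts. The rangeDelIter does not hold a
// reference. A leaked point iterator therefore surfaces in the leak checks in
// tableCacheContainer.close and tableCacheShard.Close.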
541 :
542 : func (c *tableCacheShard) newRangeKeyIter(
543 : file *manifest.FileMetadata, opts keyspan.SpanIterOptions, dbOpts *tableCacheOpts,
544 1 : ) (keyspan.FragmentIterator, error) {
545 1 : // Calling findNode gives us the responsibility of decrementing v's
546 1 : // refCount. If opening the underlying table resulted in error, then we
547 1 : // decrement this straight away. Otherwise, we pass that responsibility to
548 1 : // the sstable iterator, which decrements when it is closed.
549 1 : v := c.findNode(file, dbOpts)
550 1 : if v.err != nil {
551 0 : defer c.unrefValue(v)
552 0 : return nil, v.err
553 0 : }
554 :
555 1 : ok := true
556 1 : var err error
557 1 : // Don't filter a table's range keys if the file contains RANGEKEYDELs.
558 1 : // The RANGEKEYDELs may delete range keys in other levels. Skipping the
559 1 : // file's range key blocks may surface deleted range keys below. This is
560 1 : // done here, rather than deferring to the block-property collector in order
561 1 : // to maintain parity with point keys and the treatment of RANGEDELs.
562 1 : if v.reader.Properties.NumRangeKeyDels == 0 {
563 1 : ok, _, err = c.checkAndIntersectFilters(v, nil, opts.RangeKeyFilters, nil)
564 1 : }
565 1 : if err != nil {
566 0 : c.unrefValue(v)
567 0 : return nil, err
568 0 : }
569 1 : if !ok {
570 0 : c.unrefValue(v)
571 0 : // Return the empty iterator. This iterator has no mutable state, so
572 0 : // using a singleton is fine.
573 0 : return emptyKeyspanIter, err
574 0 : }
575 :
576 1 : var iter keyspan.FragmentIterator
577 1 : if file.Virtual {
578 1 : virtualReader := sstable.MakeVirtualReader(
579 1 : v.reader, file.VirtualMeta(),
580 1 : )
581 1 : iter, err = virtualReader.NewRawRangeKeyIter()
582 1 : } else {
583 1 : iter, err = v.reader.NewRawRangeKeyIter()
584 1 : }
585 :
586 : // iter is a block iter that holds the entire value of the block in memory.
587 : // No need to hold onto a ref of the cache value.
588 1 : c.unrefValue(v)
589 1 :
590 1 : if err != nil {
591 0 : return nil, err
592 0 : }
593 :
594 1 : if iter == nil {
595 0 : // NewRawRangeKeyIter can return nil even if there's no error. However,
596 0 : // the keyspan.LevelIter expects a non-nil iterator if err is nil.
597 0 : return emptyKeyspanIter, nil
598 0 : }
599 :
600 1 : return iter, nil
601 : }
602 :
603 : type tableCacheShardReaderProvider struct {
604 : c *tableCacheShard
605 : file *manifest.FileMetadata
606 : dbOpts *tableCacheOpts
607 : v *tableCacheValue
608 : }
609 :
610 : var _ sstable.ReaderProvider = &tableCacheShardReaderProvider{}
611 :
612 : // GetReader implements sstable.ReaderProvider. Note that it is not the
613 : // responsibility of tableCacheShardReaderProvider to ensure that the file
614 : // continues to exist. The ReaderProvider is used in iterators where the
615 : // top-level iterator is pinning the read state and preventing the files from
616 : // being deleted.
617 : //
618 : // The caller must call tableCacheShardReaderProvider.Close.
619 : //
620 : // Note that currently the Reader returned here is only used to read value
621 : // blocks. This reader shouldn't be used for other purposes like reading keys
622 : // outside of virtual sstable bounds.
623 : //
624 : // TODO(bananabrick): We could return a wrapper over the Reader to ensure
625 : // that the reader isn't used for other purposes.
626 1 : func (rp *tableCacheShardReaderProvider) GetReader() (*sstable.Reader, error) {
627 1 : // Calling findNode gives us the responsibility of decrementing v's
628 1 : // refCount.
629 1 : v := rp.c.findNode(rp.file, rp.dbOpts)
630 1 : if v.err != nil {
631 0 : defer rp.c.unrefValue(v)
632 0 : return nil, v.err
633 0 : }
634 1 : rp.v = v
635 1 : return v.reader, nil
636 : }
637 :
638 : // Close implements sstable.ReaderProvider.
639 1 : func (rp *tableCacheShardReaderProvider) Close() {
640 1 : rp.c.unrefValue(rp.v)
641 1 : rp.v = nil
642 1 : }
643 :
644 : // getTableProperties returns the sstable properties for the target file.
645 : func (c *tableCacheShard) getTableProperties(
646 : file *fileMetadata, dbOpts *tableCacheOpts,
647 0 : ) (*sstable.Properties, error) {
648 0 : // Calling findNode gives us the responsibility of decrementing v's refCount here
649 0 : v := c.findNode(file, dbOpts)
650 0 : defer c.unrefValue(v)
651 0 :
652 0 : if v.err != nil {
653 0 : return nil, v.err
654 0 : }
655 0 : return &v.reader.Properties, nil
656 : }
657 :
658 : // releaseNode releases a node from the tableCacheShard.
659 : //
660 : // c.mu must be held when calling this.
661 0 : func (c *tableCacheShard) releaseNode(n *tableCacheNode) {
662 0 : c.unlinkNode(n)
663 0 : c.clearNode(n)
664 0 : }
665 :
666 : // unlinkNode removes a node from the tableCacheShard, leaving the shard
667 : // reference in place.
668 : //
669 : // c.mu must be held when calling this.
670 1 : func (c *tableCacheShard) unlinkNode(n *tableCacheNode) {
671 1 : key := tableCacheKey{n.cacheID, n.fileNum}
672 1 : delete(c.mu.nodes, key)
673 1 :
674 1 : switch n.ptype {
675 0 : case tableCacheNodeHot:
676 0 : c.mu.sizeHot--
677 1 : case tableCacheNodeCold:
678 1 : c.mu.sizeCold--
679 0 : case tableCacheNodeTest:
680 0 : c.mu.sizeTest--
681 : }
682 :
683 1 : if n == c.mu.handHot {
684 1 : c.mu.handHot = c.mu.handHot.prev()
685 1 : }
686 1 : if n == c.mu.handCold {
687 1 : c.mu.handCold = c.mu.handCold.prev()
688 1 : }
689 1 : if n == c.mu.handTest {
690 1 : c.mu.handTest = c.mu.handTest.prev()
691 1 : }
692 :
693 1 : if n.unlink() == n {
694 1 : // This was the last entry in the cache.
695 1 : c.mu.handHot = nil
696 1 : c.mu.handCold = nil
697 1 : c.mu.handTest = nil
698 1 : }
699 :
700 1 : n.links.prev = nil
701 1 : n.links.next = nil
702 : }
703 :
704 0 : func (c *tableCacheShard) clearNode(n *tableCacheNode) {
705 0 : if v := n.value; v != nil {
706 0 : n.value = nil
707 0 : c.unrefValue(v)
708 0 : }
709 : }
710 :
711 : // unrefValue decrements the reference count for the specified value, releasing
712 : // it if the reference count fell to 0. Note that the value has a reference if
713 : // it is present in tableCacheShard.mu.nodes, so a reference count of 0 means
714 : // the node has already been removed from that map.
715 1 : func (c *tableCacheShard) unrefValue(v *tableCacheValue) {
716 1 : if v.refCount.Add(-1) == 0 {
717 0 : c.releasing.Add(1)
718 0 : c.releasingCh <- v
719 0 : }
720 : }
721 :
722 : // findNode returns the node for the table with the given file number, creating
723 : // that node if it didn't already exist. The caller is responsible for
724 : // decrementing the returned node's refCount.
725 : func (c *tableCacheShard) findNode(
726 : meta *fileMetadata, dbOpts *tableCacheOpts,
727 1 : ) (v *tableCacheValue) {
728 1 : // Loading a file before its global sequence number is known (e.g.,
729 1 : // during ingest before entering the commit pipeline) can pollute
730 1 : // the cache with incorrect state. In invariant builds, verify
731 1 : // that the global sequence number of the returned reader matches.
732 1 : if invariants.Enabled {
733 1 : defer func() {
734 1 : if v.reader != nil && meta.LargestSeqNum == meta.SmallestSeqNum &&
735 1 : v.reader.Properties.GlobalSeqNum != meta.SmallestSeqNum {
736 0 : panic(errors.AssertionFailedf("file %s loaded from table cache with the wrong global sequence number %d",
737 0 : meta, v.reader.Properties.GlobalSeqNum))
738 : }
739 : }()
740 : }
741 1 : if refs := meta.Refs(); refs <= 0 {
742 0 : panic(errors.AssertionFailedf("attempting to load file %s with refs=%d from table cache",
743 0 : meta, refs))
744 : }
745 :
746 : // Fast-path for a hit in the cache.
747 1 : c.mu.RLock()
748 1 : key := tableCacheKey{dbOpts.cacheID, meta.FileBacking.DiskFileNum}
749 1 : if n := c.mu.nodes[key]; n != nil && n.value != nil {
750 1 : // Fast-path hit.
751 1 : //
752 1 : // The caller is responsible for decrementing the refCount.
753 1 : v = n.value
754 1 : v.refCount.Add(1)
755 1 : c.mu.RUnlock()
756 1 : n.referenced.Store(true)
757 1 : c.hits.Add(1)
758 1 : <-v.loaded
759 1 : return v
760 1 : }
761 1 : c.mu.RUnlock()
762 1 :
763 1 : c.mu.Lock()
764 1 :
765 1 : n := c.mu.nodes[key]
766 1 : switch {
767 1 : case n == nil:
768 1 : // Slow-path miss of a non-existent node.
769 1 : n = &tableCacheNode{
770 1 : fileNum: meta.FileBacking.DiskFileNum,
771 1 : ptype: tableCacheNodeCold,
772 1 : }
773 1 : c.addNode(n, dbOpts)
774 1 : c.mu.sizeCold++
775 :
776 1 : case n.value != nil:
777 1 : // Slow-path hit of a hot or cold node.
778 1 : //
779 1 : // The caller is responsible for decrementing the refCount.
780 1 : v = n.value
781 1 : v.refCount.Add(1)
782 1 : n.referenced.Store(true)
783 1 : c.hits.Add(1)
784 1 : c.mu.Unlock()
785 1 : <-v.loaded
786 1 : return v
787 :
788 0 : default:
789 0 : // Slow-path miss of a test node.
790 0 : c.unlinkNode(n)
791 0 : c.mu.coldTarget++
792 0 : if c.mu.coldTarget > c.size {
793 0 : c.mu.coldTarget = c.size
794 0 : }
795 :
796 0 : n.referenced.Store(false)
797 0 : n.ptype = tableCacheNodeHot
798 0 : c.addNode(n, dbOpts)
799 0 : c.mu.sizeHot++
800 : }
801 :
802 1 : c.misses.Add(1)
803 1 :
804 1 : v = &tableCacheValue{
805 1 : loaded: make(chan struct{}),
806 1 : }
807 1 : v.refCount.Store(2)
808 1 : // Cache the closure invoked when an iterator is closed. This avoids an
809 1 : // allocation on every call to newIters.
810 1 : v.closeHook = func(i sstable.Iterator) error {
811 1 : if invariants.RaceEnabled {
812 0 : c.mu.Lock()
813 0 : delete(c.mu.iters, i)
814 0 : c.mu.Unlock()
815 0 : }
816 1 : c.unrefValue(v)
817 1 : c.iterCount.Add(-1)
818 1 : dbOpts.iterCount.Add(-1)
819 1 : return nil
820 : }
821 1 : n.value = v
822 1 :
823 1 : c.mu.Unlock()
824 1 :
825 1 : // Note adding to the cache lists must complete before we begin loading the
826 1 : // table, as a failure during load will result in the node being unlinked.
827 1 : pprof.Do(context.Background(), tableCacheLabels, func(context.Context) {
828 1 : v.load(
829 1 : loadInfo{
830 1 : backingFileNum: meta.FileBacking.DiskFileNum,
831 1 : smallestSeqNum: meta.SmallestSeqNum,
832 1 : largestSeqNum: meta.LargestSeqNum,
833 1 : }, c, dbOpts)
834 1 : })
835 1 : return v
836 : }
837 :
838 1 : func (c *tableCacheShard) addNode(n *tableCacheNode, dbOpts *tableCacheOpts) {
839 1 : c.evictNodes()
840 1 : n.cacheID = dbOpts.cacheID
841 1 : key := tableCacheKey{n.cacheID, n.fileNum}
842 1 : c.mu.nodes[key] = n
843 1 :
844 1 : n.links.next = n
845 1 : n.links.prev = n
846 1 : if c.mu.handHot == nil {
847 1 : // First element.
848 1 : c.mu.handHot = n
849 1 : c.mu.handCold = n
850 1 : c.mu.handTest = n
851 1 : } else {
852 1 : c.mu.handHot.link(n)
853 1 : }
854 :
855 1 : if c.mu.handCold == c.mu.handHot {
856 1 : c.mu.handCold = c.mu.handCold.prev()
857 1 : }
858 : }
859 :
860 1 : func (c *tableCacheShard) evictNodes() {
861 1 : for c.size <= c.mu.sizeHot+c.mu.sizeCold && c.mu.handCold != nil {
862 0 : c.runHandCold()
863 0 : }
864 : }
865 :
866 0 : func (c *tableCacheShard) runHandCold() {
867 0 : n := c.mu.handCold
868 0 : if n.ptype == tableCacheNodeCold {
869 0 : if n.referenced.Load() {
870 0 : n.referenced.Store(false)
871 0 : n.ptype = tableCacheNodeHot
872 0 : c.mu.sizeCold--
873 0 : c.mu.sizeHot++
874 0 : } else {
875 0 : c.clearNode(n)
876 0 : n.ptype = tableCacheNodeTest
877 0 : c.mu.sizeCold--
878 0 : c.mu.sizeTest++
879 0 : for c.size < c.mu.sizeTest && c.mu.handTest != nil {
880 0 : c.runHandTest()
881 0 : }
882 : }
883 : }
884 :
885 0 : c.mu.handCold = c.mu.handCold.next()
886 0 :
887 0 : for c.size-c.mu.coldTarget <= c.mu.sizeHot && c.mu.handHot != nil {
888 0 : c.runHandHot()
889 0 : }
890 : }
891 :
892 0 : func (c *tableCacheShard) runHandHot() {
893 0 : if c.mu.handHot == c.mu.handTest && c.mu.handTest != nil {
894 0 : c.runHandTest()
895 0 : if c.mu.handHot == nil {
896 0 : return
897 0 : }
898 : }
899 :
900 0 : n := c.mu.handHot
901 0 : if n.ptype == tableCacheNodeHot {
902 0 : if n.referenced.Load() {
903 0 : n.referenced.Store(false)
904 0 : } else {
905 0 : n.ptype = tableCacheNodeCold
906 0 : c.mu.sizeHot--
907 0 : c.mu.sizeCold++
908 0 : }
909 : }
910 :
911 0 : c.mu.handHot = c.mu.handHot.next()
912 : }
913 :
914 0 : func (c *tableCacheShard) runHandTest() {
915 0 : if c.mu.sizeCold > 0 && c.mu.handTest == c.mu.handCold && c.mu.handCold != nil {
916 0 : c.runHandCold()
917 0 : if c.mu.handTest == nil {
918 0 : return
919 0 : }
920 : }
921 :
922 0 : n := c.mu.handTest
923 0 : if n.ptype == tableCacheNodeTest {
924 0 : c.mu.coldTarget--
925 0 : if c.mu.coldTarget < 0 {
926 0 : c.mu.coldTarget = 0
927 0 : }
928 0 : c.unlinkNode(n)
929 0 : c.clearNode(n)
930 : }
931 :
932 0 : c.mu.handTest = c.mu.handTest.next()
933 : }
934 :
935 1 : func (c *tableCacheShard) evict(fileNum base.DiskFileNum, dbOpts *tableCacheOpts, allowLeak bool) {
936 1 : c.mu.Lock()
937 1 : key := tableCacheKey{dbOpts.cacheID, fileNum}
938 1 : n := c.mu.nodes[key]
939 1 : var v *tableCacheValue
940 1 : if n != nil {
941 1 : // NB: This is equivalent to tableCacheShard.releaseNode(), but we perform
942 1 : // the tableCacheNode.release() call synchronously below to ensure the
943 1 : // sstable file descriptor is closed before returning. Note that
944 1 : // tableCacheShard.releasing needs to be incremented while holding
945 1 : // tableCacheShard.mu in order to avoid a race with Close()
946 1 : c.unlinkNode(n)
947 1 : v = n.value
948 1 : if v != nil {
949 1 : if !allowLeak {
950 1 : if t := v.refCount.Add(-1); t != 0 {
951 0 : dbOpts.loggerAndTracer.Fatalf("sstable %s: refcount is not zero: %d\n%s", fileNum, t, debug.Stack())
952 0 : }
953 : }
954 1 : c.releasing.Add(1)
955 : }
956 : }
957 :
958 1 : c.mu.Unlock()
959 1 :
960 1 : if v != nil {
961 1 : v.release(c)
962 1 : }
963 :
964 1 : dbOpts.opts.Cache.EvictFile(dbOpts.cacheID, fileNum)
965 : }
966 :
967 : // removeDB evicts any nodes which have a reference to the DB
968 : // associated with dbOpts.cacheID. Make sure that there will
969 : // be no more accesses to the files associated with the DB.
970 1 : func (c *tableCacheShard) removeDB(dbOpts *tableCacheOpts) {
971 1 : var fileNums []base.DiskFileNum
972 1 :
973 1 : c.mu.RLock()
974 1 : // Collect the fileNums which need to be cleaned.
975 1 : var firstNode *tableCacheNode
976 1 : node := c.mu.handHot
977 1 : for node != firstNode {
978 1 : if firstNode == nil {
979 1 : firstNode = node
980 1 : }
981 :
982 1 : if node.cacheID == dbOpts.cacheID {
983 1 : fileNums = append(fileNums, node.fileNum)
984 1 : }
985 1 : node = node.next()
986 : }
987 1 : c.mu.RUnlock()
988 1 :
989 1 : // Evict all the nodes associated with the DB.
990 1 : // This should synchronously close all the files
991 1 : // associated with the DB.
992 1 : for _, fileNum := range fileNums {
993 1 : c.evict(fileNum, dbOpts, true)
994 1 : }
995 : }
996 :
997 1 : func (c *tableCacheShard) Close() error {
998 1 : c.mu.Lock()
999 1 : defer c.mu.Unlock()
1000 1 :
1001 1 : // Check for leaked iterators. Note that we'll still perform cleanup below in
1002 1 : // the case that there are leaked iterators.
1003 1 : var err error
1004 1 : if v := c.iterCount.Load(); v > 0 {
1005 0 : if !invariants.RaceEnabled {
1006 0 : err = errors.Errorf("leaked iterators: %d", errors.Safe(v))
1007 0 : } else {
1008 0 : var buf bytes.Buffer
1009 0 : for _, stack := range c.mu.iters {
1010 0 : fmt.Fprintf(&buf, "%s\n", stack)
1011 0 : }
1012 0 : err = errors.Errorf("leaked iterators: %d\n%s", errors.Safe(v), buf.String())
1013 : }
1014 : }
1015 :
1016 1 : for c.mu.handHot != nil {
1017 0 : n := c.mu.handHot
1018 0 : if n.value != nil {
1019 0 : if n.value.refCount.Add(-1) == 0 {
1020 0 : c.releasing.Add(1)
1021 0 : c.releasingCh <- n.value
1022 0 : }
1023 : }
1024 0 : c.unlinkNode(n)
1025 : }
1026 1 : c.mu.nodes = nil
1027 1 : c.mu.handHot = nil
1028 1 : c.mu.handCold = nil
1029 1 : c.mu.handTest = nil
1030 1 :
1031 1 : // Only shut down the releasing goroutine if there were no leaked
1032 1 : // iterators. If there were leaked iterators, we leave the goroutine running
1033 1 : // and the releasingCh open so that a subsequent iterator close can
1034 1 : // complete. This behavior is used by iterator leak tests. Leaking the
1035 1 : // goroutine for these tests is less bad than not closing the iterator, which
1036 1 : // triggers other warnings about block cache handles not being released.
1037 1 : if err != nil {
1038 0 : c.releasing.Wait()
1039 0 : return err
1040 0 : }
1041 :
1042 1 : close(c.releasingCh)
1043 1 : c.releasing.Wait()
1044 1 : c.releaseLoopExit.Wait()
1045 1 : return err
1046 : }
1047 :
1048 : type tableCacheValue struct {
1049 : closeHook func(i sstable.Iterator) error
1050 : reader *sstable.Reader
1051 : err error
1052 : loaded chan struct{}
1053 : // Reference count for the value. The reader is closed when the reference
1054 : // count drops to zero.
1055 : refCount atomic.Int32
1056 : }
1057 :
1058 : type loadInfo struct {
1059 : backingFileNum base.DiskFileNum
1060 : largestSeqNum uint64
1061 : smallestSeqNum uint64
1062 : }
1063 :
1064 1 : func (v *tableCacheValue) load(loadInfo loadInfo, c *tableCacheShard, dbOpts *tableCacheOpts) {
1065 1 : // Try opening the file first.
1066 1 : var f objstorage.Readable
1067 1 : var err error
1068 1 : f, err = dbOpts.objProvider.OpenForReading(
1069 1 : context.TODO(), fileTypeTable, loadInfo.backingFileNum, objstorage.OpenOptions{MustExist: true},
1070 1 : )
1071 1 : if err == nil {
1072 1 : cacheOpts := private.SSTableCacheOpts(dbOpts.cacheID, loadInfo.backingFileNum).(sstable.ReaderOption)
1073 1 : v.reader, err = sstable.NewReader(f, dbOpts.opts, cacheOpts, dbOpts.filterMetrics)
1074 1 : }
1075 1 : if err != nil {
1076 0 : v.err = errors.Wrapf(
1077 0 : err, "pebble: backing file %s error", errors.Safe(loadInfo.backingFileNum.FileNum()))
1078 0 : }
1079 1 : if v.err == nil && loadInfo.smallestSeqNum == loadInfo.largestSeqNum {
1080 1 : v.reader.Properties.GlobalSeqNum = loadInfo.largestSeqNum
1081 1 : }
1082 1 : if v.err != nil {
1083 0 : c.mu.Lock()
1084 0 : defer c.mu.Unlock()
1085 0 : // Lookup the node in the cache again as it might have already been
1086 0 : // removed.
1087 0 : key := tableCacheKey{dbOpts.cacheID, loadInfo.backingFileNum}
1088 0 : n := c.mu.nodes[key]
1089 0 : if n != nil && n.value == v {
1090 0 : c.releaseNode(n)
1091 0 : }
1092 : }
1093 1 : close(v.loaded)
1094 : }
1095 :
1096 1 : func (v *tableCacheValue) release(c *tableCacheShard) {
1097 1 : <-v.loaded
1098 1 : // Nothing to be done about an error at this point. Close the reader if it is
1099 1 : // open.
1100 1 : if v.reader != nil {
1101 1 : _ = v.reader.Close()
1102 1 : }
1103 1 : c.releasing.Done()
1104 : }
1105 :
1106 : type tableCacheNodeType int8
1107 :
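// The node types correspond to CLOCK-Pro page classes: test nodes are
// non-resident ghosts (their value has been cleared) used to adapt the
// shard's coldTarget, cold nodes are resident but not recently referenced,
// and hot nodes are resident and recently referenced.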
1108 : const (
1109 : tableCacheNodeTest tableCacheNodeType = iota
1110 : tableCacheNodeCold
1111 : tableCacheNodeHot
1112 : )
1113 :
1114 0 : func (p tableCacheNodeType) String() string {
1115 0 : switch p {
1116 0 : case tableCacheNodeTest:
1117 0 : return "test"
1118 0 : case tableCacheNodeCold:
1119 0 : return "cold"
1120 0 : case tableCacheNodeHot:
1121 0 : return "hot"
1122 : }
1123 0 : return "unknown"
1124 : }
1125 :
1126 : type tableCacheNode struct {
1127 : fileNum base.DiskFileNum
1128 : value *tableCacheValue
1129 :
1130 : links struct {
1131 : next *tableCacheNode
1132 : prev *tableCacheNode
1133 : }
1134 : ptype tableCacheNodeType
1135 : // referenced is atomically set to indicate that this entry has been accessed
1136 : // since the last time one of the clock hands swept it.
1137 : referenced atomic.Bool
1138 :
1139 : // Storing the cache id associated with the DB instance here
1140 : // avoids the need to thread the dbOpts struct through many functions.
1141 : cacheID uint64
1142 : }
1143 :
1144 1 : func (n *tableCacheNode) next() *tableCacheNode {
1145 1 : if n == nil {
1146 0 : return nil
1147 0 : }
1148 1 : return n.links.next
1149 : }
1150 :
1151 1 : func (n *tableCacheNode) prev() *tableCacheNode {
1152 1 : if n == nil {
1153 0 : return nil
1154 0 : }
1155 1 : return n.links.prev
1156 : }
1157 :
1158 1 : func (n *tableCacheNode) link(s *tableCacheNode) {
1159 1 : s.links.prev = n.links.prev
1160 1 : s.links.prev.links.next = s
1161 1 : s.links.next = n
1162 1 : s.links.next.links.prev = s
1163 1 : }
1164 :
1165 1 : func (n *tableCacheNode) unlink() *tableCacheNode {
1166 1 : next := n.links.next
1167 1 : n.links.prev.links.next = n.links.next
1168 1 : n.links.next.links.prev = n.links.prev
1169 1 : n.links.prev = n
1170 1 : n.links.next = n
1171 1 : return next
1172 1 : }
|