Line data Source code
1 : // Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "bytes"
9 : "context"
10 : "fmt"
11 : "io"
12 : "runtime/debug"
13 : "runtime/pprof"
14 : "sync"
15 : "sync/atomic"
16 : "unsafe"
17 :
18 : "github.com/cockroachdb/errors"
19 : "github.com/cockroachdb/pebble/internal/base"
20 : "github.com/cockroachdb/pebble/internal/invariants"
21 : "github.com/cockroachdb/pebble/internal/keyspan"
22 : "github.com/cockroachdb/pebble/internal/manifest"
23 : "github.com/cockroachdb/pebble/internal/private"
24 : "github.com/cockroachdb/pebble/objstorage"
25 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
26 : "github.com/cockroachdb/pebble/sstable"
27 : )
28 :
29 : var emptyIter = &errorIter{err: nil}
30 : var emptyKeyspanIter = &errorKeyspanIter{err: nil}
31 :
32 : // filteredAll is a singleton internalIterator implementation used when an
33 : // sstable does contain point keys, but all the keys are filtered by the active
34 : // PointKeyFilters set in the iterator's IterOptions.
35 : //
36 : // filteredAll implements filteredIter, ensuring the level iterator recognizes
37 : // when it may need to return file boundaries to keep the rangeDelIter open
38 : // during mergingIter operation.
39 : var filteredAll = &filteredAllKeysIter{errorIter: errorIter{err: nil}}
40 :
41 : var _ filteredIter = filteredAll
42 :
43 : type filteredAllKeysIter struct {
44 : errorIter
45 : }
46 :
47 0 : func (s *filteredAllKeysIter) MaybeFilteredKeys() bool {
48 0 : return true
49 0 : }
50 :
51 : var tableCacheLabels = pprof.Labels("pebble", "table-cache")
52 :
53 : // tableCacheOpts contains the DB-specific fields
54 : // of a table cache. This is stored in the tableCacheContainer
55 : // along with the table cache.
56 : // NB: It is important to make sure that the fields in this
57 : // struct are read-only. Since the fields here are shared
58 : // by every single tableCacheShard, updating a non-read-only
59 : // field would cause unnecessary evictions of that field, and
60 : // of the surrounding fields, from the CPU caches.
61 : type tableCacheOpts struct {
62 : // iterCount keeps track of how many iterators are open. It is used to keep
63 : // track of leaked iterators on a per-DB basis.
64 : iterCount *atomic.Int32
65 :
66 : loggerAndTracer LoggerAndTracer
67 : cacheID uint64
68 : objProvider objstorage.Provider
69 : opts sstable.ReaderOptions
70 : filterMetrics *sstable.FilterMetricsTracker
71 : }
72 :
73 : // tableCacheContainer contains the table cache and
74 : // fields which are unique to the DB.
75 : type tableCacheContainer struct {
76 : tableCache *TableCache
77 :
78 : // dbOpts contains fields relevant to the table cache
79 : // which are unique to each DB.
80 : dbOpts tableCacheOpts
81 : }
82 :
83 : // newTableCacheContainer will panic if the underlying cache in the table cache
84 : // doesn't match Options.Cache.
85 : func newTableCacheContainer(
86 : tc *TableCache, cacheID uint64, objProvider objstorage.Provider, opts *Options, size int,
87 1 : ) *tableCacheContainer {
88 1 : // We will release a ref to table cache acquired here when tableCacheContainer.close is called.
89 1 : if tc != nil {
90 1 : if tc.cache != opts.Cache {
91 0 : panic("pebble: underlying cache for the table cache and db are different")
92 : }
93 1 : tc.Ref()
94 1 : } else {
95 1 : // NewTableCache should create a ref to tc which the container should
96 1 : // drop whenever it is closed.
97 1 : tc = NewTableCache(opts.Cache, opts.Experimental.TableCacheShards, size)
98 1 : }
99 :
100 1 : t := &tableCacheContainer{}
101 1 : t.tableCache = tc
102 1 : t.dbOpts.loggerAndTracer = opts.LoggerAndTracer
103 1 : t.dbOpts.cacheID = cacheID
104 1 : t.dbOpts.objProvider = objProvider
105 1 : t.dbOpts.opts = opts.MakeReaderOptions()
106 1 : t.dbOpts.filterMetrics = &sstable.FilterMetricsTracker{}
107 1 : t.dbOpts.iterCount = new(atomic.Int32)
108 1 : return t
109 : }
110 :
111 : // Before calling close, make sure that there will be no further need
112 : // to access any of the files associated with the store.
113 1 : func (c *tableCacheContainer) close() error {
114 1 : // We want to do some cleanup work here. Check for iterators leaked
115 1 : // by the DB using this container. Note that we'll still perform the cleanup
116 1 : // below even if there are leaked iterators.
117 1 : var err error
118 1 : if v := c.dbOpts.iterCount.Load(); v > 0 {
119 1 : err = errors.Errorf("leaked iterators: %d", errors.Safe(v))
120 1 : }
121 :
122 : // Release nodes here.
123 1 : for _, shard := range c.tableCache.shards {
124 1 : if shard != nil {
125 1 : shard.removeDB(&c.dbOpts)
126 1 : }
127 : }
128 1 : return firstError(err, c.tableCache.Unref())
129 : }
130 :
131 : func (c *tableCacheContainer) newIters(
132 : ctx context.Context,
133 : file *manifest.FileMetadata,
134 : opts *IterOptions,
135 : internalOpts internalIterOpts,
136 1 : ) (internalIterator, keyspan.FragmentIterator, error) {
137 1 : return c.tableCache.getShard(file.FileBacking.DiskFileNum).newIters(ctx, file, opts, internalOpts, &c.dbOpts)
138 1 : }
139 :
140 : func (c *tableCacheContainer) newRangeKeyIter(
141 : file *manifest.FileMetadata, opts keyspan.SpanIterOptions,
142 1 : ) (keyspan.FragmentIterator, error) {
143 1 : return c.tableCache.getShard(file.FileBacking.DiskFileNum).newRangeKeyIter(file, opts, &c.dbOpts)
144 1 : }
145 :
146 : // getTableProperties returns the properties associated with the backing physical
147 : // table if the input metadata belongs to a virtual sstable.
148 1 : func (c *tableCacheContainer) getTableProperties(file *fileMetadata) (*sstable.Properties, error) {
149 1 : return c.tableCache.getShard(file.FileBacking.DiskFileNum).getTableProperties(file, &c.dbOpts)
150 1 : }
151 :
152 1 : func (c *tableCacheContainer) evict(fileNum base.DiskFileNum) {
153 1 : c.tableCache.getShard(fileNum).evict(fileNum, &c.dbOpts, false)
154 1 : }
155 :
156 1 : func (c *tableCacheContainer) metrics() (CacheMetrics, FilterMetrics) {
157 1 : var m CacheMetrics
158 1 : for i := range c.tableCache.shards {
159 1 : s := c.tableCache.shards[i]
160 1 : s.mu.RLock()
161 1 : m.Count += int64(len(s.mu.nodes))
162 1 : s.mu.RUnlock()
163 1 : m.Hits += s.hits.Load()
164 1 : m.Misses += s.misses.Load()
165 1 : }
166 1 : m.Size = m.Count * int64(unsafe.Sizeof(sstable.Reader{}))
167 1 : f := c.dbOpts.filterMetrics.Load()
168 1 : return m, f
169 : }
170 :
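// exampleTableCacheMetrics is an illustrative sketch, not part of the original
// file: it reads the aggregates produced by the metrics method above through
// the exported DB.Metrics accessor. The Metrics field names used here
// (TableCache, Filter) are assumptions about the public Metrics struct.
func exampleTableCacheMetrics(d *DB) {
	m := d.Metrics()
	d.opts.Logger.Infof("table cache: count=%d size=%d hits=%d misses=%d",
		m.TableCache.Count, m.TableCache.Size, m.TableCache.Hits, m.TableCache.Misses)
	d.opts.Logger.Infof("table filters: hits=%d misses=%d", m.Filter.Hits, m.Filter.Misses)
}
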
171 : func (c *tableCacheContainer) estimateSize(
172 : meta *fileMetadata, lower, upper []byte,
173 1 : ) (size uint64, err error) {
174 1 : if meta.Virtual {
175 1 : err = c.withVirtualReader(
176 1 : meta.VirtualMeta(),
177 1 : func(r sstable.VirtualReader) (err error) {
178 1 : size, err = r.EstimateDiskUsage(lower, upper)
179 1 : return err
180 1 : },
181 : )
182 1 : } else {
183 1 : err = c.withReader(
184 1 : meta.PhysicalMeta(),
185 1 : func(r *sstable.Reader) (err error) {
186 1 : size, err = r.EstimateDiskUsage(lower, upper)
187 1 : return err
188 1 : },
189 : )
190 : }
191 1 : if err != nil {
192 0 : return 0, err
193 0 : }
194 1 : return size, nil
195 : }
196 :
197 1 : func (c *tableCacheContainer) withReader(meta physicalMeta, fn func(*sstable.Reader) error) error {
198 1 : s := c.tableCache.getShard(meta.FileBacking.DiskFileNum)
199 1 : v := s.findNode(meta.FileMetadata, &c.dbOpts)
200 1 : defer s.unrefValue(v)
201 1 : if v.err != nil {
202 0 : return v.err
203 0 : }
204 1 : return fn(v.reader)
205 : }
206 :
207 : // withVirtualReader fetches a VirtualReader associated with a virtual sstable.
208 : func (c *tableCacheContainer) withVirtualReader(
209 : meta virtualMeta, fn func(sstable.VirtualReader) error,
210 1 : ) error {
211 1 : s := c.tableCache.getShard(meta.FileBacking.DiskFileNum)
212 1 : v := s.findNode(meta.FileMetadata, &c.dbOpts)
213 1 : defer s.unrefValue(v)
214 1 : if v.err != nil {
215 0 : return v.err
216 0 : }
217 1 : return fn(sstable.MakeVirtualReader(v.reader, meta))
218 : }
219 :
220 1 : func (c *tableCacheContainer) iterCount() int64 {
221 1 : return int64(c.dbOpts.iterCount.Load())
222 1 : }
223 :
224 : // TableCache is a shareable cache for open sstables.
225 : type TableCache struct {
226 : refs atomic.Int64
227 :
228 : cache *Cache
229 : shards []*tableCacheShard
230 : }
231 :
232 : // Ref adds a reference to the table cache. Once tableCache.init returns,
233 : // the table cache only remains valid if there is at least one reference
234 : // to it.
235 1 : func (c *TableCache) Ref() {
236 1 : v := c.refs.Add(1)
237 1 : // We don't want the reference count to ever go from 0 -> 1,
238 1 : // cause a reference count of 0 implies that we've closed the cache.
239 1 : if v <= 1 {
240 0 : panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
241 : }
242 : }
243 :
244 : // Unref removes a reference to the table cache.
245 1 : func (c *TableCache) Unref() error {
246 1 : v := c.refs.Add(-1)
247 1 : switch {
248 1 : case v < 0:
249 1 : panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
250 1 : case v == 0:
251 1 : var err error
252 1 : for i := range c.shards {
253 1 : // The cache shard is not allocated yet, nothing to close
254 1 : if c.shards[i] == nil {
255 0 : continue
256 : }
257 1 : err = firstError(err, c.shards[i].Close())
258 : }
259 :
260 : // Unref the cache, to which we created a reference when the tableCache
261 : // was first instantiated.
262 1 : c.cache.Unref()
263 1 : return err
264 : }
265 1 : return nil
266 : }
267 :
268 : // NewTableCache will create a reference to the table cache. It is the caller's responsibility
269 : // to call tableCache.Unref once they no longer hold a reference to the table cache.
270 1 : func NewTableCache(cache *Cache, numShards int, size int) *TableCache {
271 1 : if size == 0 {
272 0 : panic("pebble: cannot create a table cache of size 0")
273 1 : } else if numShards == 0 {
274 0 : panic("pebble: cannot create a table cache with 0 shards")
275 : }
276 :
277 1 : c := &TableCache{}
278 1 : c.cache = cache
279 1 : c.cache.Ref()
280 1 :
281 1 : c.shards = make([]*tableCacheShard, numShards)
282 1 : for i := range c.shards {
283 1 : c.shards[i] = &tableCacheShard{}
284 1 : c.shards[i].init(size / len(c.shards))
285 1 : }
286 :
287 : // Hold a ref to the cache here.
288 1 : c.refs.Store(1)
289 1 :
290 1 : return c
291 : }
292 :
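// exampleSharedTableCache is an illustrative sketch, not part of the original
// file: it shows the Ref/Unref contract documented on NewTableCache when one
// TableCache is shared by two DBs that also share a block Cache. The
// Options.TableCache field is an assumption about the public API; the other
// identifiers are defined in this package.
func exampleSharedTableCache() error {
	cache := NewCache(128 << 20) // shared block cache, also reference counted
	defer cache.Unref()
	tc := NewTableCache(cache, 8, 1000) // 8 shards with room for 1000 open sstables
	defer func() { _ = tc.Unref() }()   // drops the ref created by NewTableCache

	db1, err := Open("demo-db1", &Options{Cache: cache, TableCache: tc})
	if err != nil {
		return err
	}
	defer db1.Close()
	db2, err := Open("demo-db2", &Options{Cache: cache, TableCache: tc})
	if err != nil {
		return err
	}
	defer db2.Close()
	// Each DB takes its own reference on tc (see newTableCacheContainer), so the
	// table cache stays usable until the deferred Unref releases the last ref.
	return nil
}
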
293 1 : func (c *TableCache) getShard(fileNum base.DiskFileNum) *tableCacheShard {
294 1 : return c.shards[uint64(fileNum.FileNum())%uint64(len(c.shards))]
295 1 : }
296 :
297 : type tableCacheKey struct {
298 : cacheID uint64
299 : fileNum base.DiskFileNum
300 : }
301 :
302 : type tableCacheShard struct {
303 : hits atomic.Int64
304 : misses atomic.Int64
305 : iterCount atomic.Int32
306 :
307 : size int
308 :
309 : mu struct {
310 : sync.RWMutex
311 : nodes map[tableCacheKey]*tableCacheNode
312 : // The iters map is only created and populated in race builds.
313 : iters map[io.Closer][]byte
314 :
315 : handHot *tableCacheNode
316 : handCold *tableCacheNode
317 : handTest *tableCacheNode
318 :
319 : coldTarget int
320 : sizeHot int
321 : sizeCold int
322 : sizeTest int
323 : }
324 : releasing sync.WaitGroup
325 : releasingCh chan *tableCacheValue
326 : releaseLoopExit sync.WaitGroup
327 : }
328 :
329 1 : func (c *tableCacheShard) init(size int) {
330 1 : c.size = size
331 1 :
332 1 : c.mu.nodes = make(map[tableCacheKey]*tableCacheNode)
333 1 : c.mu.coldTarget = size
334 1 : c.releasingCh = make(chan *tableCacheValue, 100)
335 1 : c.releaseLoopExit.Add(1)
336 1 : go c.releaseLoop()
337 1 :
338 1 : if invariants.RaceEnabled {
339 0 : c.mu.iters = make(map[io.Closer][]byte)
340 0 : }
341 : }
342 :
343 1 : func (c *tableCacheShard) releaseLoop() {
344 1 : pprof.Do(context.Background(), tableCacheLabels, func(context.Context) {
345 1 : defer c.releaseLoopExit.Done()
346 1 : for v := range c.releasingCh {
347 1 : v.release(c)
348 1 : }
349 : })
350 : }
351 :
352 : // checkAndIntersectFilters checks the specific table and block property filters
353 : // for intersection with any available table and block-level properties. Returns
354 : // true for ok if this table should be read by this iterator.
355 : func (c *tableCacheShard) checkAndIntersectFilters(
356 : v *tableCacheValue,
357 : tableFilter func(userProps map[string]string) bool,
358 : blockPropertyFilters []BlockPropertyFilter,
359 : boundLimitedFilter sstable.BoundLimitedBlockPropertyFilter,
360 1 : ) (ok bool, filterer *sstable.BlockPropertiesFilterer, err error) {
361 1 : if tableFilter != nil &&
362 1 : !tableFilter(v.reader.Properties.UserProperties) {
363 1 : return false, nil, nil
364 1 : }
365 :
366 1 : if boundLimitedFilter != nil || len(blockPropertyFilters) > 0 {
367 1 : filterer, err = sstable.IntersectsTable(
368 1 : blockPropertyFilters,
369 1 : boundLimitedFilter,
370 1 : v.reader.Properties.UserProperties,
371 1 : )
372 1 : // NB: IntersectsTable will return a nil filterer if the table-level
373 1 : // properties indicate there's no intersection with the provided filters.
374 1 : if filterer == nil || err != nil {
375 1 : return false, nil, err
376 1 : }
377 : }
378 1 : return true, filterer, nil
379 : }
380 :
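// exampleTableFilterOpts is an illustrative sketch, not part of the original
// file. It builds IterOptions whose TableFilter field becomes the tableFilter
// argument checked by checkAndIntersectFilters above: sstables whose user
// properties fail the predicate are skipped wholesale. The "my-app.epoch"
// property name is hypothetical; it would be written by a custom property
// collector, and the returned options would be passed to DB.NewIter.
func exampleTableFilterOpts(minEpoch string) *IterOptions {
	return &IterOptions{
		TableFilter: func(userProps map[string]string) bool {
			// Read the table only if it declares an epoch at or above minEpoch.
			// Plain string comparison stands in for the collector's encoding.
			epoch, ok := userProps["my-app.epoch"]
			return ok && epoch >= minEpoch
		},
	}
}
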
381 : func (c *tableCacheShard) newIters(
382 : ctx context.Context,
383 : file *manifest.FileMetadata,
384 : opts *IterOptions,
385 : internalOpts internalIterOpts,
386 : dbOpts *tableCacheOpts,
387 1 : ) (internalIterator, keyspan.FragmentIterator, error) {
388 1 : // TODO(sumeer): constructing the Reader should also use a plumbed context,
389 1 : // since parts of the sstable are read during the construction. The Reader
390 1 : // should not remember that context since the Reader can be long-lived.
391 1 :
392 1 : // Calling findNode gives us the responsibility of decrementing v's
393 1 : // refCount. If opening the underlying table resulted in error, then we
394 1 : // decrement this straight away. Otherwise, we pass that responsibility to
395 1 : // the sstable iterator, which decrements when it is closed.
396 1 : v := c.findNode(file, dbOpts)
397 1 : if v.err != nil {
398 1 : defer c.unrefValue(v)
399 1 : return nil, nil, v.err
400 1 : }
401 :
402 1 : hideObsoletePoints := false
403 1 : var pointKeyFilters []BlockPropertyFilter
404 1 : if opts != nil {
405 1 : // This code is appending (at most one filter) in-place to
406 1 : // opts.PointKeyFilters even though the slice is shared for iterators in
407 1 : // the same iterator tree. This is acceptable since all the following
408 1 : // properties are true:
409 1 : // - The iterator tree is single threaded, so the shared backing for the
410 1 : // slice is being mutated in a single threaded manner.
411 1 : // - Each shallow copy of the slice has its own notion of length.
412 1 : // - The appended element is always the obsoleteKeyBlockPropertyFilter
413 1 : // struct, which is stateless, so overwriting that struct when creating
414 1 : // one sstable iterator is harmless to other sstable iterators that are
415 1 : // relying on that struct.
416 1 : //
417 1 : // An alternative would be to have different slices for different sstable
418 1 : // iterators, but that requires more work to avoid allocations.
419 1 : hideObsoletePoints, pointKeyFilters =
420 1 : v.reader.TryAddBlockPropertyFilterForHideObsoletePoints(
421 1 : opts.snapshotForHideObsoletePoints, file.LargestSeqNum, opts.PointKeyFilters)
422 1 : }
423 1 : ok := true
424 1 : var filterer *sstable.BlockPropertiesFilterer
425 1 : var err error
426 1 : if opts != nil {
427 1 : ok, filterer, err = c.checkAndIntersectFilters(v, opts.TableFilter,
428 1 : pointKeyFilters, internalOpts.boundLimitedFilter)
429 1 : }
430 1 : if err != nil {
431 0 : c.unrefValue(v)
432 0 : return nil, nil, err
433 0 : }
434 :
435 1 : type iterCreator interface {
436 1 : NewRawRangeDelIter() (keyspan.FragmentIterator, error)
437 1 : NewIterWithBlockPropertyFiltersAndContextEtc(ctx context.Context, lower, upper []byte, filterer *sstable.BlockPropertiesFilterer, hideObsoletePoints, useFilterBlock bool, stats *base.InternalIteratorStats, rp sstable.ReaderProvider) (sstable.Iterator, error)
438 1 : NewCompactionIter(
439 1 : bytesIterated *uint64,
440 1 : rp sstable.ReaderProvider,
441 1 : bufferPool *sstable.BufferPool,
442 1 : ) (sstable.Iterator, error)
443 1 : }
444 1 :
445 1 : // TODO(bananabrick): We suffer an allocation if file is a virtual sstable.
446 1 : var ic iterCreator = v.reader
447 1 : if file.Virtual {
448 1 : virtualReader := sstable.MakeVirtualReader(
449 1 : v.reader, file.VirtualMeta(),
450 1 : )
451 1 : ic = &virtualReader
452 1 : }
453 :
454 1 : provider := dbOpts.objProvider
455 1 : // Check if this file is a foreign file.
456 1 : objMeta, err := provider.Lookup(fileTypeTable, file.FileBacking.DiskFileNum)
457 1 : if err != nil {
458 0 : return nil, nil, err
459 0 : }
460 :
461 : // NB: range-del iterator does not maintain a reference to the table, nor
462 : // does it need to read from it after creation.
463 1 : rangeDelIter, err := ic.NewRawRangeDelIter()
464 1 : if err != nil {
465 1 : c.unrefValue(v)
466 1 : return nil, nil, err
467 1 : }
468 :
469 1 : if !ok {
470 1 : c.unrefValue(v)
471 1 : // Return an empty iterator. This iterator has no mutable state, so
472 1 : // using a singleton is fine.
473 1 : // NB: We still return the potentially non-empty rangeDelIter. This
474 1 : // ensures the iterator observes the file's range deletions even if the
475 1 : // block property filters exclude all the file's point keys. The range
476 1 : // deletions may still delete keys lower in the LSM in files that DO
477 1 : // match the active filters.
478 1 : //
479 1 : // The point iterator returned must implement the filteredIter
480 1 : // interface, so that the level iterator surfaces file boundaries when
481 1 : // range deletions are present.
482 1 : return filteredAll, rangeDelIter, err
483 1 : }
484 :
485 1 : var iter sstable.Iterator
486 1 : useFilter := true
487 1 : if opts != nil {
488 1 : useFilter = manifest.LevelToInt(opts.level) != 6 || opts.UseL6Filters
489 1 : ctx = objiotracing.WithLevel(ctx, manifest.LevelToInt(opts.level))
490 1 : }
491 1 : tableFormat, err := v.reader.TableFormat()
492 1 : if err != nil {
493 0 : return nil, nil, err
494 0 : }
495 1 : var rp sstable.ReaderProvider
496 1 : if tableFormat >= sstable.TableFormatPebblev3 && v.reader.Properties.NumValueBlocks > 0 {
497 1 : rp = &tableCacheShardReaderProvider{c: c, file: file, dbOpts: dbOpts}
498 1 : }
499 :
500 1 : if provider.IsSharedForeign(objMeta) {
501 1 : if tableFormat < sstable.TableFormatPebblev4 {
502 0 : return nil, nil, errors.New("pebble: shared foreign sstable has a lower table format than expected")
503 0 : }
504 1 : hideObsoletePoints = true
505 : }
506 1 : if internalOpts.bytesIterated != nil {
507 1 : iter, err = ic.NewCompactionIter(internalOpts.bytesIterated, rp, internalOpts.bufferPool)
508 1 : } else {
509 1 : iter, err = ic.NewIterWithBlockPropertyFiltersAndContextEtc(
510 1 : ctx, opts.GetLowerBound(), opts.GetUpperBound(), filterer, hideObsoletePoints, useFilter,
511 1 : internalOpts.stats, rp)
512 1 : }
513 1 : if err != nil {
514 1 : if rangeDelIter != nil {
515 1 : _ = rangeDelIter.Close()
516 1 : }
517 1 : c.unrefValue(v)
518 1 : return nil, nil, err
519 : }
520 : // NB: v.closeHook takes responsibility for calling unrefValue(v) here. Take
521 : // care to avoid introducing an allocation here by adding a closure.
522 1 : iter.SetCloseHook(v.closeHook)
523 1 :
524 1 : c.iterCount.Add(1)
525 1 : dbOpts.iterCount.Add(1)
526 1 : if invariants.RaceEnabled {
527 0 : c.mu.Lock()
528 0 : c.mu.iters[iter] = debug.Stack()
529 0 : c.mu.Unlock()
530 0 : }
531 1 : return iter, rangeDelIter, nil
532 : }
533 :
534 : func (c *tableCacheShard) newRangeKeyIter(
535 : file *manifest.FileMetadata, opts keyspan.SpanIterOptions, dbOpts *tableCacheOpts,
536 1 : ) (keyspan.FragmentIterator, error) {
537 1 : // Calling findNode gives us the responsibility of decrementing v's
538 1 : // refCount. If opening the underlying table resulted in error, then we
539 1 : // decrement this straight away. Otherwise, we pass that responsibility to
540 1 : // the sstable iterator, which decrements when it is closed.
541 1 : v := c.findNode(file, dbOpts)
542 1 : if v.err != nil {
543 0 : defer c.unrefValue(v)
544 0 : return nil, v.err
545 0 : }
546 :
547 1 : ok := true
548 1 : var err error
549 1 : // Don't filter a table's range keys if the file contains RANGEKEYDELs.
550 1 : // The RANGEKEYDELs may delete range keys in other levels. Skipping the
551 1 : // file's range key blocks may surface deleted range keys below. This is
552 1 : // done here, rather than deferring to the block-property collector in order
553 1 : // to maintain parity with point keys and the treatment of RANGEDELs.
554 1 : if v.reader.Properties.NumRangeKeyDels == 0 {
555 1 : ok, _, err = c.checkAndIntersectFilters(v, nil, opts.RangeKeyFilters, nil)
556 1 : }
557 1 : if err != nil {
558 0 : c.unrefValue(v)
559 0 : return nil, err
560 0 : }
561 1 : if !ok {
562 0 : c.unrefValue(v)
563 0 : // Return the empty iterator. This iterator has no mutable state, so
564 0 : // using a singleton is fine.
565 0 : return emptyKeyspanIter, err
566 0 : }
567 :
568 1 : var iter keyspan.FragmentIterator
569 1 : if file.Virtual {
570 1 : virtualReader := sstable.MakeVirtualReader(
571 1 : v.reader, file.VirtualMeta(),
572 1 : )
573 1 : iter, err = virtualReader.NewRawRangeKeyIter()
574 1 : } else {
575 1 : iter, err = v.reader.NewRawRangeKeyIter()
576 1 : }
577 :
578 : // iter is a block iter that holds the entire value of the block in memory.
579 : // No need to hold onto a ref of the cache value.
580 1 : c.unrefValue(v)
581 1 :
582 1 : if err != nil {
583 0 : return nil, err
584 0 : }
585 :
586 1 : if iter == nil {
587 1 : // NewRawRangeKeyIter can return nil even if there's no error. However,
588 1 : // the keyspan.LevelIter expects a non-nil iterator if err is nil.
589 1 : return emptyKeyspanIter, nil
590 1 : }
591 :
592 1 : return iter, nil
593 : }
594 :
595 : type tableCacheShardReaderProvider struct {
596 : c *tableCacheShard
597 : file *manifest.FileMetadata
598 : dbOpts *tableCacheOpts
599 : v *tableCacheValue
600 : }
601 :
602 : var _ sstable.ReaderProvider = &tableCacheShardReaderProvider{}
603 :
604 : // GetReader implements sstable.ReaderProvider. Note that it is not the
605 : // responsibility of tableCacheShardReaderProvider to ensure that the file
606 : // continues to exist. The ReaderProvider is used in iterators where the
607 : // top-level iterator is pinning the read state and preventing the files from
608 : // being deleted.
609 : //
610 : // The caller must call tableCacheShardReaderProvider.Close.
611 : //
612 : // Note that currently the Reader returned here is only used to read value
613 : // blocks. This reader shouldn't be used for other purposes like reading keys
614 : // outside of virtual sstable bounds.
615 : //
616 : // TODO(bananabrick): We could return a wrapper over the Reader to ensure
617 : // that the reader isn't used for other purposes.
618 1 : func (rp *tableCacheShardReaderProvider) GetReader() (*sstable.Reader, error) {
619 1 : // Calling findNode gives us the responsibility of decrementing v's
620 1 : // refCount.
621 1 : v := rp.c.findNode(rp.file, rp.dbOpts)
622 1 : if v.err != nil {
623 0 : defer rp.c.unrefValue(v)
624 0 : return nil, v.err
625 0 : }
626 1 : rp.v = v
627 1 : return v.reader, nil
628 : }
629 :
630 : // Close implements sstable.ReaderProvider.
631 1 : func (rp *tableCacheShardReaderProvider) Close() {
632 1 : rp.c.unrefValue(rp.v)
633 1 : rp.v = nil
634 1 : }
635 :
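// exampleReaderProviderUse is an illustrative sketch, not part of the original
// file: it demonstrates the GetReader/Close pairing required by the doc
// comment above. The real caller is the sstable value-block reading path,
// which pins the reader only while it fetches value blocks.
func exampleReaderProviderUse(rp sstable.ReaderProvider) error {
	r, err := rp.GetReader()
	if err != nil {
		return err
	}
	// GetReader incremented the table cache ref count via findNode; Close
	// releases it, so it must not be skipped.
	defer rp.Close()
	// In the real caller, r is used here to read value blocks; other uses are
	// discouraged by the doc comment above.
	_ = r
	return nil
}
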
636 : // getTableProperties returns the sstable properties for the target file.
637 : func (c *tableCacheShard) getTableProperties(
638 : file *fileMetadata, dbOpts *tableCacheOpts,
639 1 : ) (*sstable.Properties, error) {
640 1 : // Calling findNode gives us the responsibility of decrementing v's refCount here
641 1 : v := c.findNode(file, dbOpts)
642 1 : defer c.unrefValue(v)
643 1 :
644 1 : if v.err != nil {
645 0 : return nil, v.err
646 0 : }
647 1 : return &v.reader.Properties, nil
648 : }
649 :
650 : // releaseNode releases a node from the tableCacheShard.
651 : //
652 : // c.mu must be held when calling this.
653 1 : func (c *tableCacheShard) releaseNode(n *tableCacheNode) {
654 1 : c.unlinkNode(n)
655 1 : c.clearNode(n)
656 1 : }
657 :
658 : // unlinkNode removes a node from the tableCacheShard, leaving the shard
659 : // reference in place.
660 : //
661 : // c.mu must be held when calling this.
662 1 : func (c *tableCacheShard) unlinkNode(n *tableCacheNode) {
663 1 : key := tableCacheKey{n.cacheID, n.fileNum}
664 1 : delete(c.mu.nodes, key)
665 1 :
666 1 : switch n.ptype {
667 1 : case tableCacheNodeHot:
668 1 : c.mu.sizeHot--
669 1 : case tableCacheNodeCold:
670 1 : c.mu.sizeCold--
671 1 : case tableCacheNodeTest:
672 1 : c.mu.sizeTest--
673 : }
674 :
675 1 : if n == c.mu.handHot {
676 1 : c.mu.handHot = c.mu.handHot.prev()
677 1 : }
678 1 : if n == c.mu.handCold {
679 1 : c.mu.handCold = c.mu.handCold.prev()
680 1 : }
681 1 : if n == c.mu.handTest {
682 1 : c.mu.handTest = c.mu.handTest.prev()
683 1 : }
684 :
685 1 : if n.unlink() == n {
686 1 : // This was the last entry in the cache.
687 1 : c.mu.handHot = nil
688 1 : c.mu.handCold = nil
689 1 : c.mu.handTest = nil
690 1 : }
691 :
692 1 : n.links.prev = nil
693 1 : n.links.next = nil
694 : }
695 :
696 1 : func (c *tableCacheShard) clearNode(n *tableCacheNode) {
697 1 : if v := n.value; v != nil {
698 1 : n.value = nil
699 1 : c.unrefValue(v)
700 1 : }
701 : }
702 :
703 : // unrefValue decrements the reference count for the specified value, releasing
704 : // it if the reference count fell to 0. Note that the value has a reference if
705 : // it is present in tableCacheShard.mu.nodes, so a reference count of 0 means
706 : // the node has already been removed from that map.
707 1 : func (c *tableCacheShard) unrefValue(v *tableCacheValue) {
708 1 : if v.refCount.Add(-1) == 0 {
709 1 : c.releasing.Add(1)
710 1 : c.releasingCh <- v
711 1 : }
712 : }
713 :
714 : // findNode returns the node for the table with the given file number, creating
715 : // that node if it didn't already exist. The caller is responsible for
716 : // decrementing the returned node's refCount.
717 : func (c *tableCacheShard) findNode(
718 : meta *fileMetadata, dbOpts *tableCacheOpts,
719 1 : ) (v *tableCacheValue) {
720 1 : // Loading a file before its global sequence number is known (eg,
721 1 : // during ingest before entering the commit pipeline) can pollute
722 1 : // the cache with incorrect state. In invariant builds, verify
723 1 : // that the global sequence number of the returned reader matches.
724 1 : if invariants.Enabled {
725 1 : defer func() {
726 1 : if v.reader != nil && meta.LargestSeqNum == meta.SmallestSeqNum &&
727 1 : v.reader.Properties.GlobalSeqNum != meta.SmallestSeqNum {
728 0 : panic(errors.AssertionFailedf("file %s loaded from table cache with the wrong global sequence number %d",
729 0 : meta, v.reader.Properties.GlobalSeqNum))
730 : }
731 : }()
732 : }
733 1 : if refs := meta.Refs(); refs <= 0 {
734 0 : panic(errors.AssertionFailedf("attempting to load file %s with refs=%d from table cache",
735 0 : meta, refs))
736 : }
737 :
738 : // Fast-path for a hit in the cache.
739 1 : c.mu.RLock()
740 1 : key := tableCacheKey{dbOpts.cacheID, meta.FileBacking.DiskFileNum}
741 1 : if n := c.mu.nodes[key]; n != nil && n.value != nil {
742 1 : // Fast-path hit.
743 1 : //
744 1 : // The caller is responsible for decrementing the refCount.
745 1 : v = n.value
746 1 : v.refCount.Add(1)
747 1 : c.mu.RUnlock()
748 1 : n.referenced.Store(true)
749 1 : c.hits.Add(1)
750 1 : <-v.loaded
751 1 : return v
752 1 : }
753 1 : c.mu.RUnlock()
754 1 :
755 1 : c.mu.Lock()
756 1 :
757 1 : n := c.mu.nodes[key]
758 1 : switch {
759 1 : case n == nil:
760 1 : // Slow-path miss of a non-existent node.
761 1 : n = &tableCacheNode{
762 1 : fileNum: meta.FileBacking.DiskFileNum,
763 1 : ptype: tableCacheNodeCold,
764 1 : }
765 1 : c.addNode(n, dbOpts)
766 1 : c.mu.sizeCold++
767 :
768 1 : case n.value != nil:
769 1 : // Slow-path hit of a hot or cold node.
770 1 : //
771 1 : // The caller is responsible for decrementing the refCount.
772 1 : v = n.value
773 1 : v.refCount.Add(1)
774 1 : n.referenced.Store(true)
775 1 : c.hits.Add(1)
776 1 : c.mu.Unlock()
777 1 : <-v.loaded
778 1 : return v
779 :
780 1 : default:
781 1 : // Slow-path miss of a test node.
782 1 : c.unlinkNode(n)
783 1 : c.mu.coldTarget++
784 1 : if c.mu.coldTarget > c.size {
785 1 : c.mu.coldTarget = c.size
786 1 : }
787 :
788 1 : n.referenced.Store(false)
789 1 : n.ptype = tableCacheNodeHot
790 1 : c.addNode(n, dbOpts)
791 1 : c.mu.sizeHot++
792 : }
793 :
794 1 : c.misses.Add(1)
795 1 :
796 1 : v = &tableCacheValue{
797 1 : loaded: make(chan struct{}),
798 1 : }
799 1 : v.refCount.Store(2)
800 1 : // Cache the closure invoked when an iterator is closed. This avoids an
801 1 : // allocation on every call to newIters.
802 1 : v.closeHook = func(i sstable.Iterator) error {
803 1 : if invariants.RaceEnabled {
804 0 : c.mu.Lock()
805 0 : delete(c.mu.iters, i)
806 0 : c.mu.Unlock()
807 0 : }
808 1 : c.unrefValue(v)
809 1 : c.iterCount.Add(-1)
810 1 : dbOpts.iterCount.Add(-1)
811 1 : return nil
812 : }
813 1 : n.value = v
814 1 :
815 1 : c.mu.Unlock()
816 1 :
817 1 : // Note adding to the cache lists must complete before we begin loading the
818 1 : // table as a failure during load will result in the node being unlinked.
819 1 : pprof.Do(context.Background(), tableCacheLabels, func(context.Context) {
820 1 : v.load(
821 1 : loadInfo{
822 1 : backingFileNum: meta.FileBacking.DiskFileNum,
823 1 : smallestSeqNum: meta.SmallestSeqNum,
824 1 : largestSeqNum: meta.LargestSeqNum,
825 1 : }, c, dbOpts)
826 1 : })
827 1 : return v
828 : }
829 :
830 1 : func (c *tableCacheShard) addNode(n *tableCacheNode, dbOpts *tableCacheOpts) {
831 1 : c.evictNodes()
832 1 : n.cacheID = dbOpts.cacheID
833 1 : key := tableCacheKey{n.cacheID, n.fileNum}
834 1 : c.mu.nodes[key] = n
835 1 :
836 1 : n.links.next = n
837 1 : n.links.prev = n
838 1 : if c.mu.handHot == nil {
839 1 : // First element.
840 1 : c.mu.handHot = n
841 1 : c.mu.handCold = n
842 1 : c.mu.handTest = n
843 1 : } else {
844 1 : c.mu.handHot.link(n)
845 1 : }
846 :
847 1 : if c.mu.handCold == c.mu.handHot {
848 1 : c.mu.handCold = c.mu.handCold.prev()
849 1 : }
850 : }
851 :
852 1 : func (c *tableCacheShard) evictNodes() {
853 1 : for c.size <= c.mu.sizeHot+c.mu.sizeCold && c.mu.handCold != nil {
854 1 : c.runHandCold()
855 1 : }
856 : }
857 :
858 1 : func (c *tableCacheShard) runHandCold() {
859 1 : n := c.mu.handCold
860 1 : if n.ptype == tableCacheNodeCold {
861 1 : if n.referenced.Load() {
862 1 : n.referenced.Store(false)
863 1 : n.ptype = tableCacheNodeHot
864 1 : c.mu.sizeCold--
865 1 : c.mu.sizeHot++
866 1 : } else {
867 1 : c.clearNode(n)
868 1 : n.ptype = tableCacheNodeTest
869 1 : c.mu.sizeCold--
870 1 : c.mu.sizeTest++
871 1 : for c.size < c.mu.sizeTest && c.mu.handTest != nil {
872 1 : c.runHandTest()
873 1 : }
874 : }
875 : }
876 :
877 1 : c.mu.handCold = c.mu.handCold.next()
878 1 :
879 1 : for c.size-c.mu.coldTarget <= c.mu.sizeHot && c.mu.handHot != nil {
880 1 : c.runHandHot()
881 1 : }
882 : }
883 :
884 1 : func (c *tableCacheShard) runHandHot() {
885 1 : if c.mu.handHot == c.mu.handTest && c.mu.handTest != nil {
886 1 : c.runHandTest()
887 1 : if c.mu.handHot == nil {
888 0 : return
889 0 : }
890 : }
891 :
892 1 : n := c.mu.handHot
893 1 : if n.ptype == tableCacheNodeHot {
894 1 : if n.referenced.Load() {
895 1 : n.referenced.Store(false)
896 1 : } else {
897 1 : n.ptype = tableCacheNodeCold
898 1 : c.mu.sizeHot--
899 1 : c.mu.sizeCold++
900 1 : }
901 : }
902 :
903 1 : c.mu.handHot = c.mu.handHot.next()
904 : }
905 :
906 1 : func (c *tableCacheShard) runHandTest() {
907 1 : if c.mu.sizeCold > 0 && c.mu.handTest == c.mu.handCold && c.mu.handCold != nil {
908 1 : c.runHandCold()
909 1 : if c.mu.handTest == nil {
910 0 : return
911 0 : }
912 : }
913 :
914 1 : n := c.mu.handTest
915 1 : if n.ptype == tableCacheNodeTest {
916 1 : c.mu.coldTarget--
917 1 : if c.mu.coldTarget < 0 {
918 1 : c.mu.coldTarget = 0
919 1 : }
920 1 : c.unlinkNode(n)
921 1 : c.clearNode(n)
922 : }
923 :
924 1 : c.mu.handTest = c.mu.handTest.next()
925 : }
926 :
927 1 : func (c *tableCacheShard) evict(fileNum base.DiskFileNum, dbOpts *tableCacheOpts, allowLeak bool) {
928 1 : c.mu.Lock()
929 1 : key := tableCacheKey{dbOpts.cacheID, fileNum}
930 1 : n := c.mu.nodes[key]
931 1 : var v *tableCacheValue
932 1 : if n != nil {
933 1 : // NB: This is equivalent to tableCacheShard.releaseNode(), but we perform
934 1 : // the tableCacheValue.release() call synchronously below to ensure the
935 1 : // sstable file descriptor is closed before returning. Note that
936 1 : // tableCacheShard.releasing needs to be incremented while holding
937 1 : // tableCacheShard.mu in order to avoid a race with Close().
938 1 : c.unlinkNode(n)
939 1 : v = n.value
940 1 : if v != nil {
941 1 : if !allowLeak {
942 1 : if t := v.refCount.Add(-1); t != 0 {
943 0 : dbOpts.loggerAndTracer.Fatalf("sstable %s: refcount is not zero: %d\n%s", fileNum, t, debug.Stack())
944 0 : }
945 : }
946 1 : c.releasing.Add(1)
947 : }
948 : }
949 :
950 1 : c.mu.Unlock()
951 1 :
952 1 : if v != nil {
953 1 : v.release(c)
954 1 : }
955 :
956 1 : dbOpts.opts.Cache.EvictFile(dbOpts.cacheID, fileNum)
957 : }
958 :
959 : // removeDB evicts any nodes which have a reference to the DB
960 : // associated with dbOpts.cacheID. Make sure that there will
961 : // be no more accesses to the files associated with the DB.
962 1 : func (c *tableCacheShard) removeDB(dbOpts *tableCacheOpts) {
963 1 : var fileNums []base.DiskFileNum
964 1 :
965 1 : c.mu.RLock()
966 1 : // Collect the fileNums which need to be cleaned.
967 1 : var firstNode *tableCacheNode
968 1 : node := c.mu.handHot
969 1 : for node != firstNode {
970 1 : if firstNode == nil {
971 1 : firstNode = node
972 1 : }
973 :
974 1 : if node.cacheID == dbOpts.cacheID {
975 1 : fileNums = append(fileNums, node.fileNum)
976 1 : }
977 1 : node = node.next()
978 : }
979 1 : c.mu.RUnlock()
980 1 :
981 1 : // Evict all the nodes associated with the DB.
982 1 : // This should synchronously close all the files
983 1 : // associated with the DB.
984 1 : for _, fileNum := range fileNums {
985 1 : c.evict(fileNum, dbOpts, true)
986 1 : }
987 : }
988 :
989 1 : func (c *tableCacheShard) Close() error {
990 1 : c.mu.Lock()
991 1 : defer c.mu.Unlock()
992 1 :
993 1 : // Check for leaked iterators. Note that we'll still perform cleanup below in
994 1 : // the case that there are leaked iterators.
995 1 : var err error
996 1 : if v := c.iterCount.Load(); v > 0 {
997 1 : if !invariants.RaceEnabled {
998 1 : err = errors.Errorf("leaked iterators: %d", errors.Safe(v))
999 1 : } else {
1000 0 : var buf bytes.Buffer
1001 0 : for _, stack := range c.mu.iters {
1002 0 : fmt.Fprintf(&buf, "%s\n", stack)
1003 0 : }
1004 0 : err = errors.Errorf("leaked iterators: %d\n%s", errors.Safe(v), buf.String())
1005 : }
1006 : }
1007 :
1008 1 : for c.mu.handHot != nil {
1009 0 : n := c.mu.handHot
1010 0 : if n.value != nil {
1011 0 : if n.value.refCount.Add(-1) == 0 {
1012 0 : c.releasing.Add(1)
1013 0 : c.releasingCh <- n.value
1014 0 : }
1015 : }
1016 0 : c.unlinkNode(n)
1017 : }
1018 1 : c.mu.nodes = nil
1019 1 : c.mu.handHot = nil
1020 1 : c.mu.handCold = nil
1021 1 : c.mu.handTest = nil
1022 1 :
1023 1 : // Only shut down the releasing goroutine if there were no leaked
1024 1 : // iterators. If there were leaked iterators, we leave the goroutine running
1025 1 : // and the releasingCh open so that a subsequent iterator close can
1026 1 : // complete. This behavior is used by iterator leak tests. Leaking the
1027 1 : // goroutine for these tests is less bad than not closing the iterator, which
1028 1 : // triggers other warnings about block cache handles not being released.
1029 1 : if err != nil {
1030 1 : c.releasing.Wait()
1031 1 : return err
1032 1 : }
1033 :
1034 1 : close(c.releasingCh)
1035 1 : c.releasing.Wait()
1036 1 : c.releaseLoopExit.Wait()
1037 1 : return err
1038 : }
1039 :
1040 : type tableCacheValue struct {
1041 : closeHook func(i sstable.Iterator) error
1042 : reader *sstable.Reader
1043 : err error
1044 : loaded chan struct{}
1045 : // Reference count for the value. The reader is closed when the reference
1046 : // count drops to zero.
1047 : refCount atomic.Int32
1048 : }
1049 :
1050 : type loadInfo struct {
1051 : backingFileNum base.DiskFileNum
1052 : largestSeqNum uint64
1053 : smallestSeqNum uint64
1054 : }
1055 :
1056 1 : func (v *tableCacheValue) load(loadInfo loadInfo, c *tableCacheShard, dbOpts *tableCacheOpts) {
1057 1 : // Try opening the file first.
1058 1 : var f objstorage.Readable
1059 1 : var err error
1060 1 : f, err = dbOpts.objProvider.OpenForReading(
1061 1 : context.TODO(), fileTypeTable, loadInfo.backingFileNum, objstorage.OpenOptions{MustExist: true},
1062 1 : )
1063 1 : if err == nil {
1064 1 : cacheOpts := private.SSTableCacheOpts(dbOpts.cacheID, loadInfo.backingFileNum).(sstable.ReaderOption)
1065 1 : v.reader, err = sstable.NewReader(f, dbOpts.opts, cacheOpts, dbOpts.filterMetrics)
1066 1 : }
1067 1 : if err != nil {
1068 1 : v.err = errors.Wrapf(
1069 1 : err, "pebble: backing file %s error", errors.Safe(loadInfo.backingFileNum.FileNum()))
1070 1 : }
1071 1 : if v.err == nil && loadInfo.smallestSeqNum == loadInfo.largestSeqNum {
1072 1 : v.reader.Properties.GlobalSeqNum = loadInfo.largestSeqNum
1073 1 : }
1074 1 : if v.err != nil {
1075 1 : c.mu.Lock()
1076 1 : defer c.mu.Unlock()
1077 1 : // Lookup the node in the cache again as it might have already been
1078 1 : // removed.
1079 1 : key := tableCacheKey{dbOpts.cacheID, loadInfo.backingFileNum}
1080 1 : n := c.mu.nodes[key]
1081 1 : if n != nil && n.value == v {
1082 1 : c.releaseNode(n)
1083 1 : }
1084 : }
1085 1 : close(v.loaded)
1086 : }
1087 :
1088 1 : func (v *tableCacheValue) release(c *tableCacheShard) {
1089 1 : <-v.loaded
1090 1 : // Nothing to be done about an error at this point. Close the reader if it is
1091 1 : // open.
1092 1 : if v.reader != nil {
1093 1 : _ = v.reader.Close()
1094 1 : }
1095 1 : c.releasing.Done()
1096 : }
1097 :
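// exampleLoadBarrier is an illustrative sketch, not part of the original file:
// it condenses the loading pattern used by tableCacheValue. The value is
// published in the cache before its reader is loaded; concurrent callers block
// on the loaded channel, which the loader closes exactly once whether or not
// the load succeeded (v.err carries any failure).
func exampleLoadBarrier() {
	v := &tableCacheValue{loaded: make(chan struct{})}
	v.refCount.Store(2) // one ref for the cache node, one for the first caller

	go func() {
		// ... open the backing file and set v.reader or v.err ...
		close(v.loaded) // unblock every waiter, on success or failure
	}()

	<-v.loaded // a caller waits here before touching v.reader or v.err
	_ = v.err
}
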
1098 : type tableCacheNodeType int8
1099 :
1100 : const (
1101 : tableCacheNodeTest tableCacheNodeType = iota
1102 : tableCacheNodeCold
1103 : tableCacheNodeHot
1104 : )
1105 :
1106 0 : func (p tableCacheNodeType) String() string {
1107 0 : switch p {
1108 0 : case tableCacheNodeTest:
1109 0 : return "test"
1110 0 : case tableCacheNodeCold:
1111 0 : return "cold"
1112 0 : case tableCacheNodeHot:
1113 0 : return "hot"
1114 : }
1115 0 : return "unknown"
1116 : }
1117 :
1118 : type tableCacheNode struct {
1119 : fileNum base.DiskFileNum
1120 : value *tableCacheValue
1121 :
1122 : links struct {
1123 : next *tableCacheNode
1124 : prev *tableCacheNode
1125 : }
1126 : ptype tableCacheNodeType
1127 : // referenced is atomically set to indicate that this entry has been accessed
1128 : // since the last time one of the clock hands swept it.
1129 : referenced atomic.Bool
1130 :
1131 : // Storing the cache id associated with the DB instance here
1132 : // avoids the need to thread the dbOpts struct through many functions.
1133 : cacheID uint64
1134 : }
1135 :
1136 1 : func (n *tableCacheNode) next() *tableCacheNode {
1137 1 : if n == nil {
1138 0 : return nil
1139 0 : }
1140 1 : return n.links.next
1141 : }
1142 :
1143 1 : func (n *tableCacheNode) prev() *tableCacheNode {
1144 1 : if n == nil {
1145 0 : return nil
1146 0 : }
1147 1 : return n.links.prev
1148 : }
1149 :
1150 1 : func (n *tableCacheNode) link(s *tableCacheNode) {
1151 1 : s.links.prev = n.links.prev
1152 1 : s.links.prev.links.next = s
1153 1 : s.links.next = n
1154 1 : s.links.next.links.prev = s
1155 1 : }
1156 :
1157 1 : func (n *tableCacheNode) unlink() *tableCacheNode {
1158 1 : next := n.links.next
1159 1 : n.links.prev.links.next = n.links.next
1160 1 : n.links.next.links.prev = n.links.prev
1161 1 : n.links.prev = n
1162 1 : n.links.next = n
1163 1 : return next
1164 1 : }
|