Line data Source code
1 : // Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "bytes"
9 : "context"
10 : "fmt"
11 : "io"
12 : "runtime/debug"
13 : "runtime/pprof"
14 : "sync"
15 : "sync/atomic"
16 : "unsafe"
17 :
18 : "github.com/cockroachdb/errors"
19 : "github.com/cockroachdb/pebble/internal/base"
20 : "github.com/cockroachdb/pebble/internal/invariants"
21 : "github.com/cockroachdb/pebble/internal/keyspan"
22 : "github.com/cockroachdb/pebble/internal/manifest"
23 : "github.com/cockroachdb/pebble/internal/private"
24 : "github.com/cockroachdb/pebble/objstorage"
25 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
26 : "github.com/cockroachdb/pebble/sstable"
27 : )
28 :
29 : var emptyIter = &errorIter{err: nil}
30 : var emptyKeyspanIter = &errorKeyspanIter{err: nil}
31 :
32 : // filteredAll is a singleton internalIterator implementation used when an
33 : // sstable does contain point keys, but all the keys are filtered by the active
34 : // PointKeyFilters set in the iterator's IterOptions.
35 : //
36 : // filteredAll implements filteredIter, ensuring the level iterator recognizes
37 : // when it may need to return file boundaries to keep the rangeDelIter open
38 : // during mergingIter operation.
39 : var filteredAll = &filteredAllKeysIter{errorIter: errorIter{err: nil}}
40 :
41 : var _ filteredIter = filteredAll
42 :
43 : type filteredAllKeysIter struct {
44 : errorIter
45 : }
46 :
47 1 : func (s *filteredAllKeysIter) MaybeFilteredKeys() bool {
48 1 : return true
49 1 : }
50 :
51 : var tableCacheLabels = pprof.Labels("pebble", "table-cache")
52 :
53 : // tableCacheOpts contains the db-specific fields
54 : // of a table cache. This is stored in the tableCacheContainer
55 : // along with the table cache.
56 : // NB: It is important to make sure that the fields in this
57 : // struct are read-only. Since the fields here are shared
58 : // by every tableCacheShard, updating a mutable field would
59 : // needlessly evict that field, and the surrounding fields,
60 : // from the CPU caches.
61 : type tableCacheOpts struct {
62 : // iterCount keeps track of how many iterators are open. It is used to
63 : // detect leaked iterators on a per-DB level.
64 : iterCount *atomic.Int32
65 :
66 : loggerAndTracer LoggerAndTracer
67 : cacheID uint64
68 : objProvider objstorage.Provider
69 : opts sstable.ReaderOptions
70 : filterMetrics *sstable.FilterMetricsTracker
71 : sstStatsCollector *sstable.CategoryStatsCollector
72 : }
73 :
74 : // tableCacheContainer contains the table cache and
75 : // fields which are unique to the DB.
76 : type tableCacheContainer struct {
77 : tableCache *TableCache
78 :
79 : // dbOpts contains fields relevant to the table cache
80 : // which are unique to each DB.
81 : dbOpts tableCacheOpts
82 : }
83 :
84 : // newTableCacheContainer will panic if the underlying cache in the table cache
85 : // doesn't match Options.Cache.
86 : func newTableCacheContainer(
87 : tc *TableCache,
88 : cacheID uint64,
89 : objProvider objstorage.Provider,
90 : opts *Options,
91 : size int,
92 : sstStatsCollector *sstable.CategoryStatsCollector,
93 1 : ) *tableCacheContainer {
94 1 : // We will release the ref to the table cache acquired here when tableCacheContainer.close is called.
95 1 : if tc != nil {
96 0 : if tc.cache != opts.Cache {
97 0 : panic("pebble: underlying cache for the table cache and db are different")
98 : }
99 0 : tc.Ref()
100 1 : } else {
101 1 : // NewTableCache should create a ref to tc which the container should
102 1 : // drop whenever it is closed.
103 1 : tc = NewTableCache(opts.Cache, opts.Experimental.TableCacheShards, size)
104 1 : }
105 :
106 1 : t := &tableCacheContainer{}
107 1 : t.tableCache = tc
108 1 : t.dbOpts.loggerAndTracer = opts.LoggerAndTracer
109 1 : t.dbOpts.cacheID = cacheID
110 1 : t.dbOpts.objProvider = objProvider
111 1 : t.dbOpts.opts = opts.MakeReaderOptions()
112 1 : t.dbOpts.filterMetrics = &sstable.FilterMetricsTracker{}
113 1 : t.dbOpts.iterCount = new(atomic.Int32)
114 1 : t.dbOpts.sstStatsCollector = sstStatsCollector
115 1 : return t
116 : }
117 :
118 : // Before calling close, make sure that there will be no further need
119 : // to access any of the files associated with the store.
120 1 : func (c *tableCacheContainer) close() error {
121 1 : // We want to do some cleanup work here. Check for leaked iterators
122 1 : // by the DB using this container. Note that we'll still perform cleanup
123 1 : // below in the case that there are leaked iterators.
124 1 : var err error
125 1 : if v := c.dbOpts.iterCount.Load(); v > 0 {
126 0 : err = errors.Errorf("leaked iterators: %d", errors.Safe(v))
127 0 : }
128 :
129 : // Release nodes here.
130 1 : for _, shard := range c.tableCache.shards {
131 1 : if shard != nil {
132 1 : shard.removeDB(&c.dbOpts)
133 1 : }
134 : }
135 1 : return firstError(err, c.tableCache.Unref())
136 : }
137 :
138 : func (c *tableCacheContainer) newIters(
139 : ctx context.Context,
140 : file *manifest.FileMetadata,
141 : opts *IterOptions,
142 : internalOpts internalIterOpts,
143 1 : ) (internalIterator, keyspan.FragmentIterator, error) {
144 1 : return c.tableCache.getShard(file.FileBacking.DiskFileNum).newIters(ctx, file, opts, internalOpts, &c.dbOpts)
145 1 : }
146 :
147 : func (c *tableCacheContainer) newRangeKeyIter(
148 : file *manifest.FileMetadata, opts keyspan.SpanIterOptions,
149 1 : ) (keyspan.FragmentIterator, error) {
150 1 : return c.tableCache.getShard(file.FileBacking.DiskFileNum).newRangeKeyIter(file, opts, &c.dbOpts)
151 1 : }
152 :
153 : // getTableProperties returns the properties associated with the backing physical
154 : // table if the input metadata belongs to a virtual sstable.
155 0 : func (c *tableCacheContainer) getTableProperties(file *fileMetadata) (*sstable.Properties, error) {
156 0 : return c.tableCache.getShard(file.FileBacking.DiskFileNum).getTableProperties(file, &c.dbOpts)
157 0 : }
158 :
159 1 : func (c *tableCacheContainer) evict(fileNum base.DiskFileNum) {
160 1 : c.tableCache.getShard(fileNum).evict(fileNum, &c.dbOpts, false)
161 1 : }
162 :
163 1 : func (c *tableCacheContainer) metrics() (CacheMetrics, FilterMetrics) {
164 1 : var m CacheMetrics
165 1 : for i := range c.tableCache.shards {
166 1 : s := c.tableCache.shards[i]
167 1 : s.mu.RLock()
168 1 : m.Count += int64(len(s.mu.nodes))
169 1 : s.mu.RUnlock()
170 1 : m.Hits += s.hits.Load()
171 1 : m.Misses += s.misses.Load()
172 1 : }
173 1 : m.Size = m.Count * int64(unsafe.Sizeof(sstable.Reader{}))
174 1 : f := c.dbOpts.filterMetrics.Load()
175 1 : return m, f
176 : }
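
// The reported Size is an approximation: it counts only the in-memory size of
// the cached sstable.Reader structs (Count * unsafe.Sizeof(sstable.Reader{}))
// and does not include memory referenced by those readers.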
177 :
178 : func (c *tableCacheContainer) estimateSize(
179 : meta *fileMetadata, lower, upper []byte,
180 1 : ) (size uint64, err error) {
181 1 : if meta.Virtual {
182 1 : err = c.withVirtualReader(
183 1 : meta.VirtualMeta(),
184 1 : func(r sstable.VirtualReader) (err error) {
185 1 : size, err = r.EstimateDiskUsage(lower, upper)
186 1 : return err
187 1 : },
188 : )
189 1 : } else {
190 1 : err = c.withReader(
191 1 : meta.PhysicalMeta(),
192 1 : func(r *sstable.Reader) (err error) {
193 1 : size, err = r.EstimateDiskUsage(lower, upper)
194 1 : return err
195 1 : },
196 : )
197 : }
198 1 : if err != nil {
199 0 : return 0, err
200 0 : }
201 1 : return size, nil
202 : }
203 :
204 : // createCommonReader creates a Reader for this file. isForeign, if true for
205 : // virtual sstables, is passed into the vSSTable reader so its iterators can
206 : // collapse obsolete points accordingly.
207 : func createCommonReader(
208 : v *tableCacheValue, file *fileMetadata, isForeign bool,
209 1 : ) sstable.CommonReader {
210 1 : // TODO(bananabrick): We suffer an allocation if file is a virtual sstable.
211 1 : var cr sstable.CommonReader = v.reader
212 1 : if file.Virtual {
213 1 : virtualReader := sstable.MakeVirtualReader(
214 1 : v.reader, file.VirtualMeta(), isForeign,
215 1 : )
216 1 : cr = &virtualReader
217 1 : }
218 1 : return cr
219 : }
220 :
221 : func (c *tableCacheContainer) withCommonReader(
222 : meta *fileMetadata, fn func(sstable.CommonReader) error,
223 1 : ) error {
224 1 : s := c.tableCache.getShard(meta.FileBacking.DiskFileNum)
225 1 : v := s.findNode(meta, &c.dbOpts)
226 1 : defer s.unrefValue(v)
227 1 : if v.err != nil {
228 0 : return v.err
229 0 : }
230 1 : provider := c.dbOpts.objProvider
231 1 : objMeta, err := provider.Lookup(fileTypeTable, meta.FileBacking.DiskFileNum)
232 1 : if err != nil {
233 0 : return err
234 0 : }
235 1 : return fn(createCommonReader(v, meta, provider.IsSharedForeign(objMeta)))
236 : }
237 :
238 1 : func (c *tableCacheContainer) withReader(meta physicalMeta, fn func(*sstable.Reader) error) error {
239 1 : s := c.tableCache.getShard(meta.FileBacking.DiskFileNum)
240 1 : v := s.findNode(meta.FileMetadata, &c.dbOpts)
241 1 : defer s.unrefValue(v)
242 1 : if v.err != nil {
243 0 : return v.err
244 0 : }
245 1 : return fn(v.reader)
246 : }
247 :
248 : // withVirtualReader fetches a VirtualReader associated with a virtual sstable.
249 : func (c *tableCacheContainer) withVirtualReader(
250 : meta virtualMeta, fn func(sstable.VirtualReader) error,
251 1 : ) error {
252 1 : s := c.tableCache.getShard(meta.FileBacking.DiskFileNum)
253 1 : v := s.findNode(meta.FileMetadata, &c.dbOpts)
254 1 : defer s.unrefValue(v)
255 1 : if v.err != nil {
256 0 : return v.err
257 0 : }
258 1 : provider := c.dbOpts.objProvider
259 1 : objMeta, err := provider.Lookup(fileTypeTable, meta.FileBacking.DiskFileNum)
260 1 : if err != nil {
261 0 : return err
262 0 : }
263 1 : return fn(sstable.MakeVirtualReader(v.reader, meta, provider.IsSharedForeign(objMeta)))
264 : }
265 :
266 1 : func (c *tableCacheContainer) iterCount() int64 {
267 1 : return int64(c.dbOpts.iterCount.Load())
268 1 : }
269 :
270 : // TableCache is a shareable cache for open sstables.
271 : type TableCache struct {
272 : refs atomic.Int64
273 :
274 : cache *Cache
275 : shards []*tableCacheShard
276 : }
277 :
278 : // Ref adds a reference to the table cache. Once tableCache.init returns,
279 : // the table cache only remains valid if there is at least one reference
280 : // to it.
281 0 : func (c *TableCache) Ref() {
282 0 : v := c.refs.Add(1)
283 0 : // We don't want the reference count to ever go from 0 -> 1,
284 0 : // because a reference count of 0 implies that we've closed the cache.
285 0 : if v <= 1 {
286 0 : panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
287 : }
288 : }
289 :
290 : // Unref removes a reference to the table cache.
291 1 : func (c *TableCache) Unref() error {
292 1 : v := c.refs.Add(-1)
293 1 : switch {
294 0 : case v < 0:
295 0 : panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
296 1 : case v == 0:
297 1 : var err error
298 1 : for i := range c.shards {
299 1 : // The cache shard is not allocated yet; there is nothing to close.
300 1 : if c.shards[i] == nil {
301 0 : continue
302 : }
303 1 : err = firstError(err, c.shards[i].Close())
304 : }
305 :
306 : // Unref the cache which we create a reference to when the tableCache
307 : // is first instantiated.
308 1 : c.cache.Unref()
309 1 : return err
310 : }
311 0 : return nil
312 : }
313 :
314 : // NewTableCache will create a reference to the table cache. It is the caller's responsibility
315 : // to call tableCache.Unref once it no longer holds a reference to the table cache.
316 1 : func NewTableCache(cache *Cache, numShards int, size int) *TableCache {
317 1 : if size == 0 {
318 0 : panic("pebble: cannot create a table cache of size 0")
319 1 : } else if numShards == 0 {
320 0 : panic("pebble: cannot create a table cache with 0 shards")
321 : }
322 :
323 1 : c := &TableCache{}
324 1 : c.cache = cache
325 1 : c.cache.Ref()
326 1 :
327 1 : c.shards = make([]*tableCacheShard, numShards)
328 1 : for i := range c.shards {
329 1 : c.shards[i] = &tableCacheShard{}
330 1 : c.shards[i].init(size / len(c.shards))
331 1 : }
332 :
333 : // Hold a ref to the cache here.
334 1 : c.refs.Store(1)
335 1 :
336 1 : return c
337 : }
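
// A TableCache can be shared by multiple Pebble instances so that the number
// of open sstable readers is bounded globally rather than per DB. A minimal
// sketch of that usage, assuming the shared cache is plumbed through
// Options.TableCache (error handling elided):
//
//	cache := pebble.NewCache(128 << 20)
//	defer cache.Unref()
//	tc := pebble.NewTableCache(cache, 16 /* shards */, 4096 /* table cache size */)
//	defer func() { _ = tc.Unref() }()
//	db1, _ := pebble.Open("db1", &pebble.Options{Cache: cache, TableCache: tc})
//	db2, _ := pebble.Open("db2", &pebble.Options{Cache: cache, TableCache: tc})
//
// The Cache passed to NewTableCache must be the same Cache set in
// Options.Cache; otherwise newTableCacheContainer panics.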
338 :
339 1 : func (c *TableCache) getShard(fileNum base.DiskFileNum) *tableCacheShard {
340 1 : return c.shards[uint64(fileNum.FileNum())%uint64(len(c.shards))]
341 1 : }
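
// Shard selection is a simple modulus on the file number, so a given file
// always maps to the same shard. For example, with 16 shards, DiskFileNum 21
// maps to shard 21 % 16 = 5, and DiskFileNum 32 maps to shard 0.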
342 :
343 : type tableCacheKey struct {
344 : cacheID uint64
345 : fileNum base.DiskFileNum
346 : }
347 :
348 : type tableCacheShard struct {
349 : hits atomic.Int64
350 : misses atomic.Int64
351 : iterCount atomic.Int32
352 :
353 : size int
354 :
355 : mu struct {
356 : sync.RWMutex
357 : nodes map[tableCacheKey]*tableCacheNode
358 : // The iters map is only created and populated in race builds.
359 : iters map[io.Closer][]byte
360 :
361 : handHot *tableCacheNode
362 : handCold *tableCacheNode
363 : handTest *tableCacheNode
364 :
365 : coldTarget int
366 : sizeHot int
367 : sizeCold int
368 : sizeTest int
369 : }
370 : releasing sync.WaitGroup
371 : releasingCh chan *tableCacheValue
372 : releaseLoopExit sync.WaitGroup
373 : }
374 :
375 1 : func (c *tableCacheShard) init(size int) {
376 1 : c.size = size
377 1 :
378 1 : c.mu.nodes = make(map[tableCacheKey]*tableCacheNode)
379 1 : c.mu.coldTarget = size
380 1 : c.releasingCh = make(chan *tableCacheValue, 100)
381 1 : c.releaseLoopExit.Add(1)
382 1 : go c.releaseLoop()
383 1 :
384 1 : if invariants.RaceEnabled {
385 0 : c.mu.iters = make(map[io.Closer][]byte)
386 0 : }
387 : }
388 :
389 1 : func (c *tableCacheShard) releaseLoop() {
390 1 : pprof.Do(context.Background(), tableCacheLabels, func(context.Context) {
391 1 : defer c.releaseLoopExit.Done()
392 1 : for v := range c.releasingCh {
393 0 : v.release(c)
394 0 : }
395 : })
396 : }
397 :
398 : // checkAndIntersectFilters checks the specific table and block property filters
399 : // for intersection with any available table and block-level properties. Returns
400 : // true for ok if this table should be read by this iterator.
401 : func (c *tableCacheShard) checkAndIntersectFilters(
402 : v *tableCacheValue,
403 : tableFilter func(userProps map[string]string) bool,
404 : blockPropertyFilters []BlockPropertyFilter,
405 : boundLimitedFilter sstable.BoundLimitedBlockPropertyFilter,
406 1 : ) (ok bool, filterer *sstable.BlockPropertiesFilterer, err error) {
407 1 : if tableFilter != nil &&
408 1 : !tableFilter(v.reader.Properties.UserProperties) {
409 0 : return false, nil, nil
410 0 : }
411 :
412 1 : if boundLimitedFilter != nil || len(blockPropertyFilters) > 0 {
413 1 : filterer, err = sstable.IntersectsTable(
414 1 : blockPropertyFilters,
415 1 : boundLimitedFilter,
416 1 : v.reader.Properties.UserProperties,
417 1 : )
418 1 : // NB: IntersectsTable will return a nil filterer if the table-level
419 1 : // properties indicate there's no intersection with the provided filters.
420 1 : if filterer == nil || err != nil {
421 1 : return false, nil, err
422 1 : }
423 : }
424 1 : return true, filterer, nil
425 : }
426 :
427 : func (c *tableCacheShard) newIters(
428 : ctx context.Context,
429 : file *manifest.FileMetadata,
430 : opts *IterOptions,
431 : internalOpts internalIterOpts,
432 : dbOpts *tableCacheOpts,
433 1 : ) (internalIterator, keyspan.FragmentIterator, error) {
434 1 : // TODO(sumeer): constructing the Reader should also use a plumbed context,
435 1 : // since parts of the sstable are read during the construction. The Reader
436 1 : // should not remember that context since the Reader can be long-lived.
437 1 :
438 1 : // Calling findNode gives us the responsibility of decrementing v's
439 1 : // refCount. If opening the underlying table resulted in error, then we
440 1 : // decrement this straight away. Otherwise, we pass that responsibility to
441 1 : // the sstable iterator, which decrements when it is closed.
442 1 : v := c.findNode(file, dbOpts)
443 1 : if v.err != nil {
444 0 : defer c.unrefValue(v)
445 0 : return nil, nil, v.err
446 0 : }
447 :
448 1 : hideObsoletePoints := false
449 1 : var pointKeyFilters []BlockPropertyFilter
450 1 : if opts != nil {
451 1 : // This code is appending (at most one filter) in-place to
452 1 : // opts.PointKeyFilters even though the slice is shared for iterators in
453 1 : // the same iterator tree. This is acceptable since all the following
454 1 : // properties are true:
455 1 : // - The iterator tree is single threaded, so the shared backing for the
456 1 : // slice is being mutated in a single threaded manner.
457 1 : // - Each shallow copy of the slice has its own notion of length.
458 1 : // - The appended element is always the obsoleteKeyBlockPropertyFilter
459 1 : // struct, which is stateless, so overwriting that struct when creating
460 1 : // one sstable iterator is harmless to other sstable iterators that are
461 1 : // relying on that struct.
462 1 : //
463 1 : // An alternative would be to have different slices for different sstable
464 1 : // iterators, but that requires more work to avoid allocations.
465 1 : hideObsoletePoints, pointKeyFilters =
466 1 : v.reader.TryAddBlockPropertyFilterForHideObsoletePoints(
467 1 : opts.snapshotForHideObsoletePoints, file.LargestSeqNum, opts.PointKeyFilters)
468 1 : }
469 1 : ok := true
470 1 : var filterer *sstable.BlockPropertiesFilterer
471 1 : var err error
472 1 : if opts != nil {
473 1 : ok, filterer, err = c.checkAndIntersectFilters(v, opts.TableFilter,
474 1 : pointKeyFilters, internalOpts.boundLimitedFilter)
475 1 : }
476 1 : if err != nil {
477 0 : c.unrefValue(v)
478 0 : return nil, nil, err
479 0 : }
480 :
481 1 : provider := dbOpts.objProvider
482 1 : // Check if this file is a foreign file.
483 1 : objMeta, err := provider.Lookup(fileTypeTable, file.FileBacking.DiskFileNum)
484 1 : if err != nil {
485 0 : return nil, nil, err
486 0 : }
487 :
488 : // Note: This suffers an allocation for virtual sstables.
489 1 : cr := createCommonReader(v, file, provider.IsSharedForeign(objMeta))
490 1 :
491 1 : // NB: range-del iterator does not maintain a reference to the table, nor
492 1 : // does it need to read from it after creation.
493 1 : rangeDelIter, err := cr.NewRawRangeDelIter()
494 1 : if err != nil {
495 0 : c.unrefValue(v)
496 0 : return nil, nil, err
497 0 : }
498 :
499 1 : if !ok {
500 1 : c.unrefValue(v)
501 1 : // Return an empty iterator. This iterator has no mutable state, so
502 1 : // using a singleton is fine.
503 1 : // NB: We still return the potentially non-empty rangeDelIter. This
504 1 : // ensures the iterator observes the file's range deletions even if the
505 1 : // block property filters exclude all the file's point keys. The range
506 1 : // deletions may still delete keys lower in the LSM in files that DO
507 1 : // match the active filters.
508 1 : //
509 1 : // The point iterator returned must implement the filteredIter
510 1 : // interface, so that the level iterator surfaces file boundaries when
511 1 : // range deletions are present.
512 1 : return filteredAll, rangeDelIter, err
513 1 : }
514 :
515 1 : var iter sstable.Iterator
516 1 : useFilter := true
517 1 : if opts != nil {
518 1 : useFilter = manifest.LevelToInt(opts.level) != 6 || opts.UseL6Filters
519 1 : ctx = objiotracing.WithLevel(ctx, manifest.LevelToInt(opts.level))
520 1 : }
521 1 : tableFormat, err := v.reader.TableFormat()
522 1 : if err != nil {
523 0 : return nil, nil, err
524 0 : }
525 1 : var rp sstable.ReaderProvider
526 1 : if tableFormat >= sstable.TableFormatPebblev3 && v.reader.Properties.NumValueBlocks > 0 {
527 1 : rp = &tableCacheShardReaderProvider{c: c, file: file, dbOpts: dbOpts}
528 1 : }
529 :
530 1 : if provider.IsSharedForeign(objMeta) {
531 0 : if tableFormat < sstable.TableFormatPebblev4 {
532 0 : return nil, nil, errors.New("pebble: shared foreign sstable has a lower table format than expected")
533 0 : }
534 0 : hideObsoletePoints = true
535 : }
536 1 : var categoryAndQoS sstable.CategoryAndQoS
537 1 : if opts != nil {
538 1 : categoryAndQoS = opts.CategoryAndQoS
539 1 : }
540 1 : if internalOpts.bytesIterated != nil {
541 1 : iter, err = cr.NewCompactionIter(
542 1 : internalOpts.bytesIterated, categoryAndQoS, dbOpts.sstStatsCollector, rp,
543 1 : internalOpts.bufferPool)
544 1 : } else {
545 1 : iter, err = cr.NewIterWithBlockPropertyFiltersAndContextEtc(
546 1 : ctx, opts.GetLowerBound(), opts.GetUpperBound(), filterer, hideObsoletePoints, useFilter,
547 1 : internalOpts.stats, categoryAndQoS, dbOpts.sstStatsCollector, rp)
548 1 : }
549 1 : if err != nil {
550 0 : if rangeDelIter != nil {
551 0 : _ = rangeDelIter.Close()
552 0 : }
553 0 : c.unrefValue(v)
554 0 : return nil, nil, err
555 : }
556 : // NB: v.closeHook takes responsibility for calling unrefValue(v) here. Take
557 : // care to avoid introducing an allocation here by adding a closure.
558 1 : iter.SetCloseHook(v.closeHook)
559 1 :
560 1 : c.iterCount.Add(1)
561 1 : dbOpts.iterCount.Add(1)
562 1 : if invariants.RaceEnabled {
563 0 : c.mu.Lock()
564 0 : c.mu.iters[iter] = debug.Stack()
565 0 : c.mu.Unlock()
566 0 : }
567 1 : return iter, rangeDelIter, nil
568 : }
569 :
570 : func (c *tableCacheShard) newRangeKeyIter(
571 : file *manifest.FileMetadata, opts keyspan.SpanIterOptions, dbOpts *tableCacheOpts,
572 1 : ) (keyspan.FragmentIterator, error) {
573 1 : // Calling findNode gives us the responsibility of decrementing v's
574 1 : // refCount. If opening the underlying table resulted in error, then we
575 1 : // decrement this straight away. Otherwise, we pass that responsibility to
576 1 : // the sstable iterator, which decrements when it is closed.
577 1 : v := c.findNode(file, dbOpts)
578 1 : if v.err != nil {
579 0 : defer c.unrefValue(v)
580 0 : return nil, v.err
581 0 : }
582 :
583 1 : ok := true
584 1 : var err error
585 1 : // Don't filter a table's range keys if the file contains RANGEKEYDELs.
586 1 : // The RANGEKEYDELs may delete range keys in other levels. Skipping the
587 1 : // file's range key blocks may surface deleted range keys below. This is
588 1 : // done here, rather than deferring to the block-property collector in order
589 1 : // to maintain parity with point keys and the treatment of RANGEDELs.
590 1 : if v.reader.Properties.NumRangeKeyDels == 0 {
591 1 : ok, _, err = c.checkAndIntersectFilters(v, nil, opts.RangeKeyFilters, nil)
592 1 : }
593 1 : if err != nil {
594 0 : c.unrefValue(v)
595 0 : return nil, err
596 0 : }
597 1 : if !ok {
598 0 : c.unrefValue(v)
599 0 : // Return the empty iterator. This iterator has no mutable state, so
600 0 : // using a singleton is fine.
601 0 : return emptyKeyspanIter, err
602 0 : }
603 :
604 1 : var iter keyspan.FragmentIterator
605 1 : if file.Virtual {
606 1 : provider := dbOpts.objProvider
607 1 : var objMeta objstorage.ObjectMetadata
608 1 : objMeta, err = provider.Lookup(fileTypeTable, file.FileBacking.DiskFileNum)
609 1 : if err == nil {
610 1 : virtualReader := sstable.MakeVirtualReader(
611 1 : v.reader, file.VirtualMeta(), provider.IsSharedForeign(objMeta),
612 1 : )
613 1 : iter, err = virtualReader.NewRawRangeKeyIter()
614 1 : }
615 1 : } else {
616 1 : iter, err = v.reader.NewRawRangeKeyIter()
617 1 : }
618 :
619 : // iter is a block iter that holds the entire value of the block in memory.
620 : // No need to hold onto a ref of the cache value.
621 1 : c.unrefValue(v)
622 1 :
623 1 : if err != nil {
624 0 : return nil, err
625 0 : }
626 :
627 1 : if iter == nil {
628 0 : // NewRawRangeKeyIter can return nil even if there's no error. However,
629 0 : // the keyspan.LevelIter expects a non-nil iterator if err is nil.
630 0 : return emptyKeyspanIter, nil
631 0 : }
632 :
633 1 : return iter, nil
634 : }
635 :
636 : type tableCacheShardReaderProvider struct {
637 : c *tableCacheShard
638 : file *manifest.FileMetadata
639 : dbOpts *tableCacheOpts
640 : v *tableCacheValue
641 : }
642 :
643 : var _ sstable.ReaderProvider = &tableCacheShardReaderProvider{}
644 :
645 : // GetReader implements sstable.ReaderProvider. Note that it is not the
646 : // responsibility of tableCacheShardReaderProvider to ensure that the file
647 : // continues to exist. The ReaderProvider is used in iterators where the
648 : // top-level iterator is pinning the read state and preventing the files from
649 : // being deleted.
650 : //
651 : // The caller must call tableCacheShardReaderProvider.Close.
652 : //
653 : // Note that currently the Reader returned here is only used to read value
654 : // blocks. This reader shouldn't be used for other purposes like reading keys
655 : // outside of virtual sstable bounds.
656 : //
657 : // TODO(bananabrick): We could return a wrapper over the Reader to ensure
658 : // that the reader isn't used for other purposes.
659 1 : func (rp *tableCacheShardReaderProvider) GetReader() (*sstable.Reader, error) {
660 1 : // Calling findNode gives us the responsibility of decrementing v's
661 1 : // refCount.
662 1 : v := rp.c.findNode(rp.file, rp.dbOpts)
663 1 : if v.err != nil {
664 0 : defer rp.c.unrefValue(v)
665 0 : return nil, v.err
666 0 : }
667 1 : rp.v = v
668 1 : return v.reader, nil
669 : }
670 :
671 : // Close implements sstable.ReaderProvider.
672 1 : func (rp *tableCacheShardReaderProvider) Close() {
673 1 : rp.c.unrefValue(rp.v)
674 1 : rp.v = nil
675 1 : }
676 :
677 : // getTableProperties returns the sstable properties for the target file.
678 : func (c *tableCacheShard) getTableProperties(
679 : file *fileMetadata, dbOpts *tableCacheOpts,
680 0 : ) (*sstable.Properties, error) {
681 0 : // Calling findNode gives us the responsibility of decrementing v's refCount here
682 0 : v := c.findNode(file, dbOpts)
683 0 : defer c.unrefValue(v)
684 0 :
685 0 : if v.err != nil {
686 0 : return nil, v.err
687 0 : }
688 0 : return &v.reader.Properties, nil
689 : }
690 :
691 : // releaseNode releases a node from the tableCacheShard.
692 : //
693 : // c.mu must be held when calling this.
694 0 : func (c *tableCacheShard) releaseNode(n *tableCacheNode) {
695 0 : c.unlinkNode(n)
696 0 : c.clearNode(n)
697 0 : }
698 :
699 : // unlinkNode removes a node from the tableCacheShard, leaving the shard
700 : // reference in place.
701 : //
702 : // c.mu must be held when calling this.
703 1 : func (c *tableCacheShard) unlinkNode(n *tableCacheNode) {
704 1 : key := tableCacheKey{n.cacheID, n.fileNum}
705 1 : delete(c.mu.nodes, key)
706 1 :
707 1 : switch n.ptype {
708 0 : case tableCacheNodeHot:
709 0 : c.mu.sizeHot--
710 1 : case tableCacheNodeCold:
711 1 : c.mu.sizeCold--
712 0 : case tableCacheNodeTest:
713 0 : c.mu.sizeTest--
714 : }
715 :
716 1 : if n == c.mu.handHot {
717 1 : c.mu.handHot = c.mu.handHot.prev()
718 1 : }
719 1 : if n == c.mu.handCold {
720 1 : c.mu.handCold = c.mu.handCold.prev()
721 1 : }
722 1 : if n == c.mu.handTest {
723 1 : c.mu.handTest = c.mu.handTest.prev()
724 1 : }
725 :
726 1 : if n.unlink() == n {
727 1 : // This was the last entry in the cache.
728 1 : c.mu.handHot = nil
729 1 : c.mu.handCold = nil
730 1 : c.mu.handTest = nil
731 1 : }
732 :
733 1 : n.links.prev = nil
734 1 : n.links.next = nil
735 : }
736 :
737 0 : func (c *tableCacheShard) clearNode(n *tableCacheNode) {
738 0 : if v := n.value; v != nil {
739 0 : n.value = nil
740 0 : c.unrefValue(v)
741 0 : }
742 : }
743 :
744 : // unrefValue decrements the reference count for the specified value, releasing
745 : // it if the reference count fell to 0. Note that the value has a reference if
746 : // it is present in tableCacheShard.mu.nodes, so a reference count of 0 means
747 : // the node has already been removed from that map.
748 1 : func (c *tableCacheShard) unrefValue(v *tableCacheValue) {
749 1 : if v.refCount.Add(-1) == 0 {
750 0 : c.releasing.Add(1)
751 0 : c.releasingCh <- v
752 0 : }
753 : }
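
// The reference-count life cycle of a tableCacheValue, as implemented in
// unrefValue above and findNode below, is roughly:
//
//	findNode (reference returned to caller)  +1
//	node stored in mu.nodes                  +1
//	iterator closeHook                       -1
//	unlinkNode/clearNode on eviction         -1
//
// When the count reaches zero the value is queued on releasingCh, and the
// release loop closes the underlying sstable.Reader asynchronously.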
754 :
755 : // findNode returns the node for the table with the given file number, creating
756 : // that node if it didn't already exist. The caller is responsible for
757 : // decrementing the returned node's refCount.
758 : func (c *tableCacheShard) findNode(
759 : meta *fileMetadata, dbOpts *tableCacheOpts,
760 1 : ) (v *tableCacheValue) {
761 1 : // Loading a file before its global sequence number is known (eg,
762 1 : // during ingest before entering the commit pipeline) can pollute
763 1 : // the cache with incorrect state. In invariant builds, verify
764 1 : // that the global sequence number of the returned reader matches.
765 1 : if invariants.Enabled {
766 1 : defer func() {
767 1 : if v.reader != nil && meta.LargestSeqNum == meta.SmallestSeqNum &&
768 1 : v.reader.Properties.GlobalSeqNum != meta.SmallestSeqNum {
769 0 : panic(errors.AssertionFailedf("file %s loaded from table cache with the wrong global sequence number %d",
770 0 : meta, v.reader.Properties.GlobalSeqNum))
771 : }
772 : }()
773 : }
774 1 : if refs := meta.Refs(); refs <= 0 {
775 0 : panic(errors.AssertionFailedf("attempting to load file %s with refs=%d from table cache",
776 0 : meta, refs))
777 : }
778 :
779 : // Fast-path for a hit in the cache.
780 1 : c.mu.RLock()
781 1 : key := tableCacheKey{dbOpts.cacheID, meta.FileBacking.DiskFileNum}
782 1 : if n := c.mu.nodes[key]; n != nil && n.value != nil {
783 1 : // Fast-path hit.
784 1 : //
785 1 : // The caller is responsible for decrementing the refCount.
786 1 : v = n.value
787 1 : v.refCount.Add(1)
788 1 : c.mu.RUnlock()
789 1 : n.referenced.Store(true)
790 1 : c.hits.Add(1)
791 1 : <-v.loaded
792 1 : return v
793 1 : }
794 1 : c.mu.RUnlock()
795 1 :
796 1 : c.mu.Lock()
797 1 :
798 1 : n := c.mu.nodes[key]
799 1 : switch {
800 1 : case n == nil:
801 1 : // Slow-path miss of a non-existent node.
802 1 : n = &tableCacheNode{
803 1 : fileNum: meta.FileBacking.DiskFileNum,
804 1 : ptype: tableCacheNodeCold,
805 1 : }
806 1 : c.addNode(n, dbOpts)
807 1 : c.mu.sizeCold++
808 :
809 1 : case n.value != nil:
810 1 : // Slow-path hit of a hot or cold node.
811 1 : //
812 1 : // The caller is responsible for decrementing the refCount.
813 1 : v = n.value
814 1 : v.refCount.Add(1)
815 1 : n.referenced.Store(true)
816 1 : c.hits.Add(1)
817 1 : c.mu.Unlock()
818 1 : <-v.loaded
819 1 : return v
820 :
821 0 : default:
822 0 : // Slow-path miss of a test node.
823 0 : c.unlinkNode(n)
824 0 : c.mu.coldTarget++
825 0 : if c.mu.coldTarget > c.size {
826 0 : c.mu.coldTarget = c.size
827 0 : }
828 :
829 0 : n.referenced.Store(false)
830 0 : n.ptype = tableCacheNodeHot
831 0 : c.addNode(n, dbOpts)
832 0 : c.mu.sizeHot++
833 : }
834 :
835 1 : c.misses.Add(1)
836 1 :
837 1 : v = &tableCacheValue{
838 1 : loaded: make(chan struct{}),
839 1 : }
840 1 : v.refCount.Store(2)
841 1 : // Cache the closure invoked when an iterator is closed. This avoids an
842 1 : // allocation on every call to newIters.
843 1 : v.closeHook = func(i sstable.Iterator) error {
844 1 : if invariants.RaceEnabled {
845 0 : c.mu.Lock()
846 0 : delete(c.mu.iters, i)
847 0 : c.mu.Unlock()
848 0 : }
849 1 : c.unrefValue(v)
850 1 : c.iterCount.Add(-1)
851 1 : dbOpts.iterCount.Add(-1)
852 1 : return nil
853 : }
854 1 : n.value = v
855 1 :
856 1 : c.mu.Unlock()
857 1 :
858 1 : // Note adding to the cache lists must complete before we begin loading the
859 1 : // table as a failure during load will result in the node being unlinked.
860 1 : pprof.Do(context.Background(), tableCacheLabels, func(context.Context) {
861 1 : v.load(
862 1 : loadInfo{
863 1 : backingFileNum: meta.FileBacking.DiskFileNum,
864 1 : smallestSeqNum: meta.SmallestSeqNum,
865 1 : largestSeqNum: meta.LargestSeqNum,
866 1 : }, c, dbOpts)
867 1 : })
868 1 : return v
869 : }
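
// On a miss, findNode opens the sstable synchronously via tableCacheValue.load
// before returning. Concurrent lookups that race with the load still find the
// node in mu.nodes, take their own reference, and block on the v.loaded
// channel until load closes it. The initial reference count of 2 accounts for
// the reference held by the cache node plus the one returned to the caller.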
870 :
871 1 : func (c *tableCacheShard) addNode(n *tableCacheNode, dbOpts *tableCacheOpts) {
872 1 : c.evictNodes()
873 1 : n.cacheID = dbOpts.cacheID
874 1 : key := tableCacheKey{n.cacheID, n.fileNum}
875 1 : c.mu.nodes[key] = n
876 1 :
877 1 : n.links.next = n
878 1 : n.links.prev = n
879 1 : if c.mu.handHot == nil {
880 1 : // First element.
881 1 : c.mu.handHot = n
882 1 : c.mu.handCold = n
883 1 : c.mu.handTest = n
884 1 : } else {
885 1 : c.mu.handHot.link(n)
886 1 : }
887 :
888 1 : if c.mu.handCold == c.mu.handHot {
889 1 : c.mu.handCold = c.mu.handCold.prev()
890 1 : }
891 : }
892 :
893 1 : func (c *tableCacheShard) evictNodes() {
894 1 : for c.size <= c.mu.sizeHot+c.mu.sizeCold && c.mu.handCold != nil {
895 0 : c.runHandCold()
896 0 : }
897 : }
898 :
899 0 : func (c *tableCacheShard) runHandCold() {
900 0 : n := c.mu.handCold
901 0 : if n.ptype == tableCacheNodeCold {
902 0 : if n.referenced.Load() {
903 0 : n.referenced.Store(false)
904 0 : n.ptype = tableCacheNodeHot
905 0 : c.mu.sizeCold--
906 0 : c.mu.sizeHot++
907 0 : } else {
908 0 : c.clearNode(n)
909 0 : n.ptype = tableCacheNodeTest
910 0 : c.mu.sizeCold--
911 0 : c.mu.sizeTest++
912 0 : for c.size < c.mu.sizeTest && c.mu.handTest != nil {
913 0 : c.runHandTest()
914 0 : }
915 : }
916 : }
917 :
918 0 : c.mu.handCold = c.mu.handCold.next()
919 0 :
920 0 : for c.size-c.mu.coldTarget <= c.mu.sizeHot && c.mu.handHot != nil {
921 0 : c.runHandHot()
922 0 : }
923 : }
924 :
925 0 : func (c *tableCacheShard) runHandHot() {
926 0 : if c.mu.handHot == c.mu.handTest && c.mu.handTest != nil {
927 0 : c.runHandTest()
928 0 : if c.mu.handHot == nil {
929 0 : return
930 0 : }
931 : }
932 :
933 0 : n := c.mu.handHot
934 0 : if n.ptype == tableCacheNodeHot {
935 0 : if n.referenced.Load() {
936 0 : n.referenced.Store(false)
937 0 : } else {
938 0 : n.ptype = tableCacheNodeCold
939 0 : c.mu.sizeHot--
940 0 : c.mu.sizeCold++
941 0 : }
942 : }
943 :
944 0 : c.mu.handHot = c.mu.handHot.next()
945 : }
946 :
947 0 : func (c *tableCacheShard) runHandTest() {
948 0 : if c.mu.sizeCold > 0 && c.mu.handTest == c.mu.handCold && c.mu.handCold != nil {
949 0 : c.runHandCold()
950 0 : if c.mu.handTest == nil {
951 0 : return
952 0 : }
953 : }
954 :
955 0 : n := c.mu.handTest
956 0 : if n.ptype == tableCacheNodeTest {
957 0 : c.mu.coldTarget--
958 0 : if c.mu.coldTarget < 0 {
959 0 : c.mu.coldTarget = 0
960 0 : }
961 0 : c.unlinkNode(n)
962 0 : c.clearNode(n)
963 : }
964 :
965 0 : c.mu.handTest = c.mu.handTest.next()
966 : }
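
// The three hand functions above implement a CLOCK-Pro style replacement
// policy over the circular list of nodes: handCold demotes or evicts cold
// entries, handHot demotes unreferenced hot entries, and handTest retires
// test entries while shrinking coldTarget. Roughly:
//
//	cold + referenced   -> promoted to hot
//	cold + unreferenced -> reader released, node kept as a test entry
//	hot  + unreferenced -> demoted to cold
//	test                -> removed entirely, coldTarget decreases
//
// The counterpart is in findNode: a miss on a test entry grows coldTarget and
// re-inserts the node as hot.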
967 :
968 1 : func (c *tableCacheShard) evict(fileNum base.DiskFileNum, dbOpts *tableCacheOpts, allowLeak bool) {
969 1 : c.mu.Lock()
970 1 : key := tableCacheKey{dbOpts.cacheID, fileNum}
971 1 : n := c.mu.nodes[key]
972 1 : var v *tableCacheValue
973 1 : if n != nil {
974 1 : // NB: This is equivalent to tableCacheShard.releaseNode(), but we perform
975 1 : // the tableCacheNode.release() call synchronously below to ensure the
976 1 : // sstable file descriptor is closed before returning. Note that
977 1 : // tableCacheShard.releasing needs to be incremented while holding
978 1 : // tableCacheShard.mu in order to avoid a race with Close()
979 1 : c.unlinkNode(n)
980 1 : v = n.value
981 1 : if v != nil {
982 1 : if !allowLeak {
983 1 : if t := v.refCount.Add(-1); t != 0 {
984 0 : dbOpts.loggerAndTracer.Fatalf("sstable %s: refcount is not zero: %d\n%s", fileNum, t, debug.Stack())
985 0 : }
986 : }
987 1 : c.releasing.Add(1)
988 : }
989 : }
990 :
991 1 : c.mu.Unlock()
992 1 :
993 1 : if v != nil {
994 1 : v.release(c)
995 1 : }
996 :
997 1 : dbOpts.opts.Cache.EvictFile(dbOpts.cacheID, fileNum)
998 : }
999 :
1000 : // removeDB evicts any nodes which have a reference to the DB
1001 : // associated with dbOpts.cacheID. Make sure that there will
1002 : // be no more accesses to the files associated with the DB.
1003 1 : func (c *tableCacheShard) removeDB(dbOpts *tableCacheOpts) {
1004 1 : var fileNums []base.DiskFileNum
1005 1 :
1006 1 : c.mu.RLock()
1007 1 : // Collect the fileNums which need to be cleaned.
1008 1 : var firstNode *tableCacheNode
1009 1 : node := c.mu.handHot
1010 1 : for node != firstNode {
1011 1 : if firstNode == nil {
1012 1 : firstNode = node
1013 1 : }
1014 :
1015 1 : if node.cacheID == dbOpts.cacheID {
1016 1 : fileNums = append(fileNums, node.fileNum)
1017 1 : }
1018 1 : node = node.next()
1019 : }
1020 1 : c.mu.RUnlock()
1021 1 :
1022 1 : // Evict all the nodes associated with the DB.
1023 1 : // This should synchronously close all the files
1024 1 : // associated with the DB.
1025 1 : for _, fileNum := range fileNums {
1026 1 : c.evict(fileNum, dbOpts, true)
1027 1 : }
1028 : }
1029 :
1030 1 : func (c *tableCacheShard) Close() error {
1031 1 : c.mu.Lock()
1032 1 : defer c.mu.Unlock()
1033 1 :
1034 1 : // Check for leaked iterators. Note that we'll still perform cleanup below in
1035 1 : // the case that there are leaked iterators.
1036 1 : var err error
1037 1 : if v := c.iterCount.Load(); v > 0 {
1038 0 : if !invariants.RaceEnabled {
1039 0 : err = errors.Errorf("leaked iterators: %d", errors.Safe(v))
1040 0 : } else {
1041 0 : var buf bytes.Buffer
1042 0 : for _, stack := range c.mu.iters {
1043 0 : fmt.Fprintf(&buf, "%s\n", stack)
1044 0 : }
1045 0 : err = errors.Errorf("leaked iterators: %d\n%s", errors.Safe(v), buf.String())
1046 : }
1047 : }
1048 :
1049 1 : for c.mu.handHot != nil {
1050 0 : n := c.mu.handHot
1051 0 : if n.value != nil {
1052 0 : if n.value.refCount.Add(-1) == 0 {
1053 0 : c.releasing.Add(1)
1054 0 : c.releasingCh <- n.value
1055 0 : }
1056 : }
1057 0 : c.unlinkNode(n)
1058 : }
1059 1 : c.mu.nodes = nil
1060 1 : c.mu.handHot = nil
1061 1 : c.mu.handCold = nil
1062 1 : c.mu.handTest = nil
1063 1 :
1064 1 : // Only shut down the releasing goroutine if there were no leaked
1065 1 : // iterators. If there were leaked iterators, we leave the goroutine running
1066 1 : // and the releasingCh open so that a subsequent iterator close can
1067 1 : // complete. This behavior is used by iterator leak tests. Leaking the
1068 1 : // goroutine for these tests is less bad than not closing the iterator, which
1069 1 : // triggers other warnings about block cache handles not being released.
1070 1 : if err != nil {
1071 0 : c.releasing.Wait()
1072 0 : return err
1073 0 : }
1074 :
1075 1 : close(c.releasingCh)
1076 1 : c.releasing.Wait()
1077 1 : c.releaseLoopExit.Wait()
1078 1 : return err
1079 : }
1080 :
1081 : type tableCacheValue struct {
1082 : closeHook func(i sstable.Iterator) error
1083 : reader *sstable.Reader
1084 : err error
1085 : loaded chan struct{}
1086 : // Reference count for the value. The reader is closed when the reference
1087 : // count drops to zero.
1088 : refCount atomic.Int32
1089 : }
1090 :
1091 : type loadInfo struct {
1092 : backingFileNum base.DiskFileNum
1093 : largestSeqNum uint64
1094 : smallestSeqNum uint64
1095 : }
1096 :
1097 1 : func (v *tableCacheValue) load(loadInfo loadInfo, c *tableCacheShard, dbOpts *tableCacheOpts) {
1098 1 : // Try opening the file first.
1099 1 : var f objstorage.Readable
1100 1 : var err error
1101 1 : f, err = dbOpts.objProvider.OpenForReading(
1102 1 : context.TODO(), fileTypeTable, loadInfo.backingFileNum, objstorage.OpenOptions{MustExist: true},
1103 1 : )
1104 1 : if err == nil {
1105 1 : cacheOpts := private.SSTableCacheOpts(dbOpts.cacheID, loadInfo.backingFileNum).(sstable.ReaderOption)
1106 1 : v.reader, err = sstable.NewReader(f, dbOpts.opts, cacheOpts, dbOpts.filterMetrics)
1107 1 : }
1108 1 : if err != nil {
1109 0 : v.err = errors.Wrapf(
1110 0 : err, "pebble: backing file %s error", errors.Safe(loadInfo.backingFileNum.FileNum()))
1111 0 : }
1112 1 : if v.err == nil && loadInfo.smallestSeqNum == loadInfo.largestSeqNum {
1113 1 : v.reader.Properties.GlobalSeqNum = loadInfo.largestSeqNum
1114 1 : }
1115 1 : if v.err != nil {
1116 0 : c.mu.Lock()
1117 0 : defer c.mu.Unlock()
1118 0 : // Lookup the node in the cache again as it might have already been
1119 0 : // removed.
1120 0 : key := tableCacheKey{dbOpts.cacheID, loadInfo.backingFileNum}
1121 0 : n := c.mu.nodes[key]
1122 0 : if n != nil && n.value == v {
1123 0 : c.releaseNode(n)
1124 0 : }
1125 : }
1126 1 : close(v.loaded)
1127 : }
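
// Setting GlobalSeqNum when smallestSeqNum == largestSeqNum covers ingested
// sstables, whose keys are all assigned a single sequence number at ingestion
// time; the invariant check in findNode verifies the same property after the
// reader is returned from the cache.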
1128 :
1129 1 : func (v *tableCacheValue) release(c *tableCacheShard) {
1130 1 : <-v.loaded
1131 1 : // Nothing to be done about an error at this point. Close the reader if it is
1132 1 : // open.
1133 1 : if v.reader != nil {
1134 1 : _ = v.reader.Close()
1135 1 : }
1136 1 : c.releasing.Done()
1137 : }
1138 :
1139 : type tableCacheNodeType int8
1140 :
1141 : const (
1142 : tableCacheNodeTest tableCacheNodeType = iota
1143 : tableCacheNodeCold
1144 : tableCacheNodeHot
1145 : )
1146 :
1147 0 : func (p tableCacheNodeType) String() string {
1148 0 : switch p {
1149 0 : case tableCacheNodeTest:
1150 0 : return "test"
1151 0 : case tableCacheNodeCold:
1152 0 : return "cold"
1153 0 : case tableCacheNodeHot:
1154 0 : return "hot"
1155 : }
1156 0 : return "unknown"
1157 : }
1158 :
1159 : type tableCacheNode struct {
1160 : fileNum base.DiskFileNum
1161 : value *tableCacheValue
1162 :
1163 : links struct {
1164 : next *tableCacheNode
1165 : prev *tableCacheNode
1166 : }
1167 : ptype tableCacheNodeType
1168 : // referenced is atomically set to indicate that this entry has been accessed
1169 : // since the last time one of the clock hands swept it.
1170 : referenced atomic.Bool
1171 :
1172 : // Storing the cache id associated with the DB instance here
1173 : // avoids the need to thread the dbOpts struct through many functions.
1174 : cacheID uint64
1175 : }
1176 :
1177 1 : func (n *tableCacheNode) next() *tableCacheNode {
1178 1 : if n == nil {
1179 0 : return nil
1180 0 : }
1181 1 : return n.links.next
1182 : }
1183 :
1184 1 : func (n *tableCacheNode) prev() *tableCacheNode {
1185 1 : if n == nil {
1186 0 : return nil
1187 0 : }
1188 1 : return n.links.prev
1189 : }
1190 :
1191 1 : func (n *tableCacheNode) link(s *tableCacheNode) {
1192 1 : s.links.prev = n.links.prev
1193 1 : s.links.prev.links.next = s
1194 1 : s.links.next = n
1195 1 : s.links.next.links.prev = s
1196 1 : }
1197 :
1198 1 : func (n *tableCacheNode) unlink() *tableCacheNode {
1199 1 : next := n.links.next
1200 1 : n.links.prev.links.next = n.links.next
1201 1 : n.links.next.links.prev = n.links.prev
1202 1 : n.links.prev = n
1203 1 : n.links.next = n
1204 1 : return next
1205 1 : }
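
// The nodes form a circular, doubly-linked list: a single node links to
// itself, so unlink on the sole remaining node returns the node itself, which
// unlinkNode uses to detect that the list has become empty. For example,
// after addNode(a) then addNode(b), a.links.next == b and b.links.next == a;
// unlinking b restores a.links.next == a.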
|