Line data Source code
1 : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "sync/atomic"
9 :
10 : "github.com/cockroachdb/pebble/internal/manifest"
11 : )
12 :
13 : // readState encapsulates the state needed for reading (the current version and
14 : // list of memtables). Loading the readState is done without grabbing
15 : // DB.mu. Instead, a separate DB.readState.RWMutex is used for
16 : // synchronization. This mutex solely covers the current readState object which
17 : // means it is rarely or ever contended.
18 : //
19 : // Note that various fancy lock-free mechanisms can be imagined for loading the
20 : // readState, but benchmarking showed the ones considered to purely be
21 : // pessimizations. The RWMutex version is a single atomic increment for the
22 : // RLock and an atomic decrement for the RUnlock. It is difficult to do better
23 : // than that without something like thread-local storage which isn't available
24 : // in Go.
25 : type readState struct {
26 : db *DB
27 : refcnt atomic.Int32
28 : current *manifest.Version
29 : memtables flushableList
30 : }
31 :
32 : // ref adds a reference to the readState.
33 1 : func (s *readState) ref() {
34 1 : s.refcnt.Add(1)
35 1 : }
36 :
37 : // unref removes a reference to the readState. If this was the last reference,
38 : // the reference the readState holds on the version is released. Requires DB.mu
39 : // is NOT held as version.unref() will acquire it. See unrefLocked() if DB.mu
40 : // is held by the caller.
41 1 : func (s *readState) unref() {
42 1 : if s.refcnt.Add(-1) != 0 {
43 1 : return
44 1 : }
45 1 : s.current.Unref()
46 1 : for _, mem := range s.memtables {
47 1 : mem.readerUnref(true)
48 1 : }
49 :
50 : // The last reference to the readState was released. Check to see if there
51 : // are new obsolete objects to delete.
52 1 : s.db.maybeScheduleObsoleteObjectDeletion()
53 : }
54 :
55 : // unrefLocked removes a reference to the readState. If this was the last
56 : // reference, the reference the readState holds on the version is
57 : // released.
58 : //
59 : // DB.mu must be held. See unref() if DB.mu is NOT held by the caller.
60 1 : func (s *readState) unrefLocked() {
61 1 : if s.refcnt.Add(-1) != 0 {
62 1 : return
63 1 : }
64 1 : s.current.UnrefLocked()
65 1 : for _, mem := range s.memtables {
66 1 : mem.readerUnrefLocked(true)
67 1 : }
68 :
69 : // In this code path, the caller is responsible for scheduling obsolete
70 : // object deletions as necessary.
71 : }
72 :
73 : // loadReadState returns the current readState. The returned readState must be
74 : // unreferenced when the caller is finished with it.
75 1 : func (d *DB) loadReadState() *readState {
76 1 : d.readState.RLock()
77 1 : state := d.readState.val
78 1 : state.ref()
79 1 : d.readState.RUnlock()
80 1 : return state
81 1 : }
82 :
83 : // updateReadStateLocked creates a new readState from the current version and
84 : // list of memtables. Requires DB.mu is held. If checker is not nil, it is
85 : // called after installing the new readState.
86 1 : func (d *DB) updateReadStateLocked(checker func(*DB) error) {
87 1 : s := &readState{
88 1 : db: d,
89 1 : current: d.mu.versions.currentVersion(),
90 1 : memtables: d.mu.mem.queue,
91 1 : }
92 1 : s.refcnt.Store(1)
93 1 : s.current.Ref()
94 1 : for _, mem := range s.memtables {
95 1 : mem.readerRef()
96 1 : }
97 :
98 1 : d.readState.Lock()
99 1 : old := d.readState.val
100 1 : d.readState.val = s
101 1 : d.readState.Unlock()
102 1 : if checker != nil {
103 1 : if err := checker(d); err != nil {
104 0 : d.opts.Logger.Fatalf("checker failed with error: %s", err)
105 0 : }
106 : }
107 1 : if old != nil {
108 1 : old.unrefLocked()
109 1 : }
110 : }
|