Line data Source code
1 : // Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "context"
9 : "fmt"
10 : "sync/atomic"
11 : "time"
12 :
13 : "github.com/cockroachdb/pebble/internal/base"
14 : "github.com/cockroachdb/pebble/internal/keyspan"
15 : "github.com/cockroachdb/pebble/internal/manifest"
16 : )
17 :
18 : // flushable defines the interface for immutable memtables.
19 : type flushable interface {
20 : newIter(o *IterOptions) internalIterator
21 : newFlushIter(o *IterOptions, bytesFlushed *uint64) internalIterator
22 : newRangeDelIter(o *IterOptions) keyspan.FragmentIterator
23 : newRangeKeyIter(o *IterOptions) keyspan.FragmentIterator
24 : containsRangeKeys() bool
25 : // inuseBytes returns the number of inuse bytes by the flushable.
26 : inuseBytes() uint64
27 : // totalBytes returns the total number of bytes allocated by the flushable.
28 : totalBytes() uint64
29 : // readyForFlush returns true when the flushable is ready for flushing. See
30 : // memTable.readyForFlush for one implementation which needs to check whether
31 : // there are any outstanding write references.
32 : readyForFlush() bool
33 : }
34 :
// flushableEntry wraps a flushable and adds metadata and functionality that
// are common to all flushables: reader reference counting, memory accounting,
// file-backing release, and bookkeeping for the associated WAL.
type flushableEntry struct {
	flushable
	// flushed is closed when the flushable has been flushed.
	flushed chan struct{}
	// flushForced indicates whether a flush was forced on this memtable (either
	// manually, or due to ingestion). Protected by DB.mu.
	flushForced bool
	// delayedFlushForcedAt indicates whether a timer has been set to force a
	// flush on this memtable at some point in the future; it holds the time at
	// which that flush will be issued. Protected by DB.mu.
	delayedFlushForcedAt time.Time
	// logNum identifies the WAL that contains the records present in the
	// receiver.
	logNum base.DiskFileNum
	// logSize is the size in bytes of the associated WAL. Protected by DB.mu.
	logSize uint64
	// logSeqNum is the value of logSeqNum at the time the memtable was
	// created. It is guaranteed to be less than or equal to any seqnum stored
	// in the memtable.
	logSeqNum uint64
	// readerRefs tracks the read references on the flushable. The two sources
	// of reader references are DB.mu.mem.queue and readState.memtables. The
	// memory reserved by the flushable in the cache is released when the reader
	// refs drop to zero. If the flushable references sstables, the file
	// refcount is also decreased once the reader refs drop to zero. If the
	// flushable is a memTable, the writer refs will already be zero by the time
	// the reader refs drop to zero: the memtable must have been flushed, and a
	// flush only occurs once the writer refs have dropped to zero.
	readerRefs atomic.Int32
	// releaseMemAccounting releases the flushable's memory accounting. It is
	// invoked at most once, when readerRefs drops to zero, and is then set to
	// nil; a second release panics.
	releaseMemAccounting func()
	// unrefFiles, if not nil, decreases the ref counts of the files backing the
	// flushable and returns the file backings that became obsolete. It is
	// invoked when readerRefs drops to zero and is then set to nil.
	unrefFiles func() []*fileBacking
	// deleteFnLocked receives the obsolete file backings when the last reader
	// ref is dropped via readerUnrefLocked, i.e. while the caller holds DB.mu.
	deleteFnLocked func(obsolete []*fileBacking)
	// deleteFn receives the obsolete file backings when the last reader ref is
	// dropped via readerUnref, i.e. while the caller does not hold DB.mu.
	deleteFn func(obsolete []*fileBacking)
}
75 :
76 1 : func (e *flushableEntry) readerRef() {
77 1 : switch v := e.readerRefs.Add(1); {
78 0 : case v <= 1:
79 0 : panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
80 : }
81 : }
82 :
83 : // db.mu must not be held when this is called.
84 1 : func (e *flushableEntry) readerUnref(deleteFiles bool) {
85 1 : e.readerUnrefHelper(deleteFiles, e.deleteFn)
86 1 : }
87 :
88 : // db.mu must be held when this is called.
89 1 : func (e *flushableEntry) readerUnrefLocked(deleteFiles bool) {
90 1 : e.readerUnrefHelper(deleteFiles, e.deleteFnLocked)
91 1 : }
92 :
93 : func (e *flushableEntry) readerUnrefHelper(
94 : deleteFiles bool, deleteFn func(obsolete []*fileBacking),
95 1 : ) {
96 1 : switch v := e.readerRefs.Add(-1); {
97 0 : case v < 0:
98 0 : panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
99 1 : case v == 0:
100 1 : if e.releaseMemAccounting == nil {
101 0 : panic("pebble: memtable reservation already released")
102 : }
103 1 : e.releaseMemAccounting()
104 1 : e.releaseMemAccounting = nil
105 1 : if e.unrefFiles != nil {
106 1 : obsolete := e.unrefFiles()
107 1 : e.unrefFiles = nil
108 1 : if deleteFiles {
109 1 : deleteFn(obsolete)
110 1 : }
111 : }
112 : }
113 : }
114 :
// flushableList is an ordered slice of flushable entries.
type flushableList []*flushableEntry
116 :
// ingestedFlushable is the implementation of the flushable interface for
// ingested sstables that are added to the flushable list instead of being
// written from memory.
type ingestedFlushable struct {
	// files holds the physical metadata of each ingested sstable, in the order
	// the files were passed to newIngestedFlushable.
	files    []physicalMeta
	comparer *Comparer
	// newIters opens a point iterator and a range deletion iterator for a file.
	newIters tableNewIters
	// newRangeKeyIters opens a range key iterator for a file.
	newRangeKeyIters keyspan.TableNewSpanIter

	// slice is immutable, so it is constructed once, at construction time, and
	// is safe to read from on every subsequent iterator creation.
	slice manifest.LevelSlice
	// hasRangeKeys is set at construction if any ingested file has range keys.
	hasRangeKeys bool
}
131 :
132 : func newIngestedFlushable(
133 : files []*fileMetadata,
134 : comparer *Comparer,
135 : newIters tableNewIters,
136 : newRangeKeyIters keyspan.TableNewSpanIter,
137 1 : ) *ingestedFlushable {
138 1 : var physicalFiles []physicalMeta
139 1 : var hasRangeKeys bool
140 1 : for _, f := range files {
141 1 : if f.HasRangeKeys {
142 1 : hasRangeKeys = true
143 1 : }
144 1 : physicalFiles = append(physicalFiles, f.PhysicalMeta())
145 : }
146 :
147 1 : ret := &ingestedFlushable{
148 1 : files: physicalFiles,
149 1 : comparer: comparer,
150 1 : newIters: newIters,
151 1 : newRangeKeyIters: newRangeKeyIters,
152 1 : // slice is immutable and can be set once and used many times.
153 1 : slice: manifest.NewLevelSliceKeySorted(comparer.Compare, files),
154 1 : hasRangeKeys: hasRangeKeys,
155 1 : }
156 1 :
157 1 : return ret
158 : }
159 :
160 : // TODO(sumeer): ingestedFlushable iters also need to plumb context for
161 : // tracing.
162 :
163 : // newIter is part of the flushable interface.
164 1 : func (s *ingestedFlushable) newIter(o *IterOptions) internalIterator {
165 1 : var opts IterOptions
166 1 : if o != nil {
167 1 : opts = *o
168 1 : }
169 : // TODO(bananabrick): The manifest.Level in newLevelIter is only used for
170 : // logging. Update the manifest.Level encoding to account for levels which
171 : // aren't truly levels in the lsm. Right now, the encoding only supports
172 : // L0 sublevels, and the rest of the levels in the lsm.
173 1 : return newLevelIter(
174 1 : context.Background(), opts, s.comparer, s.newIters, s.slice.Iter(), manifest.Level(0),
175 1 : internalIterOpts{},
176 1 : )
177 : }
178 :
179 : // newFlushIter is part of the flushable interface.
180 0 : func (s *ingestedFlushable) newFlushIter(o *IterOptions, bytesFlushed *uint64) internalIterator {
181 0 : // newFlushIter is only used for writing memtables to disk as sstables.
182 0 : // Since ingested sstables are already present on disk, they don't need to
183 0 : // make use of a flush iter.
184 0 : panic("pebble: not implemented")
185 : }
186 :
187 : func (s *ingestedFlushable) constructRangeDelIter(
188 : file *manifest.FileMetadata, _ keyspan.SpanIterOptions,
189 1 : ) (keyspan.FragmentIterator, error) {
190 1 : // Note that the keyspan level iter expects a non-nil iterator to be
191 1 : // returned even if there is an error. So, we return the emptyKeyspanIter.
192 1 : iter, rangeDelIter, err := s.newIters(context.Background(), file, nil, internalIterOpts{})
193 1 : if err != nil {
194 0 : return emptyKeyspanIter, err
195 0 : }
196 1 : iter.Close()
197 1 : if rangeDelIter == nil {
198 1 : return emptyKeyspanIter, nil
199 1 : }
200 1 : return rangeDelIter, nil
201 : }
202 :
203 : // newRangeDelIter is part of the flushable interface.
204 : // TODO(bananabrick): Using a level iter instead of a keyspan level iter to
205 : // surface range deletes is more efficient.
206 : //
207 : // TODO(sumeer): *IterOptions are being ignored, so the index block load for
208 : // the point iterator in constructRangeDeIter is not tracked.
209 1 : func (s *ingestedFlushable) newRangeDelIter(_ *IterOptions) keyspan.FragmentIterator {
210 1 : return keyspan.NewLevelIter(
211 1 : keyspan.SpanIterOptions{}, s.comparer.Compare,
212 1 : s.constructRangeDelIter, s.slice.Iter(), manifest.Level(0),
213 1 : manifest.KeyTypePoint,
214 1 : )
215 1 : }
216 :
217 : // newRangeKeyIter is part of the flushable interface.
218 1 : func (s *ingestedFlushable) newRangeKeyIter(o *IterOptions) keyspan.FragmentIterator {
219 1 : if !s.containsRangeKeys() {
220 1 : return nil
221 1 : }
222 :
223 1 : return keyspan.NewLevelIter(
224 1 : keyspan.SpanIterOptions{}, s.comparer.Compare, s.newRangeKeyIters,
225 1 : s.slice.Iter(), manifest.Level(0), manifest.KeyTypeRange,
226 1 : )
227 : }
228 :
// containsRangeKeys is part of the flushable interface. It reports whether any
// of the ingested sstables has range keys, as recorded when the
// ingestedFlushable was constructed.
func (s *ingestedFlushable) containsRangeKeys() bool {
	return s.hasRangeKeys
}
233 :
234 : // inuseBytes is part of the flushable interface.
235 0 : func (s *ingestedFlushable) inuseBytes() uint64 {
236 0 : // inuseBytes is only used when memtables are flushed to disk as sstables.
237 0 : panic("pebble: not implemented")
238 : }
239 :
// totalBytes is part of the flushable interface. An ingestedFlushable does not
// allocate any additional bytes of its own, so it always reports 0.
func (s *ingestedFlushable) totalBytes() uint64 {
	// No additional bytes are allocated for the ingestedFlushable.
	return 0
}
245 :
// readyForFlush is part of the flushable interface. An ingestedFlushable is
// always ready to flush.
func (s *ingestedFlushable) readyForFlush() bool {
	// Although this entry is always ready, memtables ahead of the ingested
	// sstables in the memtable queue must be flushed first: the ingested
	// sstables need an up-to-date view of the Version to determine where to
	// place the files in the LSM.
	return true
}
|