1 : // Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "context"
9 : "fmt"
10 : "io"
11 : "sync/atomic"
12 : "time"
13 :
14 : "github.com/cockroachdb/pebble/internal/base"
15 : "github.com/cockroachdb/pebble/internal/keyspan"
16 : "github.com/cockroachdb/pebble/internal/manifest"
17 : )
18 :
19 : // flushable defines the interface for immutable memtables.
20 : type flushable interface {
21 : newIter(o *IterOptions) internalIterator
22 : newFlushIter(o *IterOptions, bytesFlushed *uint64) internalIterator
23 : newRangeDelIter(o *IterOptions) keyspan.FragmentIterator
24 : newRangeKeyIter(o *IterOptions) keyspan.FragmentIterator
25 : containsRangeKeys() bool
26 : // inuseBytes returns the number of bytes in use by the flushable.
27 : inuseBytes() uint64
28 : // totalBytes returns the total number of bytes allocated by the flushable.
29 : totalBytes() uint64
30 : // readyForFlush returns true when the flushable is ready for flushing. See
31 : // memTable.readyForFlush for one implementation which needs to check whether
32 : // there are any outstanding write references.
33 : readyForFlush() bool
34 : // computePossibleOverlaps determines whether the flushable's keys overlap
35 : // with the bounds of any of the provided bounded items. If an item overlaps
36 : // or might overlap but it's not possible to determine overlap cheaply,
37 : // computePossibleOverlaps invokes the provided function with the object
38 : // that might overlap. computePossibleOverlaps must not perform any I/O and
39 : // implementations should invoke the provided function for items that would
40 : // require I/O to determine overlap.
41 : computePossibleOverlaps(overlaps func(bounded) shouldContinue, bounded ...bounded)
42 : }
43 :
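// shouldContinue is returned by the callback passed to computePossibleOverlaps
// to indicate whether the flushable should keep visiting the remaining bounded
// items (continueIteration) or stop early (stopIteration).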
44 : type shouldContinue bool
45 :
46 : const (
47 : continueIteration shouldContinue = true
48 : stopIteration = false
49 : )
50 :
51 : type bounded interface {
52 : // InternalKeyBounds returns a start key and an end key. Both bounds are
53 : // inclusive.
54 : InternalKeyBounds() (InternalKey, InternalKey)
55 : }
56 :
57 2 : func sliceAsBounded[B bounded](s []B) []bounded {
58 2 : ret := make([]bounded, len(s))
59 2 : for i := 0; i < len(s); i++ {
60 2 : ret[i] = s[i]
61 2 : }
62 2 : return ret
63 : }
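
// An illustrative sketch of how sliceAsBounded composes with
// computePossibleOverlaps: a caller holding a slice of a concrete type B that
// implements bounded widens it and probes a flushable for possible overlaps.
// The helper name anyPossibleOverlap is hypothetical.
//
//	func anyPossibleOverlap[B bounded](f flushable, items []B) bool {
//		var overlap bool
//		f.computePossibleOverlaps(func(bounded) shouldContinue {
//			overlap = true
//			return stopIteration
//		}, sliceAsBounded(items)...)
//		return overlap
//	}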
64 :
65 : // flushableEntry wraps a flushable and adds additional metadata and
66 : // functionality that is common to all flushables.
67 : type flushableEntry struct {
68 : flushable
69 : // Channel which is closed when the flushable has been flushed.
70 : flushed chan struct{}
71 : // flushForced indicates whether a flush was forced on this memtable (either
72 : // manual, or due to ingestion). Protected by DB.mu.
73 : flushForced bool
74 : // delayedFlushForcedAt holds the time at which a timer will force a flush
75 : // of this memtable; the zero value means no such timer has been set.
76 : // Protected by DB.mu.
77 : delayedFlushForcedAt time.Time
78 : // logNum corresponds to the WAL that contains the records present in the
79 : // receiver.
80 : logNum base.DiskFileNum
81 : // logSize is the size in bytes of the associated WAL. Protected by DB.mu.
82 : logSize uint64
83 : // The current logSeqNum at the time the memtable was created. This is
84 : // guaranteed to be less than or equal to any seqnum stored in the memtable.
85 : logSeqNum uint64
86 : // readerRefs tracks the read references on the flushable. The two sources of
87 : // reader references are DB.mu.mem.queue and readState.memtables. The memory
88 : // reserved by the flushable in the cache is released when the reader refs
89 : // drop to zero. If the flushable is referencing sstables, then the file
90 : // refcount is also decreased once the reader refs drop to 0. If the
91 : // flushable is a memTable, the writer refs will already be zero by the time
92 : // the reader refs drop to zero, because the memtable will have been flushed
93 : // and flushing only occurs once the writer refs drop to zero.
94 : readerRefs atomic.Int32
95 : // Closure to invoke to release memory accounting.
96 : releaseMemAccounting func()
97 : // unrefFiles, if not nil, should be invoked to decrease the ref count of
98 : // files which are backing the flushable.
99 : unrefFiles func() []*fileBacking
100 : // deleteFnLocked should be called if the caller is holding DB.mu.
101 : deleteFnLocked func(obsolete []*fileBacking)
102 : // deleteFn should be called if the caller is not holding DB.mu.
103 : deleteFn func(obsolete []*fileBacking)
104 : }
105 :
106 2 : func (e *flushableEntry) readerRef() {
107 2 : switch v := e.readerRefs.Add(1); {
108 0 : case v <= 1:
109 0 : panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
110 : }
111 : }
112 :
113 : // db.mu must not be held when this is called.
114 2 : func (e *flushableEntry) readerUnref(deleteFiles bool) {
115 2 : e.readerUnrefHelper(deleteFiles, e.deleteFn)
116 2 : }
117 :
118 : // db.mu must be held when this is called.
119 2 : func (e *flushableEntry) readerUnrefLocked(deleteFiles bool) {
120 2 : e.readerUnrefHelper(deleteFiles, e.deleteFnLocked)
121 2 : }
122 :
123 : func (e *flushableEntry) readerUnrefHelper(
124 : deleteFiles bool, deleteFn func(obsolete []*fileBacking),
125 2 : ) {
126 2 : switch v := e.readerRefs.Add(-1); {
127 0 : case v < 0:
128 0 : panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
129 2 : case v == 0:
130 2 : if e.releaseMemAccounting == nil {
131 0 : panic("pebble: memtable reservation already released")
132 : }
133 2 : e.releaseMemAccounting()
134 2 : e.releaseMemAccounting = nil
135 2 : if e.unrefFiles != nil {
136 2 : obsolete := e.unrefFiles()
137 2 : e.unrefFiles = nil
138 2 : if deleteFiles {
139 2 : deleteFn(obsolete)
140 2 : }
141 : }
142 : }
143 : }
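
// An illustrative sketch of the expected pairing of reader references on a
// flushableEntry; the exact sequencing is hypothetical, but in practice the
// long-lived references are held by DB.mu.mem.queue and readState.memtables.
//
//	e.readerRef()             // a new read state picks up the entry
//	// ... reads are served from the flushable ...
//	e.readerUnref(true)       // the read state is released (DB.mu not held)
//	e.readerUnrefLocked(true) // the queue drops its reference under DB.mu; once
//	                          // the count reaches zero the memory reservation is
//	                          // released and any backing files are unreferenced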
144 :
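// flushableList is an ordered list of flushableEntry, such as the queue of
// immutable memtables and ingested flushables held in DB.mu.mem.queue.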
145 : type flushableList []*flushableEntry
146 :
147 : // ingestedFlushable is the implementation of the flushable interface for
148 : // ingested sstables that are added to the flushable list.
149 : type ingestedFlushable struct {
150 : files []physicalMeta
151 : comparer *Comparer
152 : newIters tableNewIters
153 : newRangeKeyIters keyspan.TableNewSpanIter
154 :
155 : // Since the level slice is immutable, we construct and set it once. It
156 : // should be safe to read from the slice in future reads.
157 : slice manifest.LevelSlice
158 : // hasRangeKeys is set on ingestedFlushable construction.
159 : hasRangeKeys bool
160 : }
161 :
162 : func newIngestedFlushable(
163 : files []*fileMetadata,
164 : comparer *Comparer,
165 : newIters tableNewIters,
166 : newRangeKeyIters keyspan.TableNewSpanIter,
167 2 : ) *ingestedFlushable {
168 2 : var physicalFiles []physicalMeta
169 2 : var hasRangeKeys bool
170 2 : for _, f := range files {
171 2 : if f.HasRangeKeys {
172 2 : hasRangeKeys = true
173 2 : }
174 2 : physicalFiles = append(physicalFiles, f.PhysicalMeta())
175 : }
176 :
177 2 : ret := &ingestedFlushable{
178 2 : files: physicalFiles,
179 2 : comparer: comparer,
180 2 : newIters: newIters,
181 2 : newRangeKeyIters: newRangeKeyIters,
182 2 : // slice is immutable and can be set once and used many times.
183 2 : slice: manifest.NewLevelSliceKeySorted(comparer.Compare, files),
184 2 : hasRangeKeys: hasRangeKeys,
185 2 : }
186 2 :
187 2 : return ret
188 : }
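
// An illustrative, hypothetical call site wrapping ingested sstable metadata
// in an ingestedFlushable; the identifiers d and metas are assumptions
// standing in for the real ingestion path.
//
//	f := newIngestedFlushable(metas, d.opts.Comparer, d.newIters, d.tableNewRangeKeyIter)
//	// f is subsequently wrapped in a flushableEntry and appended to the
//	// flushable queue, to be flushed only after all earlier memtables have
//	// been flushed (see readyForFlush below).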
189 :
190 : // TODO(sumeer): ingestedFlushable iters also need to plumb context for
191 : // tracing.
192 :
193 : // newIter is part of the flushable interface.
194 2 : func (s *ingestedFlushable) newIter(o *IterOptions) internalIterator {
195 2 : var opts IterOptions
196 2 : if o != nil {
197 2 : opts = *o
198 2 : }
199 : // TODO(bananabrick): The manifest.Level in newLevelIter is only used for
200 : // logging. Update the manifest.Level encoding to account for levels which
201 : // aren't truly levels in the lsm. Right now, the encoding only supports
202 : // L0 sublevels, and the rest of the levels in the lsm.
203 2 : return newLevelIter(
204 2 : context.Background(), opts, s.comparer, s.newIters, s.slice.Iter(), manifest.Level(0),
205 2 : internalIterOpts{},
206 2 : )
207 : }
208 :
209 : // newFlushIter is part of the flushable interface.
210 0 : func (s *ingestedFlushable) newFlushIter(o *IterOptions, bytesFlushed *uint64) internalIterator {
211 0 : // newFlushIter is only used for writing memtables to disk as sstables.
212 0 : // Since ingested sstables are already present on disk, they don't need to
213 0 : // make use of a flush iter.
214 0 : panic("pebble: not implemented")
215 : }
216 :
217 : func (s *ingestedFlushable) constructRangeDelIter(
218 : file *manifest.FileMetadata, _ keyspan.SpanIterOptions,
219 2 : ) (keyspan.FragmentIterator, error) {
220 2 : // Note that the keyspan level iter expects a non-nil iterator to be
221 2 : // returned even if there is an error. So, we return the emptyKeyspanIter.
222 2 : iter, rangeDelIter, err := s.newIters(context.Background(), file, nil, internalIterOpts{})
223 2 : if err != nil {
224 0 : return emptyKeyspanIter, err
225 0 : }
226 2 : iter.Close()
227 2 : if rangeDelIter == nil {
228 2 : return emptyKeyspanIter, nil
229 2 : }
230 2 : return rangeDelIter, nil
231 : }
232 :
233 : // newRangeDelIter is part of the flushable interface.
234 : // TODO(bananabrick): Using a level iter instead of a keyspan level iter to
235 : // surface range deletes is more efficient.
236 : //
237 : // TODO(sumeer): *IterOptions are being ignored, so the index block load for
238 : // the point iterator in constructRangeDelIter is not tracked.
239 2 : func (s *ingestedFlushable) newRangeDelIter(_ *IterOptions) keyspan.FragmentIterator {
240 2 : return keyspan.NewLevelIter(
241 2 : keyspan.SpanIterOptions{}, s.comparer.Compare,
242 2 : s.constructRangeDelIter, s.slice.Iter(), manifest.Level(0),
243 2 : manifest.KeyTypePoint,
244 2 : )
245 2 : }
246 :
247 : // newRangeKeyIter is part of the flushable interface.
248 2 : func (s *ingestedFlushable) newRangeKeyIter(o *IterOptions) keyspan.FragmentIterator {
249 2 : if !s.containsRangeKeys() {
250 2 : return nil
251 2 : }
252 :
253 2 : return keyspan.NewLevelIter(
254 2 : keyspan.SpanIterOptions{}, s.comparer.Compare, s.newRangeKeyIters,
255 2 : s.slice.Iter(), manifest.Level(0), manifest.KeyTypeRange,
256 2 : )
257 : }
258 :
259 : // containsRangeKeys is part of the flushable interface.
260 2 : func (s *ingestedFlushable) containsRangeKeys() bool {
261 2 : return s.hasRangeKeys
262 2 : }
263 :
264 : // inuseBytes is part of the flushable interface.
265 0 : func (s *ingestedFlushable) inuseBytes() uint64 {
266 0 : // inuseBytes is only used when memtables are flushed to disk as sstables.
267 0 : panic("pebble: not implemented")
268 : }
269 :
270 : // totalBytes is part of the flushable interface.
271 2 : func (s *ingestedFlushable) totalBytes() uint64 {
272 2 : // We don't allocate additional bytes for the ingestedFlushable.
273 2 : return 0
274 2 : }
275 :
276 : // readyForFlush is part of the flushable interface.
277 2 : func (s *ingestedFlushable) readyForFlush() bool {
278 2 : // ingestedFlushable should always be ready to flush. However, note that
279 2 : // memtables before the ingested sstables in the memtable queue must be
280 2 : // flushed before an ingestedFlushable can be flushed. This is because the
281 2 : // ingested sstables need an updated view of the Version to
282 2 : // determine where to place the files in the lsm.
283 2 : return true
284 2 : }
285 :
286 : // computePossibleOverlaps is part of the flushable interface.
287 : func (s *ingestedFlushable) computePossibleOverlaps(
288 : fn func(bounded) shouldContinue, bounded ...bounded,
289 2 : ) {
290 2 : for i := range bounded {
291 2 : smallest, largest := bounded[i].InternalKeyBounds()
292 2 : for j := 0; j < len(s.files); j++ {
293 2 : if sstableKeyCompare(s.comparer.Compare, s.files[j].Largest, smallest) >= 0 {
294 2 : // This file's largest key is larger than smallest. Either the
295 2 : // file overlaps the bounds, or it lies strictly after the
296 2 : // bounds. Either way we can stop iterating since the files are
297 2 : // sorted. But first, determine if there's overlap and call fn
298 2 : // if necessary.
299 2 : if sstableKeyCompare(s.comparer.Compare, s.files[j].Smallest, largest) <= 0 {
300 2 : // The file overlaps in key boundaries. The file doesn't necessarily
301 2 : // contain any keys within the key range, but we would need to
302 2 : // perform I/O to know for sure. The flushable interface dictates
303 2 : // that we're not permitted to perform I/O here, so err towards
304 2 : // assuming overlap.
305 2 : if !fn(bounded[i]) {
306 1 : return
307 1 : }
308 : }
309 2 : break
310 : }
311 : }
312 : }
313 : }
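
// The loop above depends on s.files being sorted by key range: for each probe
// [smallest, largest] it scans forward to the first file whose largest key
// reaches smallest, and it suffices to check that one file, since if it does
// not overlap no later file can. An illustrative sketch of the same check over
// plain integer intervals, with hypothetical names:
//
//	// possiblyOverlaps reports whether any of the intervals in files, sorted
//	// by lower bound, intersects [lo, hi] (both bounds inclusive).
//	func possiblyOverlaps(files [][2]int, lo, hi int) bool {
//		for _, f := range files {
//			if f[1] >= lo { // first interval whose upper bound reaches the probe
//				return f[0] <= hi // overlap iff its lower bound is within the probe
//			}
//		}
//		return false
//	}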
314 :
315 : // computePossibleOverlapsGenericImpl is an implementation of the flushable
316 : // interface's computePossibleOverlaps function for flushable implementations
317 : // that hold only in-memory state, have no special requirements, and can be
318 : // read through the ordinary flushable iterators.
319 : //
320 : // This function must only be used with implementations that are infallible
321 : // (e.g., memtable iterators) and will panic if an error is encountered.
322 : func computePossibleOverlapsGenericImpl[F flushable](
323 : f F, cmp Compare, fn func(bounded) shouldContinue, bounded []bounded,
324 2 : ) {
325 2 : iter := f.newIter(nil)
326 2 : rangeDelIter := f.newRangeDelIter(nil)
327 2 : rkeyIter := f.newRangeKeyIter(nil)
328 2 : for _, b := range bounded {
329 2 : s, l := b.InternalKeyBounds()
330 2 : kr := internalKeyRange{s, l}
331 2 : if overlapWithIterator(iter, &rangeDelIter, rkeyIter, kr, cmp) {
332 2 : if !fn(b) {
333 2 : break
334 : }
335 : }
336 : }
337 :
338 2 : for _, c := range [3]io.Closer{iter, rangeDelIter, rkeyIter} {
339 2 : if c != nil {
340 2 : if err := c.Close(); err != nil {
341 0 : // This implementation must be used in circumstances where
342 0 : // reading through the iterator is infallible.
343 0 : panic(err)
344 : }
345 : }
346 : }
347 : }
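
// An illustrative sketch of how an in-memory flushable whose iterators cannot
// fail might satisfy computePossibleOverlaps by delegating to the generic
// implementation; the receiver type and its cmp field are assumptions.
//
//	func (m *memTable) computePossibleOverlaps(fn func(bounded) shouldContinue, bounded ...bounded) {
//		computePossibleOverlapsGenericImpl[*memTable](m, m.cmp, fn, bounded)
//	}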