Line data Source code
1 : // Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "context"
9 : "fmt"
10 : "io"
11 : "sync/atomic"
12 : "time"
13 :
14 : "github.com/cockroachdb/errors"
15 : "github.com/cockroachdb/pebble/internal/base"
16 : "github.com/cockroachdb/pebble/internal/invariants"
17 : "github.com/cockroachdb/pebble/internal/keyspan"
18 : "github.com/cockroachdb/pebble/internal/keyspan/keyspanimpl"
19 : "github.com/cockroachdb/pebble/internal/manifest"
20 : )
21 :
22 : // flushable defines the interface for immutable memtables.
23 : type flushable interface {
24 : newIter(o *IterOptions) internalIterator
25 : newFlushIter(o *IterOptions) internalIterator
26 : newRangeDelIter(o *IterOptions) keyspan.FragmentIterator
27 : newRangeKeyIter(o *IterOptions) keyspan.FragmentIterator
28 : containsRangeKeys() bool
29 : // inuseBytes returns the number of inuse bytes by the flushable.
30 : inuseBytes() uint64
31 : // totalBytes returns the total number of bytes allocated by the flushable.
32 : totalBytes() uint64
33 : // readyForFlush returns true when the flushable is ready for flushing. See
34 : // memTable.readyForFlush for one implementation which needs to check whether
35 : // there are any outstanding write references.
36 : readyForFlush() bool
37 : // computePossibleOverlaps determines whether the flushable's keys overlap
38 : // with the bounds of any of the provided bounded items. If an item overlaps
39 : // or might overlap but it's not possible to determine overlap cheaply,
40 : // computePossibleOverlaps invokes the provided function with the object
41 : // that might overlap. computePossibleOverlaps must not perform any I/O and
42 : // implementations should invoke the provided function for items that would
43 : // require I/O to determine overlap.
44 : computePossibleOverlaps(overlaps func(bounded) shouldContinue, bounded ...bounded)
45 : }
46 :
// shouldContinue is returned by visitor callbacks (for example the one passed
// to flushable.computePossibleOverlaps) to say whether iteration should go on.
type shouldContinue bool

// The two shouldContinue values. stopIteration is typed explicitly: a const
// spec that supplies its own expression does not inherit the type of the
// preceding spec, so without the type it would be an untyped bool constant.
const (
	continueIteration shouldContinue = true
	stopIteration     shouldContinue = false
)
53 :
54 : type bounded interface {
55 : UserKeyBounds() base.UserKeyBounds
56 : }
57 :
58 : var _ bounded = (*fileMetadata)(nil)
59 : var _ bounded = KeyRange{}
60 :
61 1 : func sliceAsBounded[B bounded](s []B) []bounded {
62 1 : ret := make([]bounded, len(s))
63 1 : for i := 0; i < len(s); i++ {
64 1 : ret[i] = s[i]
65 1 : }
66 1 : return ret
67 : }
68 :
// flushableEntry wraps a flushable and adds additional metadata and
// functionality that is common to all flushables.
type flushableEntry struct {
	flushable
	// Channel which is closed when the flushable has been flushed.
	flushed chan struct{}
	// flushForced indicates whether a flush was forced on this memtable (either
	// manual, or due to ingestion). Protected by DB.mu.
	flushForced bool
	// delayedFlushForcedAt indicates whether a timer has been set to force a
	// flush on this memtable at some point in the future. Protected by DB.mu.
	// Holds the timestamp of when the flush will be issued.
	// NOTE(review): presumably the zero time means no timer is set — confirm.
	delayedFlushForcedAt time.Time
	// logNum corresponds to the WAL that contains the records present in the
	// receiver.
	logNum base.DiskFileNum
	// logSize is the size in bytes of the associated WAL. Protected by DB.mu.
	logSize uint64
	// The current logSeqNum at the time the memtable was created. This is
	// guaranteed to be less than or equal to any seqnum stored in the memtable.
	logSeqNum uint64
	// readerRefs tracks the read references on the flushable. The two sources of
	// reader references are DB.mu.mem.queue and readState.memtables. The memory
	// reserved by the flushable in the cache is released when the reader refs
	// drop to zero. If the flushable is referencing sstables, then the file
	// refcount is also decreased once the reader refs drop to 0. If the
	// flushable is a memTable, when the reader refs drop to zero, the writer
	// refs will already be zero because the memtable will have been flushed and
	// that only occurs once the writer refs drop to zero.
	//
	// readerRef increments it; readerUnref/readerUnrefLocked decrement it. Both
	// panic if the resulting count is inconsistent.
	readerRefs atomic.Int32
	// Closure to invoke to release memory accounting. When readerRefs drops to
	// zero, readerUnrefHelper invokes it and then sets it to nil. It panics if
	// the closure has already been cleared at that point.
	releaseMemAccounting func()
	// unrefFiles, if not nil, should be invoked to decrease the ref count of
	// files which are backing the flushable. When readerRefs drops to zero,
	// readerUnrefHelper calls it and then clears it. If deleteFiles is set, the
	// returned backings are passed to the chosen delete function.
	unrefFiles func() []*fileBacking
	// deleteFnLocked should be called if the caller is holding DB.mu. It is the
	// delete function used by readerUnrefLocked.
	deleteFnLocked func(obsolete []*fileBacking)
	// deleteFn should be called if the caller is not holding DB.mu. It is the
	// delete function used by readerUnref.
	deleteFn func(obsolete []*fileBacking)
}
109 :
110 1 : func (e *flushableEntry) readerRef() {
111 1 : switch v := e.readerRefs.Add(1); {
112 0 : case v <= 1:
113 0 : panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
114 : }
115 : }
116 :
117 : // db.mu must not be held when this is called.
118 1 : func (e *flushableEntry) readerUnref(deleteFiles bool) {
119 1 : e.readerUnrefHelper(deleteFiles, e.deleteFn)
120 1 : }
121 :
122 : // db.mu must be held when this is called.
123 1 : func (e *flushableEntry) readerUnrefLocked(deleteFiles bool) {
124 1 : e.readerUnrefHelper(deleteFiles, e.deleteFnLocked)
125 1 : }
126 :
127 : func (e *flushableEntry) readerUnrefHelper(
128 : deleteFiles bool, deleteFn func(obsolete []*fileBacking),
129 1 : ) {
130 1 : switch v := e.readerRefs.Add(-1); {
131 0 : case v < 0:
132 0 : panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
133 1 : case v == 0:
134 1 : if e.releaseMemAccounting == nil {
135 0 : panic("pebble: memtable reservation already released")
136 : }
137 1 : e.releaseMemAccounting()
138 1 : e.releaseMemAccounting = nil
139 1 : if e.unrefFiles != nil {
140 1 : obsolete := e.unrefFiles()
141 1 : e.unrefFiles = nil
142 1 : if deleteFiles {
143 1 : deleteFn(obsolete)
144 1 : }
145 : }
146 : }
147 : }
148 :
// flushableList is a slice of flushable entries.
type flushableList []*flushableEntry
150 :
// ingestedFlushable is the implementation of the flushable interface for
// ingested sstables that are added to the flushable list.
type ingestedFlushable struct {
	// files are non-overlapping and ordered (according to their bounds). Each
	// entry is the physical metadata of one ingested file (see
	// newIngestedFlushable).
	files []physicalMeta
	// comparer supplies the key comparison used by the iterators and by the
	// overlap checks.
	comparer *Comparer
	// newIters opens the point and range-deletion iterators of a single file.
	newIters tableNewIters
	// newRangeKeyIters opens range-key iterators. newRangeKeyIter passes it to
	// the keyspan level iterator it builds.
	newRangeKeyIters keyspanimpl.TableNewSpanIter

	// Since the level slice is immutable, we construct and set it once. It
	// should be safe to read from slice in future reads.
	slice manifest.LevelSlice
	// hasRangeKeys is set on ingestedFlushable construction. It is true if any
	// ingested file has range keys.
	hasRangeKeys bool
	// exciseSpan is populated if an excise operation should be performed during
	// flush.
	exciseSpan KeyRange
}
169 :
170 : func newIngestedFlushable(
171 : files []*fileMetadata,
172 : comparer *Comparer,
173 : newIters tableNewIters,
174 : newRangeKeyIters keyspanimpl.TableNewSpanIter,
175 : exciseSpan KeyRange,
176 1 : ) *ingestedFlushable {
177 1 : if invariants.Enabled {
178 1 : for i := 1; i < len(files); i++ {
179 1 : prev := files[i-1].UserKeyBounds()
180 1 : this := files[i].UserKeyBounds()
181 1 : if prev.End.IsUpperBoundFor(comparer.Compare, this.Start) {
182 0 : panic(errors.AssertionFailedf("ingested flushable files overlap: %s %s", prev, this))
183 : }
184 : }
185 : }
186 1 : var physicalFiles []physicalMeta
187 1 : var hasRangeKeys bool
188 1 : for _, f := range files {
189 1 : if f.HasRangeKeys {
190 1 : hasRangeKeys = true
191 1 : }
192 1 : physicalFiles = append(physicalFiles, f.PhysicalMeta())
193 : }
194 :
195 1 : ret := &ingestedFlushable{
196 1 : files: physicalFiles,
197 1 : comparer: comparer,
198 1 : newIters: newIters,
199 1 : newRangeKeyIters: newRangeKeyIters,
200 1 : // slice is immutable and can be set once and used many times.
201 1 : slice: manifest.NewLevelSliceKeySorted(comparer.Compare, files),
202 1 : hasRangeKeys: hasRangeKeys,
203 1 : exciseSpan: exciseSpan,
204 1 : }
205 1 :
206 1 : return ret
207 : }
208 :
209 : // TODO(sumeer): ingestedFlushable iters also need to plumb context for
210 : // tracing.
211 :
212 : // newIter is part of the flushable interface.
213 1 : func (s *ingestedFlushable) newIter(o *IterOptions) internalIterator {
214 1 : var opts IterOptions
215 1 : if o != nil {
216 1 : opts = *o
217 1 : }
218 : // TODO(bananabrick): The manifest.Level in newLevelIter is only used for
219 : // logging. Update the manifest.Level encoding to account for levels which
220 : // aren't truly levels in the lsm. Right now, the encoding only supports
221 : // L0 sublevels, and the rest of the levels in the lsm.
222 1 : return newLevelIter(
223 1 : context.Background(), opts, s.comparer, s.newIters, s.slice.Iter(), manifest.Level(0),
224 1 : internalIterOpts{},
225 1 : )
226 : }
227 :
228 : // newFlushIter is part of the flushable interface.
229 0 : func (s *ingestedFlushable) newFlushIter(*IterOptions) internalIterator {
230 0 : // newFlushIter is only used for writing memtables to disk as sstables.
231 0 : // Since ingested sstables are already present on disk, they don't need to
232 0 : // make use of a flush iter.
233 0 : panic("pebble: not implemented")
234 : }
235 :
236 : func (s *ingestedFlushable) constructRangeDelIter(
237 : file *manifest.FileMetadata, _ keyspan.SpanIterOptions,
238 1 : ) (keyspan.FragmentIterator, error) {
239 1 : // Note that the keyspan level iter expects a non-nil iterator to be
240 1 : // returned even if there is an error. So, we return the emptyKeyspanIter.
241 1 : iter, rangeDelIter, err := s.newIters.TODO(context.Background(), file, nil, internalIterOpts{})
242 1 : if err != nil {
243 0 : return emptyKeyspanIter, err
244 0 : }
245 1 : iter.Close()
246 1 : if rangeDelIter == nil {
247 1 : return emptyKeyspanIter, nil
248 1 : }
249 1 : return rangeDelIter, nil
250 : }
251 :
252 : // newRangeDelIter is part of the flushable interface.
253 : // TODO(bananabrick): Using a level iter instead of a keyspan level iter to
254 : // surface range deletes is more efficient.
255 : //
256 : // TODO(sumeer): *IterOptions are being ignored, so the index block load for
257 : // the point iterator in constructRangeDeIter is not tracked.
258 1 : func (s *ingestedFlushable) newRangeDelIter(_ *IterOptions) keyspan.FragmentIterator {
259 1 : return keyspanimpl.NewLevelIter(
260 1 : keyspan.SpanIterOptions{}, s.comparer.Compare,
261 1 : s.constructRangeDelIter, s.slice.Iter(), manifest.Level(0),
262 1 : manifest.KeyTypePoint,
263 1 : )
264 1 : }
265 :
266 : // newRangeKeyIter is part of the flushable interface.
267 1 : func (s *ingestedFlushable) newRangeKeyIter(o *IterOptions) keyspan.FragmentIterator {
268 1 : if !s.containsRangeKeys() {
269 1 : return nil
270 1 : }
271 :
272 1 : return keyspanimpl.NewLevelIter(
273 1 : keyspan.SpanIterOptions{}, s.comparer.Compare, s.newRangeKeyIters,
274 1 : s.slice.Iter(), manifest.Level(0), manifest.KeyTypeRange,
275 1 : )
276 : }
277 :
278 : // containsRangeKeys is part of the flushable interface.
279 1 : func (s *ingestedFlushable) containsRangeKeys() bool {
280 1 : return s.hasRangeKeys
281 1 : }
282 :
283 : // inuseBytes is part of the flushable interface.
284 0 : func (s *ingestedFlushable) inuseBytes() uint64 {
285 0 : // inuseBytes is only used when memtables are flushed to disk as sstables.
286 0 : panic("pebble: not implemented")
287 : }
288 :
289 : // totalBytes is part of the flushable interface.
290 1 : func (s *ingestedFlushable) totalBytes() uint64 {
291 1 : // We don't allocate additional bytes for the ingestedFlushable.
292 1 : return 0
293 1 : }
294 :
295 : // readyForFlush is part of the flushable interface.
296 1 : func (s *ingestedFlushable) readyForFlush() bool {
297 1 : // ingestedFlushable should always be ready to flush. However, note that
298 1 : // memtables before the ingested sstables in the memtable queue must be
299 1 : // flushed before an ingestedFlushable can be flushed. This is because the
300 1 : // ingested sstables need an updated view of the Version to
301 1 : // determine where to place the files in the lsm.
302 1 : return true
303 1 : }
304 :
305 : // computePossibleOverlaps is part of the flushable interface.
306 : func (s *ingestedFlushable) computePossibleOverlaps(
307 : fn func(bounded) shouldContinue, bounded ...bounded,
308 1 : ) {
309 1 : for _, b := range bounded {
310 1 : bounds := b.UserKeyBounds()
311 1 : if s.anyFileOverlaps(b, &bounds) {
312 1 : // Some file overlaps in key boundaries. The file doesn't necessarily
313 1 : // contain any keys within the key range, but we would need to perform I/O
314 1 : // to know for sure. The flushable interface dictates that we're not
315 1 : // permitted to perform I/O here, so err towards assuming overlap.
316 1 : if !fn(b) {
317 1 : return
318 1 : }
319 : }
320 : }
321 : }
322 :
323 : // anyFileBoundsOverlap returns true if there is at least a file in s.files with
324 : // bounds that overlap the given bounds.
325 1 : func (s *ingestedFlushable) anyFileOverlaps(b bounded, bounds *base.UserKeyBounds) bool {
326 1 : // Note that s.files are non-overlapping and sorted.
327 1 : for _, f := range s.files {
328 1 : fileBounds := f.UserKeyBounds()
329 1 : if !fileBounds.End.IsUpperBoundFor(s.comparer.Compare, bounds.Start) {
330 1 : // The file ends before the bounds start. Go to the next file.
331 1 : continue
332 : }
333 1 : if !bounds.End.IsUpperBoundFor(s.comparer.Compare, fileBounds.Start) {
334 1 : // The file starts after the bounds end. There is no overlap, and
335 1 : // further files will not overlap either (the files are sorted).
336 1 : return false
337 1 : }
338 : // There is overlap. Note that UserKeyBounds.Overlaps() performs exactly the
339 : // checks above.
340 1 : return true
341 : }
342 1 : return false
343 : }
344 :
345 : // computePossibleOverlapsGenericImpl is an implementation of the flushable
346 : // interface's computePossibleOverlaps function for flushable implementations
347 : // with only in-memory state that do not have special requirements and should
348 : // read through the ordinary flushable iterators.
349 : //
350 : // This function must only be used with implementations that are infallible (eg,
351 : // memtable iterators) and will panic if an error is encountered.
352 : func computePossibleOverlapsGenericImpl[F flushable](
353 : f F, cmp Compare, fn func(bounded) shouldContinue, bounded []bounded,
354 1 : ) {
355 1 : iter := f.newIter(nil)
356 1 : rangeDelIter := f.newRangeDelIter(nil)
357 1 : rkeyIter := f.newRangeKeyIter(nil)
358 1 : for _, b := range bounded {
359 1 : if overlapWithIterator(iter, &rangeDelIter, rkeyIter, b.UserKeyBounds(), cmp) {
360 1 : if !fn(b) {
361 1 : break
362 : }
363 : }
364 : }
365 :
366 1 : for _, c := range [3]io.Closer{iter, rangeDelIter, rkeyIter} {
367 1 : if c != nil {
368 1 : if err := c.Close(); err != nil {
369 0 : // This implementation must be used in circumstances where
370 0 : // reading through the iterator is infallible.
371 0 : panic(err)
372 : }
373 : }
374 : }
375 : }
|