Line data Source code
1 : // Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "context"
9 : "fmt"
10 : "sync/atomic"
11 : "time"
12 :
13 : "github.com/cockroachdb/errors"
14 : "github.com/cockroachdb/pebble/internal/base"
15 : "github.com/cockroachdb/pebble/internal/invariants"
16 : "github.com/cockroachdb/pebble/internal/keyspan"
17 : "github.com/cockroachdb/pebble/internal/keyspan/keyspanimpl"
18 : "github.com/cockroachdb/pebble/internal/manifest"
19 : )
20 :
// flushable defines the interface for immutable memtables.
type flushable interface {
	// newIter returns an iterator over the flushable's point keys. o may be
	// nil.
	newIter(o *IterOptions) internalIterator
	// newFlushIter returns the iterator used when writing the flushable's
	// contents to disk as an sstable. Not every implementation supports it
	// (ingestedFlushable panics, since its tables are already on disk).
	newFlushIter(o *IterOptions) internalIterator
	// newRangeDelIter returns an iterator over the flushable's range
	// deletions. o may be nil. Callers should tolerate a nil result.
	newRangeDelIter(o *IterOptions) keyspan.FragmentIterator
	// newRangeKeyIter returns an iterator over the flushable's range keys. It
	// may return nil (ingestedFlushable does when it has neither range keys
	// nor an excise span).
	newRangeKeyIter(o *IterOptions) keyspan.FragmentIterator
	// containsRangeKeys reports whether the flushable contains range keys
	// that newRangeKeyIter may surface.
	containsRangeKeys() bool
	// inuseBytes returns the number of inuse bytes by the flushable.
	inuseBytes() uint64
	// totalBytes returns the total number of bytes allocated by the flushable.
	totalBytes() uint64
	// readyForFlush returns true when the flushable is ready for flushing. See
	// memTable.readyForFlush for one implementation which needs to check whether
	// there are any outstanding write references.
	readyForFlush() bool
	// computePossibleOverlaps determines whether the flushable's keys overlap
	// with the bounds of any of the provided bounded items. If an item overlaps
	// or might overlap but it's not possible to determine overlap cheaply,
	// computePossibleOverlaps invokes the provided function with the object
	// that might overlap. computePossibleOverlaps must not perform any I/O and
	// implementations should invoke the provided function for items that would
	// require I/O to determine overlap. Iteration stops early once the provided
	// function returns stopIteration.
	computePossibleOverlaps(overlaps func(bounded) shouldContinue, bounded ...bounded)
}
45 :
// shouldContinue is returned by the callbacks passed to
// computePossibleOverlaps to indicate whether the remaining bounded items
// should still be visited.
type shouldContinue bool

const (
	// continueIteration asks the caller to keep visiting items.
	continueIteration shouldContinue = true
	// stopIteration asks the caller to stop visiting items. It is typed
	// explicitly: in a Go constant declaration only specs with an omitted
	// expression inherit the previous spec's type, so a bare
	// "stopIteration = false" would be an untyped bool constant.
	stopIteration shouldContinue = false
)
52 :
// bounded is implemented by items that have user-key bounds. It is the item
// type that computePossibleOverlaps checks for overlap with a flushable.
type bounded interface {
	// UserKeyBounds returns the item's user key bounds.
	UserKeyBounds() base.UserKeyBounds
}

// Compile-time checks that table metadata and KeyRange implement bounded.
var _ bounded = (*tableMetadata)(nil)
var _ bounded = KeyRange{}
59 :
60 1 : func sliceAsBounded[B bounded](s []B) []bounded {
61 1 : ret := make([]bounded, len(s))
62 1 : for i := 0; i < len(s); i++ {
63 1 : ret[i] = s[i]
64 1 : }
65 1 : return ret
66 : }
67 :
// flushableEntry wraps a flushable and adds additional metadata and
// functionality that is common to all flushables.
type flushableEntry struct {
	flushable
	// Channel which is closed when the flushable has been flushed.
	flushed chan struct{}
	// flushForced indicates whether a flush was forced on this memtable (either
	// manual, or due to ingestion). Protected by DB.mu.
	flushForced bool
	// delayedFlushForcedAt indicates whether a timer has been set to force a
	// flush on this memtable at some point in the future. Protected by DB.mu.
	// Holds the timestamp of when the flush will be issued.
	delayedFlushForcedAt time.Time
	// logNum corresponds to the WAL that contains the records present in the
	// receiver.
	logNum base.DiskFileNum
	// logSize is the size in bytes of the associated WAL. Protected by DB.mu.
	logSize uint64
	// The current logSeqNum at the time the memtable was created. This is
	// guaranteed to be less than or equal to any seqnum stored in the memtable.
	logSeqNum base.SeqNum
	// readerRefs tracks the read references on the flushable. The two sources of
	// reader references are DB.mu.mem.queue and readState.memtables. The memory
	// reserved by the flushable in the cache is released when the reader refs
	// drop to zero. If the flushable is referencing sstables, then the file
	// refcount is also decreased once the reader refs drop to 0. If the
	// flushable is a memTable, when the reader refs drop to zero, the writer
	// refs will already be zero because the memtable will have been flushed and
	// that only occurs once the writer refs drop to zero.
	readerRefs atomic.Int32
	// Closure to invoke to release memory accounting. Invoked exactly once, when
	// readerRefs reaches zero, and then set to nil.
	releaseMemAccounting func()
	// unrefFiles, if not nil, should be invoked to decrease the ref count of
	// files which are backing the flushable. It fills in the files that became
	// obsolete as a result, and is set to nil after being invoked.
	unrefFiles func(*manifest.ObsoleteFiles)
	// deleteFnLocked should be called if the caller is holding DB.mu. It is the
	// deletion callback used by readerUnrefLocked.
	deleteFnLocked func(manifest.ObsoleteFiles)
	// deleteFn should be called if the caller is not holding DB.mu. It is the
	// deletion callback used by readerUnref.
	deleteFn func(manifest.ObsoleteFiles)
}
108 :
109 1 : func (e *flushableEntry) readerRef() {
110 1 : switch v := e.readerRefs.Add(1); {
111 0 : case v <= 1:
112 0 : panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
113 : }
114 : }
115 :
116 : // db.mu must not be held when this is called.
117 1 : func (e *flushableEntry) readerUnref(deleteFiles bool) {
118 1 : e.readerUnrefHelper(deleteFiles, e.deleteFn)
119 1 : }
120 :
121 : // db.mu must be held when this is called.
122 1 : func (e *flushableEntry) readerUnrefLocked(deleteFiles bool) {
123 1 : e.readerUnrefHelper(deleteFiles, e.deleteFnLocked)
124 1 : }
125 :
126 : func (e *flushableEntry) readerUnrefHelper(
127 : deleteFiles bool, deleteFn func(manifest.ObsoleteFiles),
128 1 : ) {
129 1 : switch v := e.readerRefs.Add(-1); {
130 0 : case v < 0:
131 0 : panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
132 1 : case v == 0:
133 1 : if e.releaseMemAccounting == nil {
134 0 : panic("pebble: memtable reservation already released")
135 : }
136 1 : e.releaseMemAccounting()
137 1 : e.releaseMemAccounting = nil
138 1 : if e.unrefFiles != nil {
139 1 : var obsolete manifest.ObsoleteFiles
140 1 : e.unrefFiles(&obsolete)
141 1 : e.unrefFiles = nil
142 1 : if deleteFiles {
143 1 : deleteFn(obsolete)
144 1 : }
145 : }
146 : }
147 : }
148 :
// flushableList is a slice of flushable entries. NOTE(review): presumably used
// for the memtable queue (DB.mu.mem.queue) — confirm against callers.
type flushableList []*flushableEntry

// ingestedFlushable is the implementation of the flushable interface for the
// ingesting sstables which are added to the flushable list.
type ingestedFlushable struct {
	// files are non-overlapping and ordered (according to their bounds).
	files []physicalMeta
	// comparer supplies the user-key comparison used by all iterators and
	// overlap checks.
	comparer *Comparer
	// newIters opens point and range-deletion iterators over a file (used by
	// newIter and constructRangeDelIter).
	newIters tableNewIters
	// newRangeKeyIters opens range-key iterators over a file (used by
	// newRangeKeyIter).
	newRangeKeyIters keyspanimpl.TableNewSpanIter

	// Since the level slice is immutable, we construct and set it once. It
	// should be safe to read from slice in future reads.
	slice manifest.LevelSlice
	// hasRangeKeys is set on ingestedFlushable construction.
	hasRangeKeys bool
	// exciseSpan is populated if an excise operation should be performed during
	// flush.
	exciseSpan KeyRange
	// exciseSeqNum is the sequence number of the synthetic RANGEDEL and
	// RANGEKEYDEL spans that the iterators surface over exciseSpan.
	exciseSeqNum base.SeqNum
}
170 :
171 : func newIngestedFlushable(
172 : files []*tableMetadata,
173 : comparer *Comparer,
174 : newIters tableNewIters,
175 : newRangeKeyIters keyspanimpl.TableNewSpanIter,
176 : exciseSpan KeyRange,
177 : seqNum base.SeqNum,
178 1 : ) *ingestedFlushable {
179 1 : if invariants.Enabled {
180 1 : for i := 1; i < len(files); i++ {
181 1 : prev := files[i-1].UserKeyBounds()
182 1 : this := files[i].UserKeyBounds()
183 1 : if prev.End.IsUpperBoundFor(comparer.Compare, this.Start) {
184 0 : panic(errors.AssertionFailedf("ingested flushable files overlap: %s %s", prev, this))
185 : }
186 : }
187 : }
188 1 : var physicalFiles []physicalMeta
189 1 : var hasRangeKeys bool
190 1 : for _, f := range files {
191 1 : if f.HasRangeKeys {
192 1 : hasRangeKeys = true
193 1 : }
194 1 : physicalFiles = append(physicalFiles, f.PhysicalMeta())
195 : }
196 :
197 1 : ret := &ingestedFlushable{
198 1 : files: physicalFiles,
199 1 : comparer: comparer,
200 1 : newIters: newIters,
201 1 : newRangeKeyIters: newRangeKeyIters,
202 1 : // slice is immutable and can be set once and used many times.
203 1 : slice: manifest.NewLevelSliceKeySorted(comparer.Compare, files),
204 1 : hasRangeKeys: hasRangeKeys,
205 1 : exciseSpan: exciseSpan,
206 1 : exciseSeqNum: seqNum,
207 1 : }
208 1 :
209 1 : return ret
210 : }
211 :
212 : // TODO(sumeer): ingestedFlushable iters also need to plumb context for
213 : // tracing.
214 :
215 : // newIter is part of the flushable interface.
216 1 : func (s *ingestedFlushable) newIter(o *IterOptions) internalIterator {
217 1 : var opts IterOptions
218 1 : if o != nil {
219 1 : opts = *o
220 1 : }
221 1 : return newLevelIter(
222 1 : context.Background(), opts, s.comparer, s.newIters, s.slice.Iter(), manifest.FlushableIngestsLayer(),
223 1 : internalIterOpts{},
224 1 : )
225 : }
226 :
227 : // newFlushIter is part of the flushable interface.
228 0 : func (s *ingestedFlushable) newFlushIter(*IterOptions) internalIterator {
229 0 : // newFlushIter is only used for writing memtables to disk as sstables.
230 0 : // Since ingested sstables are already present on disk, they don't need to
231 0 : // make use of a flush iter.
232 0 : panic("pebble: not implemented")
233 : }
234 :
235 : func (s *ingestedFlushable) constructRangeDelIter(
236 : ctx context.Context, file *manifest.TableMetadata, _ keyspan.SpanIterOptions,
237 1 : ) (keyspan.FragmentIterator, error) {
238 1 : iters, err := s.newIters(ctx, file, nil, internalIterOpts{}, iterRangeDeletions)
239 1 : if err != nil {
240 0 : return nil, err
241 0 : }
242 1 : return iters.RangeDeletion(), nil
243 : }
244 :
245 : // newRangeDelIter is part of the flushable interface.
246 : // TODO(bananabrick): Using a level iter instead of a keyspan level iter to
247 : // surface range deletes is more efficient.
248 : //
249 : // TODO(sumeer): *IterOptions are being ignored, so the index block load for
250 : // the point iterator in constructRangeDeIter is not tracked.
251 1 : func (s *ingestedFlushable) newRangeDelIter(_ *IterOptions) keyspan.FragmentIterator {
252 1 : liter := keyspanimpl.NewLevelIter(
253 1 : context.TODO(),
254 1 : keyspan.SpanIterOptions{}, s.comparer.Compare,
255 1 : s.constructRangeDelIter, s.slice.Iter(), manifest.FlushableIngestsLayer(),
256 1 : manifest.KeyTypePoint,
257 1 : )
258 1 : if !s.exciseSpan.Valid() {
259 1 : return liter
260 1 : }
261 : // We have an excise span to weave into the rangedel iterators.
262 : //
263 : // TODO(bilal): should this be pooled?
264 1 : miter := &keyspanimpl.MergingIter{}
265 1 : rdel := keyspan.Span{
266 1 : Start: s.exciseSpan.Start,
267 1 : End: s.exciseSpan.End,
268 1 : Keys: []keyspan.Key{{Trailer: base.MakeTrailer(s.exciseSeqNum, base.InternalKeyKindRangeDelete)}},
269 1 : }
270 1 : rdelIter := keyspan.NewIter(s.comparer.Compare, []keyspan.Span{rdel})
271 1 : miter.Init(s.comparer, keyspan.NoopTransform, new(keyspanimpl.MergingBuffers), liter, rdelIter)
272 1 : return miter
273 : }
274 :
275 : // newRangeKeyIter is part of the flushable interface.
276 1 : func (s *ingestedFlushable) newRangeKeyIter(o *IterOptions) keyspan.FragmentIterator {
277 1 : var rkeydelIter keyspan.FragmentIterator
278 1 : if s.exciseSpan.Valid() {
279 1 : // We have an excise span to weave into the rangekey iterators.
280 1 : rkeydel := keyspan.Span{
281 1 : Start: s.exciseSpan.Start,
282 1 : End: s.exciseSpan.End,
283 1 : Keys: []keyspan.Key{{Trailer: base.MakeTrailer(s.exciseSeqNum, base.InternalKeyKindRangeKeyDelete)}},
284 1 : }
285 1 : rkeydelIter = keyspan.NewIter(s.comparer.Compare, []keyspan.Span{rkeydel})
286 1 : }
287 :
288 1 : if !s.hasRangeKeys {
289 1 : if rkeydelIter == nil {
290 1 : // NB: we have to return the nil literal as opposed to the nil
291 1 : // value of rkeydelIter, otherwise callers of this function will
292 1 : // have the return value fail == nil checks.
293 1 : return nil
294 1 : }
295 1 : return rkeydelIter
296 : }
297 :
298 1 : liter := keyspanimpl.NewLevelIter(
299 1 : context.TODO(),
300 1 : keyspan.SpanIterOptions{}, s.comparer.Compare, s.newRangeKeyIters,
301 1 : s.slice.Iter(), manifest.FlushableIngestsLayer(), manifest.KeyTypeRange,
302 1 : )
303 1 : if rkeydelIter == nil {
304 1 : return liter
305 1 : }
306 : // TODO(bilal): should this be pooled?
307 1 : miter := &keyspanimpl.MergingIter{}
308 1 : miter.Init(s.comparer, keyspan.NoopTransform, new(keyspanimpl.MergingBuffers), liter, rkeydelIter)
309 1 : return miter
310 : }
311 :
312 : // containsRangeKeys is part of the flushable interface.
313 1 : func (s *ingestedFlushable) containsRangeKeys() bool {
314 1 : return s.hasRangeKeys || s.exciseSpan.Valid()
315 1 : }
316 :
317 : // inuseBytes is part of the flushable interface.
318 0 : func (s *ingestedFlushable) inuseBytes() uint64 {
319 0 : // inuseBytes is only used when memtables are flushed to disk as sstables.
320 0 : panic("pebble: not implemented")
321 : }
322 :
323 : // totalBytes is part of the flushable interface.
324 1 : func (s *ingestedFlushable) totalBytes() uint64 {
325 1 : // We don't allocate additional bytes for the ingestedFlushable.
326 1 : return 0
327 1 : }
328 :
329 : // readyForFlush is part of the flushable interface.
330 1 : func (s *ingestedFlushable) readyForFlush() bool {
331 1 : // ingestedFlushable should always be ready to flush. However, note that
332 1 : // memtables before the ingested sstables in the memtable queue must be
333 1 : // flushed before an ingestedFlushable can be flushed. This is because the
334 1 : // ingested sstables need an updated view of the Version to
335 1 : // determine where to place the files in the lsm.
336 1 : return true
337 1 : }
338 :
339 : // computePossibleOverlaps is part of the flushable interface.
340 : func (s *ingestedFlushable) computePossibleOverlaps(
341 : fn func(bounded) shouldContinue, bounded ...bounded,
342 1 : ) {
343 1 : for _, b := range bounded {
344 1 : if s.anyFileOverlaps(b.UserKeyBounds()) {
345 1 : // Some file overlaps in key boundaries. The file doesn't necessarily
346 1 : // contain any keys within the key range, but we would need to perform I/O
347 1 : // to know for sure. The flushable interface dictates that we're not
348 1 : // permitted to perform I/O here, so err towards assuming overlap.
349 1 : if !fn(b) {
350 1 : return
351 1 : }
352 : }
353 : }
354 : }
355 :
356 : // anyFileBoundsOverlap returns true if there is at least a file in s.files with
357 : // bounds that overlap the given bounds.
358 1 : func (s *ingestedFlushable) anyFileOverlaps(bounds base.UserKeyBounds) bool {
359 1 : // Note that s.files are non-overlapping and sorted.
360 1 : for _, f := range s.files {
361 1 : fileBounds := f.UserKeyBounds()
362 1 : if !fileBounds.End.IsUpperBoundFor(s.comparer.Compare, bounds.Start) {
363 1 : // The file ends before the bounds start. Go to the next file.
364 1 : continue
365 : }
366 1 : if !bounds.End.IsUpperBoundFor(s.comparer.Compare, fileBounds.Start) {
367 1 : // The file starts after the bounds end. There is no overlap, and
368 1 : // further files will not overlap either (the files are sorted).
369 1 : break
370 : }
371 : // There is overlap. Note that UserKeyBounds.Overlaps() performs exactly the
372 : // checks above.
373 1 : return true
374 : }
375 1 : if s.exciseSpan.Valid() {
376 1 : uk := s.exciseSpan.UserKeyBounds()
377 1 : return uk.Overlaps(s.comparer.Compare, &bounds)
378 1 : }
379 1 : return false
380 : }
381 :
382 : // computePossibleOverlapsGenericImpl is an implementation of the flushable
383 : // interface's computePossibleOverlaps function for flushable implementations
384 : // with only in-memory state that do not have special requirements and should
385 : // read through the ordinary flushable iterators.
386 : //
387 : // This function must only be used with implementations that are infallible (eg,
388 : // memtable iterators) and will panic if an error is encountered.
389 : func computePossibleOverlapsGenericImpl[F flushable](
390 : f F, cmp Compare, fn func(bounded) shouldContinue, bounded []bounded,
391 1 : ) {
392 1 : iter := f.newIter(nil)
393 1 : rangeDelIter := f.newRangeDelIter(nil)
394 1 : rangeKeyIter := f.newRangeKeyIter(nil)
395 1 : for _, b := range bounded {
396 1 : overlap, err := determineOverlapAllIters(cmp, b.UserKeyBounds(), iter, rangeDelIter, rangeKeyIter)
397 1 : if invariants.Enabled && err != nil {
398 0 : panic(errors.AssertionFailedf("expected iterator to be infallible: %v", err))
399 : }
400 1 : if overlap {
401 1 : if !fn(b) {
402 1 : break
403 : }
404 : }
405 : }
406 :
407 1 : if iter != nil {
408 1 : if err := iter.Close(); err != nil {
409 0 : // This implementation must be used in circumstances where
410 0 : // reading through the iterator is infallible.
411 0 : panic(err)
412 : }
413 : }
414 1 : if rangeDelIter != nil {
415 1 : rangeDelIter.Close()
416 1 : }
417 1 : if rangeKeyIter != nil {
418 1 : rangeKeyIter.Close()
419 1 : }
420 : }
421 :
422 : // determineOverlapAllIters checks for overlap in a point iterator, range
423 : // deletion iterator and range key iterator.
424 : func determineOverlapAllIters(
425 : cmp base.Compare,
426 : bounds base.UserKeyBounds,
427 : pointIter base.InternalIterator,
428 : rangeDelIter, rangeKeyIter keyspan.FragmentIterator,
429 1 : ) (bool, error) {
430 1 : if pointIter != nil {
431 1 : if pointOverlap, err := determineOverlapPointIterator(cmp, bounds, pointIter); pointOverlap || err != nil {
432 1 : return pointOverlap, err
433 1 : }
434 : }
435 1 : if rangeDelIter != nil {
436 1 : if rangeDelOverlap, err := determineOverlapKeyspanIterator(cmp, bounds, rangeDelIter); rangeDelOverlap || err != nil {
437 1 : return rangeDelOverlap, err
438 1 : }
439 : }
440 1 : if rangeKeyIter != nil {
441 1 : return determineOverlapKeyspanIterator(cmp, bounds, rangeKeyIter)
442 1 : }
443 1 : return false, nil
444 : }
445 :
446 : func determineOverlapPointIterator(
447 : cmp base.Compare, bounds base.UserKeyBounds, iter internalIterator,
448 1 : ) (bool, error) {
449 1 : kv := iter.SeekGE(bounds.Start, base.SeekGEFlagsNone)
450 1 : if kv == nil {
451 1 : return false, iter.Error()
452 1 : }
453 1 : return bounds.End.IsUpperBoundForInternalKey(cmp, kv.K), nil
454 : }
455 :
456 : func determineOverlapKeyspanIterator(
457 : cmp base.Compare, bounds base.UserKeyBounds, iter keyspan.FragmentIterator,
458 1 : ) (bool, error) {
459 1 : // NB: The spans surfaced by the fragment iterator are non-overlapping.
460 1 : span, err := iter.SeekGE(bounds.Start)
461 1 : if err != nil {
462 0 : return false, err
463 0 : }
464 1 : for ; span != nil; span, err = iter.Next() {
465 1 : if !bounds.End.IsUpperBoundFor(cmp, span.Start) {
466 1 : // The span starts after our bounds.
467 1 : return false, nil
468 1 : }
469 1 : if !span.Empty() {
470 1 : return true, nil
471 1 : }
472 : }
473 1 : return false, err
474 : }
|