Line data Source code
1 : // Copyright 2012 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "context"
9 : "io"
10 : "math"
11 : "sync"
12 : "time"
13 :
14 : "github.com/cockroachdb/errors"
15 : "github.com/cockroachdb/pebble/internal/base"
16 : "github.com/cockroachdb/pebble/internal/invariants"
17 : "github.com/cockroachdb/pebble/internal/manifest"
18 : )
19 :
20 : // Snapshot provides a read-only point-in-time view of the DB state.
21 : type Snapshot struct {
22 : // The db the snapshot was created from.
23 : db *DB
24 : seqNum base.SeqNum
25 :
26 : // Set if part of an EventuallyFileOnlySnapshot.
27 : efos *EventuallyFileOnlySnapshot
28 :
29 : // The list the snapshot is linked into.
30 : list *snapshotList
31 :
32 : // The next/prev link for the snapshotList doubly-linked list of snapshots.
33 : prev, next *Snapshot
34 : }
35 :
36 : var _ Reader = (*Snapshot)(nil)
37 :
38 : // Get gets the value for the given key. It returns ErrNotFound if the Snapshot
39 : // does not contain the key.
40 : //
41 : // The caller should not modify the contents of the returned slice, but it is
42 : // safe to modify the contents of the argument after Get returns. The returned
43 : // slice will remain valid until the returned Closer is closed. On success, the
44 : // caller MUST call closer.Close() or a memory leak will occur.
45 1 : func (s *Snapshot) Get(key []byte) ([]byte, io.Closer, error) {
46 1 : if s.db == nil {
47 0 : panic(ErrClosed)
48 : }
49 1 : return s.db.getInternal(key, nil /* batch */, s)
50 : }
51 :
52 : // NewIter returns an iterator that is unpositioned (Iterator.Valid() will
53 : // return false). The iterator can be positioned via a call to SeekGE,
54 : // SeekLT, First or Last.
55 1 : func (s *Snapshot) NewIter(o *IterOptions) (*Iterator, error) {
56 1 : return s.NewIterWithContext(context.Background(), o)
57 1 : }
58 :
59 : // NewIterWithContext is like NewIter, and additionally accepts a context for
60 : // tracing.
61 1 : func (s *Snapshot) NewIterWithContext(ctx context.Context, o *IterOptions) (*Iterator, error) {
62 1 : if s.db == nil {
63 0 : panic(ErrClosed)
64 : }
65 1 : return s.db.newIter(ctx, nil /* batch */, newIterOpts{
66 1 : snapshot: snapshotIterOpts{seqNum: s.seqNum},
67 1 : }, o), nil
68 : }
69 :
70 : // ScanInternal scans all internal keys within the specified bounds, truncating
71 : // any rangedels and rangekeys to those bounds. For use when an external user
72 : // needs to be aware of all internal keys that make up a key range.
73 : //
74 : // See comment on db.ScanInternal for the behaviour that can be expected of
75 : // point keys deleted by range dels and keys masked by range keys.
76 0 : func (s *Snapshot) ScanInternal(ctx context.Context, opts ScanInternalOptions) (err error) {
77 0 : if s.db == nil {
78 0 : panic(ErrClosed)
79 : }
80 0 : var iter *scanInternalIterator
81 0 : iter, err = s.db.newInternalIter(ctx, snapshotIterOpts{seqNum: s.seqNum}, &opts)
82 0 : if err != nil {
83 0 : return err
84 0 : }
85 0 : defer func() { err = errors.CombineErrors(err, iter.Close()) }()
86 0 : return scanInternalImpl(ctx, iter, &opts)
87 : }
88 :
89 : // closeLocked is similar to Close(), except it requires that db.mu be held
90 : // by the caller.
91 1 : func (s *Snapshot) closeLocked() error {
92 1 : s.db.mu.snapshots.remove(s)
93 1 :
94 1 : // If s was the previous earliest snapshot, we might be able to reclaim
95 1 : // disk space by dropping obsolete records that were pinned by s.
96 1 : if e := s.db.mu.snapshots.earliest(); e > s.seqNum {
97 1 : // NB: maybeScheduleCompaction also picks elision-only compactions.
98 1 : s.db.maybeScheduleCompaction()
99 1 : }
100 1 : s.db = nil
101 1 : return nil
102 : }
103 :
104 : // Close closes the snapshot, releasing its resources. Close must be called.
105 : // Failure to do so will result in a tiny memory leak and a large leak of
106 : // resources on disk due to the entries the snapshot is preventing from being
107 : // deleted.
108 : //
109 : // d.mu must NOT be held by the caller.
110 1 : func (s *Snapshot) Close() error {
111 1 : db := s.db
112 1 : if db == nil {
113 0 : panic(ErrClosed)
114 : }
115 1 : db.mu.Lock()
116 1 : defer db.mu.Unlock()
117 1 : return s.closeLocked()
118 : }
119 :
// snapshotList is a circular doubly-linked list of snapshots, linked through
// each Snapshot's prev/next fields.
type snapshotList struct {
	// root is a sentinel node rather than a real snapshot: init points its
	// next and prev at itself, and the list is empty while root.next still
	// refers to root.
	root Snapshot
}
123 :
124 1 : func (l *snapshotList) init() {
125 1 : l.root.next = &l.root
126 1 : l.root.prev = &l.root
127 1 : }
128 :
129 1 : func (l *snapshotList) empty() bool {
130 1 : return l.root.next == &l.root
131 1 : }
132 :
133 1 : func (l *snapshotList) count() int {
134 1 : if l.empty() {
135 1 : return 0
136 1 : }
137 0 : var count int
138 0 : for i := l.root.next; i != &l.root; i = i.next {
139 0 : count++
140 0 : }
141 0 : return count
142 : }
143 :
144 1 : func (l *snapshotList) earliest() base.SeqNum {
145 1 : v := base.SeqNum(math.MaxUint64)
146 1 : if !l.empty() {
147 1 : v = l.root.next.seqNum
148 1 : }
149 1 : return v
150 : }
151 :
152 1 : func (l *snapshotList) toSlice() []base.SeqNum {
153 1 : if l.empty() {
154 1 : return nil
155 1 : }
156 1 : var results []base.SeqNum
157 1 : for i := l.root.next; i != &l.root; i = i.next {
158 1 : results = append(results, i.seqNum)
159 1 : }
160 1 : return results
161 : }
162 :
163 1 : func (l *snapshotList) pushBack(s *Snapshot) {
164 1 : if s.list != nil || s.prev != nil || s.next != nil {
165 0 : panic("pebble: snapshot list is inconsistent")
166 : }
167 1 : s.prev = l.root.prev
168 1 : s.prev.next = s
169 1 : s.next = &l.root
170 1 : s.next.prev = s
171 1 : s.list = l
172 : }
173 :
174 1 : func (l *snapshotList) remove(s *Snapshot) {
175 1 : if s == &l.root {
176 0 : panic("pebble: cannot remove snapshot list root node")
177 : }
178 1 : if s.list != l {
179 0 : panic("pebble: snapshot list is inconsistent")
180 : }
181 1 : s.prev.next = s.next
182 1 : s.next.prev = s.prev
183 1 : s.next = nil // avoid memory leaks
184 1 : s.prev = nil // avoid memory leaks
185 1 : s.list = nil // avoid memory leaks
186 : }
187 :
// EventuallyFileOnlySnapshot (aka EFOS) provides a read-only point-in-time view
// of the database state, similar to Snapshot. An EventuallyFileOnlySnapshot
// induces less write amplification than Snapshot, at the cost of increased space
// amplification. While a Snapshot may increase write amplification across all
// flushes and compactions for the duration of its lifetime, an
// EventuallyFileOnlySnapshot only incurs that cost for flushes/compactions if
// memtables at the time of EFOS instantiation contained keys that the EFOS is
// interested in (i.e. its protectedRanges). In that case, the EFOS prevents
// elision of keys visible to it, similar to a Snapshot, until those memtables
// are flushed. Once that happens, the "EventuallyFileOnlySnapshot"
// transitions to a file-only snapshot state in which it pins zombie sstables
// like an open Iterator would, without pinning any memtables. Callers that can
// tolerate the increased space amplification of pinning zombie sstables until
// the snapshot is closed may prefer EventuallyFileOnlySnapshots for their
// reduced write amplification. Callers that desire the benefits of the file-only
// state that requires no pinning of memtables should call
// `WaitForFileOnlySnapshot()` before relying on the EFOS to keep producing iterators
// with zero write-amp and zero pinning of memtables in memory.
//
// EventuallyFileOnlySnapshots interact with the IngestAndExcise operation in
// subtle ways. The IngestAndExcise can force the transition of an EFOS to a
// file-only snapshot if an excise overlaps with the EFOS bounds.
type EventuallyFileOnlySnapshot struct {
	mu struct {
		// NB: If both this mutex and db.mu are being grabbed, db.mu should be
		// grabbed _before_ grabbing this one.
		sync.Mutex

		// At any given point in time, either the snap field below is set or
		// the vers field is set. If a snapshot is referenced, this is not a
		// file-only snapshot yet, and if a version is set (and ref'd) this is
		// a file-only snapshot.

		// The wrapped regular snapshot, if not a file-only snapshot yet.
		snap *Snapshot
		// The wrapped version reference, if a file-only snapshot.
		vers *manifest.Version
	}

	// Key ranges to watch for an excise on.
	protectedRanges []KeyRange

	// The db the snapshot was created from.
	db *DB
	// The sequence number passed as the snapshot seqnum whenever an iterator
	// is created from this EFOS.
	seqNum base.SeqNum
	// closed is closed by Close. Other methods poll it (non-blocking) to
	// detect that the EFOS has been closed.
	closed chan struct{}
}
235 :
236 1 : func (d *DB) makeEventuallyFileOnlySnapshot(keyRanges []KeyRange) *EventuallyFileOnlySnapshot {
237 1 : var snapshotSeqNum, exciseSeqNumToWaitFor base.SeqNum
238 1 : // tryGetSnapshotSeqNum attempts to initialize a snapshotSeqNum. The
239 1 : // snapshotSeqNum should be ignored if exciseSeqNumToWaitFor is > 0. In this
240 1 : // case the caller should wait for this seqnum to become visible and call
241 1 : // this function again.
242 1 : tryGetSnapshotSeqNum := func() {
243 1 : // In AllocateSeqNum, for an ingest-and-excise, some seqnums are
244 1 : // allocated, say starting at N. Then AllocateSeqNum calls prepare, which
245 1 : // acquires DB.mu and grabs all the existing EFOS that have not
246 1 : // transitioned to FOS (which are in d.mu.snapshots.snapshotList) and
247 1 : // overlap with the excise. After releasing DB.mu, in apply, the
248 1 : // ingest-and-excise waits for all the previous EFOSs to transition to FOS
249 1 : // (as a side effect of waiting for a memtable flush to complete). Note,
250 1 : // the visible seqnum is still <= N-1, and will not be bumped to N until
251 1 : // the ingest-and-excise completes. Any EFOS that gets created after
252 1 : // prepare looked at the existing EFOSs, but before the ingest-and-excise
253 1 : // completes, will be created with EventuallyFileOnlySnapshot.seqNum=N-1,
254 1 : // and may not transition to FOS until after the excise, which is
255 1 : // incorrect (the version at the transition to FOS has already experienced
256 1 : // the excise). To avoid this incorrectness, tryGetSnapshotSeqNum is
257 1 : // called in a loop, until there are no such ongoing excises. The loop can
258 1 : // starve EFOS creation if the keyRanges keep overlapping with new ongoing
259 1 : // ingest-and-excises. So EFOS should only be used when ingest-and-excises
260 1 : // are rare over the keyRanges.
261 1 : //
262 1 : // Improving this starvation behavior would require this snapshot to
263 1 : // register itself in a way that blocks future ingest-and-excises. The
264 1 : // blocking would need to be done before allocating the seqnum, since
265 1 : // blocking after can delay (latency sensitive) writes that get a seqnum
266 1 : // later than the ingest-and-excise. Then the problem shifts to not
267 1 : // starving the ingest-and-excise if EFOS creation is frequent. We observe
268 1 : // that in the CockroachDB use case (a) ingest-and-excises are not very
269 1 : // frequent, so EFOS starvation is unlikely, (b) EFOS creation is not
270 1 : // latency sensitive. Hence, we ignore this starvation problem.
271 1 : snapshotSeqNum = d.mu.versions.visibleSeqNum.Load()
272 1 : // Check if any of the keyRanges overlap with an ongoing
273 1 : // ingest-and-excise.
274 1 : //
275 1 : // NB: The zero seqnum cannot occur in practice, since base.SeqNumStart >
276 1 : // 0.
277 1 : exciseSeqNumToWaitFor = base.SeqNum(0)
278 1 : for seqNum, span := range d.mu.snapshots.ongoingExcises {
279 0 : if base.Visible(seqNum, snapshotSeqNum, base.SeqNumMax) {
280 0 : // Skip this excise, since this is visible to the snapshot.
281 0 : continue
282 : }
283 : // INVARIANT: seqNum >= snapshotSeqNum.
284 0 : if seqNum <= exciseSeqNumToWaitFor {
285 0 : // We are already waiting for a later excise.
286 0 : continue
287 : }
288 0 : for i := range keyRanges {
289 0 : if keyRanges[i].OverlapsKeyRange(d.cmp, span) {
290 0 : exciseSeqNumToWaitFor = seqNum
291 0 : break
292 : }
293 : }
294 : }
295 : }
296 1 : d.mu.Lock()
297 1 : defer d.mu.Unlock()
298 1 : for {
299 1 : // This call updates snapshotSeqNum and exciseSeqNumToWaitFor.
300 1 : tryGetSnapshotSeqNum()
301 1 : if exciseSeqNumToWaitFor == 0 {
302 1 : break
303 : }
304 0 : for !base.Visible(exciseSeqNumToWaitFor, d.mu.versions.visibleSeqNum.Load(), base.SeqNumMax) {
305 0 : d.mu.snapshots.ongoingExcisesRemovedCond.Wait()
306 0 : }
307 : }
308 1 : isFileOnly := true
309 1 : // Check if any of the keyRanges overlap with a memtable. It is possible
310 1 : // (with very low probability) that all these memtable have seqnums later
311 1 : // than seqNum, and the overlap is a false positive. This is harmless, and
312 1 : // this EFOS will transition to FOS, when the false positive memtable is
313 1 : // flushed.
314 1 : for i := range d.mu.mem.queue {
315 1 : d.mu.mem.queue[i].computePossibleOverlaps(func(bounded) shouldContinue {
316 1 : isFileOnly = false
317 1 : return stopIteration
318 1 : }, sliceAsBounded(keyRanges)...)
319 : }
320 1 : es := &EventuallyFileOnlySnapshot{
321 1 : db: d,
322 1 : seqNum: snapshotSeqNum,
323 1 : protectedRanges: keyRanges,
324 1 : closed: make(chan struct{}),
325 1 : }
326 1 : if isFileOnly {
327 1 : es.mu.vers = d.mu.versions.currentVersion()
328 1 : es.mu.vers.Ref()
329 1 : } else {
330 1 : s := &Snapshot{
331 1 : db: d,
332 1 : seqNum: snapshotSeqNum,
333 1 : }
334 1 : s.efos = es
335 1 : es.mu.snap = s
336 1 : d.mu.snapshots.pushBack(s)
337 1 : }
338 1 : return es
339 : }
340 :
341 : // Transitions this EventuallyFileOnlySnapshot to a file-only snapshot. Requires
342 : // earliestUnflushedSeqNum and vers to correspond to the same Version from the
343 : // current or a past acquisition of db.mu. vers must have been Ref()'d before
344 : // that mutex was released, if it was released.
345 : //
346 : // NB: The caller is expected to check for es.excised before making this
347 : // call.
348 : //
349 : // d.mu must be held when calling this method.
350 1 : func (es *EventuallyFileOnlySnapshot) transitionToFileOnlySnapshot(vers *manifest.Version) error {
351 1 : es.mu.Lock()
352 1 : select {
353 1 : case <-es.closed:
354 1 : vers.UnrefLocked()
355 1 : es.mu.Unlock()
356 1 : return ErrClosed
357 1 : default:
358 : }
359 1 : if es.mu.snap == nil {
360 0 : es.mu.Unlock()
361 0 : panic("pebble: tried to transition an eventually-file-only-snapshot twice")
362 : }
363 : // The caller has already called Ref() on vers.
364 1 : es.mu.vers = vers
365 1 : // NB: The callers should have already done a check of es.excised.
366 1 : oldSnap := es.mu.snap
367 1 : es.mu.snap = nil
368 1 : es.mu.Unlock()
369 1 : return oldSnap.closeLocked()
370 : }
371 :
372 : // hasTransitioned returns true if this EFOS has transitioned to a file-only
373 : // snapshot.
374 1 : func (es *EventuallyFileOnlySnapshot) hasTransitioned() bool {
375 1 : es.mu.Lock()
376 1 : defer es.mu.Unlock()
377 1 : return es.mu.vers != nil
378 1 : }
379 :
380 : // waitForFlush waits for a flush on any memtables that need to be flushed
381 : // before this EFOS can transition to a file-only snapshot. If this EFOS is
382 : // waiting on a flush of the mutable memtable, it forces a rotation within
383 : // `dur` duration. For immutable memtables, it schedules a flush and waits for
384 : // it to finish.
385 0 : func (es *EventuallyFileOnlySnapshot) waitForFlush(ctx context.Context, dur time.Duration) error {
386 0 : es.db.mu.Lock()
387 0 : defer es.db.mu.Unlock()
388 0 :
389 0 : earliestUnflushedSeqNum := es.db.getEarliestUnflushedSeqNumLocked()
390 0 : for earliestUnflushedSeqNum < es.seqNum {
391 0 : select {
392 0 : case <-es.closed:
393 0 : return ErrClosed
394 0 : case <-ctx.Done():
395 0 : return ctx.Err()
396 0 : default:
397 : }
398 : // Check if the current mutable memtable contains keys less than seqNum.
399 : // If so, rotate it.
400 0 : if es.db.mu.mem.mutable.logSeqNum < es.seqNum && dur.Nanoseconds() > 0 {
401 0 : es.db.maybeScheduleDelayedFlush(es.db.mu.mem.mutable, dur)
402 0 : } else {
403 0 : // Find the last memtable that contains seqNums less than es.seqNum,
404 0 : // and force a flush on it.
405 0 : var mem *flushableEntry
406 0 : for i := range es.db.mu.mem.queue {
407 0 : if es.db.mu.mem.queue[i].logSeqNum < es.seqNum {
408 0 : mem = es.db.mu.mem.queue[i]
409 0 : }
410 : }
411 0 : mem.flushForced = true
412 0 : es.db.maybeScheduleFlush()
413 : }
414 0 : es.db.mu.compact.cond.Wait()
415 0 :
416 0 : earliestUnflushedSeqNum = es.db.getEarliestUnflushedSeqNumLocked()
417 : }
418 0 : return nil
419 : }
420 :
421 : // WaitForFileOnlySnapshot blocks the calling goroutine until this snapshot
422 : // has been converted into a file-only snapshot (i.e. all memtables containing
423 : // keys < seqNum are flushed). A duration can be passed in, and if nonzero,
424 : // a delayed flush will be scheduled at that duration if necessary.
425 : //
426 : // Idempotent; can be called multiple times with no side effects.
427 : func (es *EventuallyFileOnlySnapshot) WaitForFileOnlySnapshot(
428 : ctx context.Context, dur time.Duration,
429 0 : ) error {
430 0 : if es.hasTransitioned() {
431 0 : return nil
432 0 : }
433 :
434 0 : if err := es.waitForFlush(ctx, dur); err != nil {
435 0 : return err
436 0 : }
437 :
438 0 : if invariants.Enabled {
439 0 : // Since we aren't returning an error, we _must_ have transitioned to a
440 0 : // file-only snapshot by now.
441 0 : if !es.hasTransitioned() {
442 0 : panic("expected EFOS to have transitioned to file-only snapshot after flush")
443 : }
444 : }
445 0 : return nil
446 : }
447 :
448 : // Close closes the file-only snapshot and releases all referenced resources.
449 : // Not idempotent.
450 1 : func (es *EventuallyFileOnlySnapshot) Close() error {
451 1 : close(es.closed)
452 1 : es.db.mu.Lock()
453 1 : defer es.db.mu.Unlock()
454 1 : es.mu.Lock()
455 1 : defer es.mu.Unlock()
456 1 :
457 1 : if es.mu.snap != nil {
458 1 : if err := es.mu.snap.closeLocked(); err != nil {
459 0 : return err
460 0 : }
461 : }
462 1 : if es.mu.vers != nil {
463 1 : es.mu.vers.UnrefLocked()
464 1 : }
465 1 : return nil
466 : }
467 :
468 : // Get implements the Reader interface.
469 1 : func (es *EventuallyFileOnlySnapshot) Get(key []byte) (value []byte, closer io.Closer, err error) {
470 1 : // TODO(jackson): Use getInternal.
471 1 : iter, err := es.NewIter(nil)
472 1 : if err != nil {
473 0 : return nil, nil, err
474 0 : }
475 1 : if !iter.SeekPrefixGE(key) {
476 1 : if err = firstError(iter.Error(), iter.Close()); err != nil {
477 0 : return nil, nil, err
478 0 : }
479 1 : return nil, nil, ErrNotFound
480 : }
481 1 : if !es.db.equal(iter.Key(), key) {
482 1 : return nil, nil, firstError(iter.Close(), ErrNotFound)
483 1 : }
484 1 : return iter.Value(), iter, nil
485 : }
486 :
487 : // NewIter returns an iterator that is unpositioned (Iterator.Valid() will
488 : // return false). The iterator can be positioned via a call to SeekGE,
489 : // SeekLT, First or Last.
490 1 : func (es *EventuallyFileOnlySnapshot) NewIter(o *IterOptions) (*Iterator, error) {
491 1 : return es.NewIterWithContext(context.Background(), o)
492 1 : }
493 :
494 : // NewIterWithContext is like NewIter, and additionally accepts a context for
495 : // tracing.
496 : func (es *EventuallyFileOnlySnapshot) NewIterWithContext(
497 : ctx context.Context, o *IterOptions,
498 1 : ) (*Iterator, error) {
499 1 : select {
500 0 : case <-es.closed:
501 0 : panic(ErrClosed)
502 1 : default:
503 : }
504 :
505 1 : es.mu.Lock()
506 1 : defer es.mu.Unlock()
507 1 : if es.mu.vers != nil {
508 1 : sOpts := snapshotIterOpts{seqNum: es.seqNum, vers: es.mu.vers}
509 1 : return es.db.newIter(ctx, nil /* batch */, newIterOpts{snapshot: sOpts}, o), nil
510 1 : }
511 :
512 1 : sOpts := snapshotIterOpts{seqNum: es.seqNum}
513 1 : iter := es.db.newIter(ctx, nil /* batch */, newIterOpts{snapshot: sOpts}, o)
514 1 : return iter, nil
515 : }
516 :
517 : // ScanInternal scans all internal keys within the specified bounds, truncating
518 : // any rangedels and rangekeys to those bounds. For use when an external user
519 : // needs to be aware of all internal keys that make up a key range.
520 : //
521 : // See comment on db.ScanInternal for the behaviour that can be expected of
522 : // point keys deleted by range dels and keys masked by range keys.
523 : func (es *EventuallyFileOnlySnapshot) ScanInternal(
524 : ctx context.Context, opts ScanInternalOptions,
525 0 : ) (err error) {
526 0 : if es.db == nil {
527 0 : panic(ErrClosed)
528 : }
529 0 : var sOpts snapshotIterOpts
530 0 :
531 0 : es.mu.Lock()
532 0 : if es.mu.vers != nil {
533 0 : sOpts = snapshotIterOpts{
534 0 : seqNum: es.seqNum,
535 0 : vers: es.mu.vers,
536 0 : }
537 0 : } else {
538 0 : sOpts = snapshotIterOpts{
539 0 : seqNum: es.seqNum,
540 0 : }
541 0 : }
542 0 : es.mu.Unlock()
543 0 : var iter *scanInternalIterator
544 0 : iter, err = es.db.newInternalIter(ctx, sOpts, &opts)
545 0 : if err != nil {
546 0 : return err
547 0 : }
548 0 : defer func() { err = errors.CombineErrors(err, iter.Close()) }()
549 0 : return scanInternalImpl(ctx, iter, &opts)
550 : }
|