Line data Source code
1 : // Copyright 2012 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "context"
9 : "io"
10 : "math"
11 : "sync"
12 : "time"
13 :
14 : "github.com/cockroachdb/pebble/internal/invariants"
15 : "github.com/cockroachdb/pebble/rangekey"
16 : "github.com/cockroachdb/pebble/sstable"
17 : )
18 :
// Snapshot provides a read-only point-in-time view of the DB state.
//
// A Snapshot doubles as a node of the doubly-linked snapshotList it is
// registered in. Its db field is reset to nil when the snapshot is closed,
// which is how the read methods detect use after Close.
type Snapshot struct {
	// The db the snapshot was created from. Set to nil by closeLocked.
	db *DB
	// The sequence number that reads through this snapshot are performed at.
	seqNum uint64

	// Set if part of an EventuallyFileOnlySnapshot.
	efos *EventuallyFileOnlySnapshot

	// The list the snapshot is linked into. Nil while the snapshot is not
	// linked into any list.
	list *snapshotList

	// The next/prev link for the snapshotList doubly-linked list of snapshots.
	prev, next *Snapshot
}

// Compile-time assertion that *Snapshot implements Reader.
var _ Reader = (*Snapshot)(nil)
36 :
37 : // Get gets the value for the given key. It returns ErrNotFound if the Snapshot
38 : // does not contain the key.
39 : //
40 : // The caller should not modify the contents of the returned slice, but it is
41 : // safe to modify the contents of the argument after Get returns. The returned
42 : // slice will remain valid until the returned Closer is closed. On success, the
43 : // caller MUST call closer.Close() or a memory leak will occur.
44 1 : func (s *Snapshot) Get(key []byte) ([]byte, io.Closer, error) {
45 1 : if s.db == nil {
46 1 : panic(ErrClosed)
47 : }
48 1 : return s.db.getInternal(key, nil /* batch */, s)
49 : }
50 :
51 : // NewIter returns an iterator that is unpositioned (Iterator.Valid() will
52 : // return false). The iterator can be positioned via a call to SeekGE,
53 : // SeekLT, First or Last.
54 1 : func (s *Snapshot) NewIter(o *IterOptions) (*Iterator, error) {
55 1 : return s.NewIterWithContext(context.Background(), o)
56 1 : }
57 :
58 : // NewIterWithContext is like NewIter, and additionally accepts a context for
59 : // tracing.
60 1 : func (s *Snapshot) NewIterWithContext(ctx context.Context, o *IterOptions) (*Iterator, error) {
61 1 : if s.db == nil {
62 1 : panic(ErrClosed)
63 : }
64 1 : return s.db.newIter(ctx, nil /* batch */, newIterOpts{
65 1 : snapshot: snapshotIterOpts{seqNum: s.seqNum},
66 1 : }, o), nil
67 : }
68 :
69 : // ScanInternal scans all internal keys within the specified bounds, truncating
70 : // any rangedels and rangekeys to those bounds. For use when an external user
71 : // needs to be aware of all internal keys that make up a key range.
72 : //
73 : // See comment on db.ScanInternal for the behaviour that can be expected of
74 : // point keys deleted by range dels and keys masked by range keys.
75 : func (s *Snapshot) ScanInternal(
76 : ctx context.Context,
77 : categoryAndQoS sstable.CategoryAndQoS,
78 : lower, upper []byte,
79 : visitPointKey func(key *InternalKey, value LazyValue, iterInfo IteratorLevel) error,
80 : visitRangeDel func(start, end []byte, seqNum uint64) error,
81 : visitRangeKey func(start, end []byte, keys []rangekey.Key) error,
82 : visitSharedFile func(sst *SharedSSTMeta) error,
83 1 : ) error {
84 1 : if s.db == nil {
85 0 : panic(ErrClosed)
86 : }
87 1 : scanInternalOpts := &scanInternalOptions{
88 1 : CategoryAndQoS: categoryAndQoS,
89 1 : visitPointKey: visitPointKey,
90 1 : visitRangeDel: visitRangeDel,
91 1 : visitRangeKey: visitRangeKey,
92 1 : visitSharedFile: visitSharedFile,
93 1 : skipSharedLevels: visitSharedFile != nil,
94 1 : IterOptions: IterOptions{
95 1 : KeyTypes: IterKeyTypePointsAndRanges,
96 1 : LowerBound: lower,
97 1 : UpperBound: upper,
98 1 : },
99 1 : }
100 1 :
101 1 : iter, err := s.db.newInternalIter(ctx, snapshotIterOpts{seqNum: s.seqNum}, scanInternalOpts)
102 1 : if err != nil {
103 0 : return err
104 0 : }
105 1 : defer iter.close()
106 1 :
107 1 : return scanInternalImpl(ctx, lower, upper, iter, scanInternalOpts)
108 : }
109 :
110 : // closeLocked is similar to Close(), except it requires that db.mu be held
111 : // by the caller.
112 1 : func (s *Snapshot) closeLocked() error {
113 1 : s.db.mu.snapshots.remove(s)
114 1 :
115 1 : // If s was the previous earliest snapshot, we might be able to reclaim
116 1 : // disk space by dropping obsolete records that were pinned by s.
117 1 : if e := s.db.mu.snapshots.earliest(); e > s.seqNum {
118 1 : s.db.maybeScheduleCompactionPicker(pickElisionOnly)
119 1 : }
120 1 : s.db = nil
121 1 : return nil
122 : }
123 :
124 : // Close closes the snapshot, releasing its resources. Close must be called.
125 : // Failure to do so will result in a tiny memory leak and a large leak of
126 : // resources on disk due to the entries the snapshot is preventing from being
127 : // deleted.
128 : //
129 : // d.mu must NOT be held by the caller.
130 1 : func (s *Snapshot) Close() error {
131 1 : db := s.db
132 1 : if db == nil {
133 1 : panic(ErrClosed)
134 : }
135 1 : db.mu.Lock()
136 1 : defer db.mu.Unlock()
137 1 : return s.closeLocked()
138 : }
139 :
// snapshotList is a circular doubly-linked list of snapshots. root is a
// sentinel node rather than a real snapshot: in an initialized, empty list
// root.next and root.prev both point back at root. pushBack appends at the
// tail, so root.next is the longest-linked snapshot still in the list.
type snapshotList struct {
	root Snapshot
}
143 :
144 1 : func (l *snapshotList) init() {
145 1 : l.root.next = &l.root
146 1 : l.root.prev = &l.root
147 1 : }
148 :
149 1 : func (l *snapshotList) empty() bool {
150 1 : return l.root.next == &l.root
151 1 : }
152 :
153 1 : func (l *snapshotList) count() int {
154 1 : if l.empty() {
155 1 : return 0
156 1 : }
157 0 : var count int
158 0 : for i := l.root.next; i != &l.root; i = i.next {
159 0 : count++
160 0 : }
161 0 : return count
162 : }
163 :
164 1 : func (l *snapshotList) earliest() uint64 {
165 1 : v := uint64(math.MaxUint64)
166 1 : if !l.empty() {
167 1 : v = l.root.next.seqNum
168 1 : }
169 1 : return v
170 : }
171 :
172 1 : func (l *snapshotList) toSlice() []uint64 {
173 1 : if l.empty() {
174 1 : return nil
175 1 : }
176 1 : var results []uint64
177 1 : for i := l.root.next; i != &l.root; i = i.next {
178 1 : results = append(results, i.seqNum)
179 1 : }
180 1 : return results
181 : }
182 :
183 1 : func (l *snapshotList) pushBack(s *Snapshot) {
184 1 : if s.list != nil || s.prev != nil || s.next != nil {
185 0 : panic("pebble: snapshot list is inconsistent")
186 : }
187 1 : s.prev = l.root.prev
188 1 : s.prev.next = s
189 1 : s.next = &l.root
190 1 : s.next.prev = s
191 1 : s.list = l
192 : }
193 :
194 1 : func (l *snapshotList) remove(s *Snapshot) {
195 1 : if s == &l.root {
196 0 : panic("pebble: cannot remove snapshot list root node")
197 : }
198 1 : if s.list != l {
199 0 : panic("pebble: snapshot list is inconsistent")
200 : }
201 1 : s.prev.next = s.next
202 1 : s.next.prev = s.prev
203 1 : s.next = nil // avoid memory leaks
204 1 : s.prev = nil // avoid memory leaks
205 1 : s.list = nil // avoid memory leaks
206 : }
207 :
// EventuallyFileOnlySnapshot (aka EFOS) provides a read-only point-in-time view
// of the database state, similar to Snapshot. An EventuallyFileOnlySnapshot
// induces less write amplification than Snapshot, at the cost of increased space
// amplification. While a Snapshot may increase write amplification across all
// flushes and compactions for the duration of its lifetime, an
// EventuallyFileOnlySnapshot only incurs that cost for flushes/compactions if
// memtables at the time of EFOS instantiation contained keys that the EFOS is
// interested in (i.e. its protectedRanges). In that case, the EFOS prevents
// elision of keys visible to it, similar to a Snapshot, until those memtables
// are flushed. Once that happens, the EventuallyFileOnlySnapshot transitions
// to a file-only snapshot state in which it pins zombie sstables like an open
// Iterator would, without pinning any memtables. Callers that can tolerate the
// increased space amplification of pinning zombie sstables until the snapshot
// is closed may prefer EventuallyFileOnlySnapshots for their reduced write
// amplification. Callers that desire the benefits of the file-only state that
// requires no pinning of memtables should call `WaitForFileOnlySnapshot()`
// before relying on the EFOS to keep producing iterators with zero write-amp
// and zero pinning of memtables in memory.
//
// EventuallyFileOnlySnapshots interact with the IngestAndExcise operation in
// subtle ways. The IngestAndExcise can force the transition of an EFOS to a
// file-only snapshot if an excise overlaps with the EFOS bounds.
type EventuallyFileOnlySnapshot struct {
	mu struct {
		// NB: If both this mutex and db.mu are being grabbed, db.mu should be
		// grabbed _before_ grabbing this one.
		sync.Mutex

		// Either the snap field is set below, or the version is set at any given
		// point of time. If a snapshot is referenced, this is not a file-only
		// snapshot yet, and if a version is set (and ref'd) this is a file-only
		// snapshot.

		// The wrapped regular snapshot, if not a file-only snapshot yet.
		snap *Snapshot
		// The wrapped version reference, if a file-only snapshot.
		vers *version
	}

	// Key ranges to watch for an excise on.
	protectedRanges []KeyRange

	// The db the snapshot was created from.
	db *DB
	// The sequence number that reads through this EFOS are performed at.
	seqNum uint64
	// closed is closed by Close. Other methods poll it without blocking to
	// detect that the EFOS has been closed.
	closed chan struct{}
}
255 :
256 1 : func (d *DB) makeEventuallyFileOnlySnapshot(keyRanges []KeyRange) *EventuallyFileOnlySnapshot {
257 1 : isFileOnly := true
258 1 :
259 1 : d.mu.Lock()
260 1 : defer d.mu.Unlock()
261 1 : seqNum := d.mu.versions.visibleSeqNum.Load()
262 1 : // Check if any of the keyRanges overlap with a memtable.
263 1 : for i := range d.mu.mem.queue {
264 1 : d.mu.mem.queue[i].computePossibleOverlaps(func(bounded) shouldContinue {
265 1 : isFileOnly = false
266 1 : return stopIteration
267 1 : }, sliceAsBounded(keyRanges)...)
268 : }
269 1 : es := &EventuallyFileOnlySnapshot{
270 1 : db: d,
271 1 : seqNum: seqNum,
272 1 : protectedRanges: keyRanges,
273 1 : closed: make(chan struct{}),
274 1 : }
275 1 : if isFileOnly {
276 1 : es.mu.vers = d.mu.versions.currentVersion()
277 1 : es.mu.vers.Ref()
278 1 : } else {
279 1 : s := &Snapshot{
280 1 : db: d,
281 1 : seqNum: seqNum,
282 1 : }
283 1 : s.efos = es
284 1 : es.mu.snap = s
285 1 : d.mu.snapshots.pushBack(s)
286 1 : }
287 1 : return es
288 : }
289 :
290 : // Transitions this EventuallyFileOnlySnapshot to a file-only snapshot. Requires
291 : // earliestUnflushedSeqNum and vers to correspond to the same Version from the
292 : // current or a past acquisition of db.mu. vers must have been Ref()'d before
293 : // that mutex was released, if it was released.
294 : //
295 : // NB: The caller is expected to check for es.excised before making this
296 : // call.
297 : //
298 : // d.mu must be held when calling this method.
299 1 : func (es *EventuallyFileOnlySnapshot) transitionToFileOnlySnapshot(vers *version) error {
300 1 : es.mu.Lock()
301 1 : select {
302 0 : case <-es.closed:
303 0 : vers.UnrefLocked()
304 0 : es.mu.Unlock()
305 0 : return ErrClosed
306 1 : default:
307 : }
308 1 : if es.mu.snap == nil {
309 0 : es.mu.Unlock()
310 0 : panic("pebble: tried to transition an eventually-file-only-snapshot twice")
311 : }
312 : // The caller has already called Ref() on vers.
313 1 : es.mu.vers = vers
314 1 : // NB: The callers should have already done a check of es.excised.
315 1 : oldSnap := es.mu.snap
316 1 : es.mu.snap = nil
317 1 : es.mu.Unlock()
318 1 : return oldSnap.closeLocked()
319 : }
320 :
321 : // hasTransitioned returns true if this EFOS has transitioned to a file-only
322 : // snapshot.
323 1 : func (es *EventuallyFileOnlySnapshot) hasTransitioned() bool {
324 1 : es.mu.Lock()
325 1 : defer es.mu.Unlock()
326 1 : return es.mu.vers != nil
327 1 : }
328 :
329 : // waitForFlush waits for a flush on any memtables that need to be flushed
330 : // before this EFOS can transition to a file-only snapshot. If this EFOS is
331 : // waiting on a flush of the mutable memtable, it forces a rotation within
332 : // `dur` duration. For immutable memtables, it schedules a flush and waits for
333 : // it to finish.
334 1 : func (es *EventuallyFileOnlySnapshot) waitForFlush(ctx context.Context, dur time.Duration) error {
335 1 : es.db.mu.Lock()
336 1 : defer es.db.mu.Unlock()
337 1 :
338 1 : earliestUnflushedSeqNum := es.db.getEarliestUnflushedSeqNumLocked()
339 1 : for earliestUnflushedSeqNum < es.seqNum {
340 1 : select {
341 0 : case <-es.closed:
342 0 : return ErrClosed
343 0 : case <-ctx.Done():
344 0 : return ctx.Err()
345 1 : default:
346 : }
347 : // Check if the current mutable memtable contains keys less than seqNum.
348 : // If so, rotate it.
349 1 : if es.db.mu.mem.mutable.logSeqNum < es.seqNum && dur.Nanoseconds() > 0 {
350 1 : es.db.maybeScheduleDelayedFlush(es.db.mu.mem.mutable, dur)
351 1 : } else {
352 0 : // Find the last memtable that contains seqNums less than es.seqNum,
353 0 : // and force a flush on it.
354 0 : var mem *flushableEntry
355 0 : for i := range es.db.mu.mem.queue {
356 0 : if es.db.mu.mem.queue[i].logSeqNum < es.seqNum {
357 0 : mem = es.db.mu.mem.queue[i]
358 0 : }
359 : }
360 0 : mem.flushForced = true
361 0 : es.db.maybeScheduleFlush()
362 : }
363 1 : es.db.mu.compact.cond.Wait()
364 1 :
365 1 : earliestUnflushedSeqNum = es.db.getEarliestUnflushedSeqNumLocked()
366 : }
367 1 : return nil
368 : }
369 :
370 : // WaitForFileOnlySnapshot blocks the calling goroutine until this snapshot
371 : // has been converted into a file-only snapshot (i.e. all memtables containing
372 : // keys < seqNum are flushed). A duration can be passed in, and if nonzero,
373 : // a delayed flush will be scheduled at that duration if necessary.
374 : //
375 : // Idempotent; can be called multiple times with no side effects.
376 : func (es *EventuallyFileOnlySnapshot) WaitForFileOnlySnapshot(
377 : ctx context.Context, dur time.Duration,
378 1 : ) error {
379 1 : if es.hasTransitioned() {
380 1 : return nil
381 1 : }
382 :
383 1 : if err := es.waitForFlush(ctx, dur); err != nil {
384 0 : return err
385 0 : }
386 :
387 1 : if invariants.Enabled {
388 1 : // Since we aren't returning an error, we _must_ have transitioned to a
389 1 : // file-only snapshot by now.
390 1 : if !es.hasTransitioned() {
391 0 : panic("expected EFOS to have transitioned to file-only snapshot after flush")
392 : }
393 : }
394 1 : return nil
395 : }
396 :
397 : // Close closes the file-only snapshot and releases all referenced resources.
398 : // Not idempotent.
399 1 : func (es *EventuallyFileOnlySnapshot) Close() error {
400 1 : close(es.closed)
401 1 : es.db.mu.Lock()
402 1 : defer es.db.mu.Unlock()
403 1 : es.mu.Lock()
404 1 : defer es.mu.Unlock()
405 1 :
406 1 : if es.mu.snap != nil {
407 1 : if err := es.mu.snap.closeLocked(); err != nil {
408 0 : return err
409 0 : }
410 : }
411 1 : if es.mu.vers != nil {
412 1 : es.mu.vers.UnrefLocked()
413 1 : }
414 1 : return nil
415 : }
416 :
417 : // Get implements the Reader interface.
418 1 : func (es *EventuallyFileOnlySnapshot) Get(key []byte) (value []byte, closer io.Closer, err error) {
419 1 : // TODO(jackson): Use getInternal.
420 1 : iter, err := es.NewIter(nil)
421 1 : if err != nil {
422 0 : return nil, nil, err
423 0 : }
424 1 : var valid bool
425 1 : if es.db.opts.Comparer.Split != nil {
426 1 : valid = iter.SeekPrefixGE(key)
427 1 : } else {
428 0 : valid = iter.SeekGE(key)
429 0 : }
430 1 : if !valid {
431 1 : if err = firstError(iter.Error(), iter.Close()); err != nil {
432 1 : return nil, nil, err
433 1 : }
434 1 : return nil, nil, ErrNotFound
435 : }
436 0 : if !es.db.equal(iter.Key(), key) {
437 0 : return nil, nil, firstError(iter.Close(), ErrNotFound)
438 0 : }
439 0 : return iter.Value(), iter, nil
440 : }
441 :
442 : // NewIter returns an iterator that is unpositioned (Iterator.Valid() will
443 : // return false). The iterator can be positioned via a call to SeekGE,
444 : // SeekLT, First or Last.
445 1 : func (es *EventuallyFileOnlySnapshot) NewIter(o *IterOptions) (*Iterator, error) {
446 1 : return es.NewIterWithContext(context.Background(), o)
447 1 : }
448 :
449 : // NewIterWithContext is like NewIter, and additionally accepts a context for
450 : // tracing.
451 : func (es *EventuallyFileOnlySnapshot) NewIterWithContext(
452 : ctx context.Context, o *IterOptions,
453 1 : ) (*Iterator, error) {
454 1 : select {
455 0 : case <-es.closed:
456 0 : panic(ErrClosed)
457 1 : default:
458 : }
459 :
460 1 : es.mu.Lock()
461 1 : defer es.mu.Unlock()
462 1 : if es.mu.vers != nil {
463 1 : sOpts := snapshotIterOpts{seqNum: es.seqNum, vers: es.mu.vers}
464 1 : return es.db.newIter(ctx, nil /* batch */, newIterOpts{snapshot: sOpts}, o), nil
465 1 : }
466 :
467 1 : sOpts := snapshotIterOpts{seqNum: es.seqNum}
468 1 : iter := es.db.newIter(ctx, nil /* batch */, newIterOpts{snapshot: sOpts}, o)
469 1 : return iter, nil
470 : }
471 :
472 : // ScanInternal scans all internal keys within the specified bounds, truncating
473 : // any rangedels and rangekeys to those bounds. For use when an external user
474 : // needs to be aware of all internal keys that make up a key range.
475 : //
476 : // See comment on db.ScanInternal for the behaviour that can be expected of
477 : // point keys deleted by range dels and keys masked by range keys.
478 : func (es *EventuallyFileOnlySnapshot) ScanInternal(
479 : ctx context.Context,
480 : categoryAndQoS sstable.CategoryAndQoS,
481 : lower, upper []byte,
482 : visitPointKey func(key *InternalKey, value LazyValue, iterInfo IteratorLevel) error,
483 : visitRangeDel func(start, end []byte, seqNum uint64) error,
484 : visitRangeKey func(start, end []byte, keys []rangekey.Key) error,
485 : visitSharedFile func(sst *SharedSSTMeta) error,
486 1 : ) error {
487 1 : if es.db == nil {
488 0 : panic(ErrClosed)
489 : }
490 1 : var sOpts snapshotIterOpts
491 1 : opts := &scanInternalOptions{
492 1 : CategoryAndQoS: categoryAndQoS,
493 1 : IterOptions: IterOptions{
494 1 : KeyTypes: IterKeyTypePointsAndRanges,
495 1 : LowerBound: lower,
496 1 : UpperBound: upper,
497 1 : },
498 1 : visitPointKey: visitPointKey,
499 1 : visitRangeDel: visitRangeDel,
500 1 : visitRangeKey: visitRangeKey,
501 1 : visitSharedFile: visitSharedFile,
502 1 : skipSharedLevels: visitSharedFile != nil,
503 1 : }
504 1 : es.mu.Lock()
505 1 : if es.mu.vers != nil {
506 1 : sOpts = snapshotIterOpts{
507 1 : seqNum: es.seqNum,
508 1 : vers: es.mu.vers,
509 1 : }
510 1 : } else {
511 1 : sOpts = snapshotIterOpts{
512 1 : seqNum: es.seqNum,
513 1 : }
514 1 : }
515 1 : es.mu.Unlock()
516 1 : iter, err := es.db.newInternalIter(ctx, sOpts, opts)
517 1 : if err != nil {
518 0 : return err
519 0 : }
520 1 : defer iter.close()
521 1 :
522 1 : return scanInternalImpl(ctx, lower, upper, iter, opts)
523 : }
|