Line data Source code
1 : // Copyright 2012 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "context"
9 : "io"
10 : "math"
11 : "sync"
12 : "time"
13 :
14 : "github.com/cockroachdb/pebble/internal/base"
15 : "github.com/cockroachdb/pebble/internal/invariants"
16 : "github.com/cockroachdb/pebble/rangekey"
17 : "github.com/cockroachdb/pebble/sstable/block"
18 : )
19 :
20 : // Snapshot provides a read-only point-in-time view of the DB state.
21 : type Snapshot struct {
22 : // The db the snapshot was created from.
23 : db *DB
24 : seqNum base.SeqNum
25 :
26 : // Set if part of an EventuallyFileOnlySnapshot.
27 : efos *EventuallyFileOnlySnapshot
28 :
29 : // The list the snapshot is linked into.
30 : list *snapshotList
31 :
32 : // The next/prev link for the snapshotList doubly-linked list of snapshots.
33 : prev, next *Snapshot
34 : }
35 :
36 : var _ Reader = (*Snapshot)(nil)
37 :
38 : // Get gets the value for the given key. It returns ErrNotFound if the Snapshot
39 : // does not contain the key.
40 : //
41 : // The caller should not modify the contents of the returned slice, but it is
42 : // safe to modify the contents of the argument after Get returns. The returned
43 : // slice will remain valid until the returned Closer is closed. On success, the
44 : // caller MUST call closer.Close() or a memory leak will occur.
45 2 : func (s *Snapshot) Get(key []byte) ([]byte, io.Closer, error) {
46 2 : if s.db == nil {
47 1 : panic(ErrClosed)
48 : }
49 2 : return s.db.getInternal(key, nil /* batch */, s)
50 : }
51 :
52 : // NewIter returns an iterator that is unpositioned (Iterator.Valid() will
53 : // return false). The iterator can be positioned via a call to SeekGE,
54 : // SeekLT, First or Last.
55 2 : func (s *Snapshot) NewIter(o *IterOptions) (*Iterator, error) {
56 2 : return s.NewIterWithContext(context.Background(), o)
57 2 : }
58 :
59 : // NewIterWithContext is like NewIter, and additionally accepts a context for
60 : // tracing.
61 2 : func (s *Snapshot) NewIterWithContext(ctx context.Context, o *IterOptions) (*Iterator, error) {
62 2 : if s.db == nil {
63 1 : panic(ErrClosed)
64 : }
65 2 : return s.db.newIter(ctx, nil /* batch */, newIterOpts{
66 2 : snapshot: snapshotIterOpts{seqNum: s.seqNum},
67 2 : }, o), nil
68 : }
69 :
70 : // ScanInternal scans all internal keys within the specified bounds, truncating
71 : // any rangedels and rangekeys to those bounds. For use when an external user
72 : // needs to be aware of all internal keys that make up a key range.
73 : //
74 : // See comment on db.ScanInternal for the behaviour that can be expected of
75 : // point keys deleted by range dels and keys masked by range keys.
76 : func (s *Snapshot) ScanInternal(
77 : ctx context.Context,
78 : category block.Category,
79 : lower, upper []byte,
80 : visitPointKey func(key *InternalKey, value LazyValue, iterInfo IteratorLevel) error,
81 : visitRangeDel func(start, end []byte, seqNum base.SeqNum) error,
82 : visitRangeKey func(start, end []byte, keys []rangekey.Key) error,
83 : visitSharedFile func(sst *SharedSSTMeta) error,
84 : visitExternalFile func(sst *ExternalFile) error,
85 1 : ) error {
86 1 : if s.db == nil {
87 0 : panic(ErrClosed)
88 : }
89 1 : scanInternalOpts := &scanInternalOptions{
90 1 : category: category,
91 1 : visitPointKey: visitPointKey,
92 1 : visitRangeDel: visitRangeDel,
93 1 : visitRangeKey: visitRangeKey,
94 1 : visitSharedFile: visitSharedFile,
95 1 : visitExternalFile: visitExternalFile,
96 1 : IterOptions: IterOptions{
97 1 : KeyTypes: IterKeyTypePointsAndRanges,
98 1 : LowerBound: lower,
99 1 : UpperBound: upper,
100 1 : },
101 1 : }
102 1 :
103 1 : iter, err := s.db.newInternalIter(ctx, snapshotIterOpts{seqNum: s.seqNum}, scanInternalOpts)
104 1 : if err != nil {
105 0 : return err
106 0 : }
107 1 : defer iter.close()
108 1 :
109 1 : return scanInternalImpl(ctx, lower, upper, iter, scanInternalOpts)
110 : }
111 :
112 : // closeLocked is similar to Close(), except it requires that db.mu be held
113 : // by the caller.
114 2 : func (s *Snapshot) closeLocked() error {
115 2 : s.db.mu.snapshots.remove(s)
116 2 :
117 2 : // If s was the previous earliest snapshot, we might be able to reclaim
118 2 : // disk space by dropping obsolete records that were pinned by s.
119 2 : if e := s.db.mu.snapshots.earliest(); e > s.seqNum {
120 2 : // NB: maybeScheduleCompaction also picks elision-only compactions.
121 2 : s.db.maybeScheduleCompaction()
122 2 : }
123 2 : s.db = nil
124 2 : return nil
125 : }
126 :
127 : // Close closes the snapshot, releasing its resources. Close must be called.
128 : // Failure to do so will result in a tiny memory leak and a large leak of
129 : // resources on disk due to the entries the snapshot is preventing from being
130 : // deleted.
131 : //
132 : // d.mu must NOT be held by the caller.
133 2 : func (s *Snapshot) Close() error {
134 2 : db := s.db
135 2 : if db == nil {
136 1 : panic(ErrClosed)
137 : }
138 2 : db.mu.Lock()
139 2 : defer db.mu.Unlock()
140 2 : return s.closeLocked()
141 : }
142 :
143 : type snapshotList struct {
144 : root Snapshot
145 : }
146 :
147 2 : func (l *snapshotList) init() {
148 2 : l.root.next = &l.root
149 2 : l.root.prev = &l.root
150 2 : }
151 :
152 2 : func (l *snapshotList) empty() bool {
153 2 : return l.root.next == &l.root
154 2 : }
155 :
156 2 : func (l *snapshotList) count() int {
157 2 : if l.empty() {
158 2 : return 0
159 2 : }
160 0 : var count int
161 0 : for i := l.root.next; i != &l.root; i = i.next {
162 0 : count++
163 0 : }
164 0 : return count
165 : }
166 :
167 2 : func (l *snapshotList) earliest() base.SeqNum {
168 2 : v := base.SeqNum(math.MaxUint64)
169 2 : if !l.empty() {
170 2 : v = l.root.next.seqNum
171 2 : }
172 2 : return v
173 : }
174 :
175 2 : func (l *snapshotList) toSlice() []base.SeqNum {
176 2 : if l.empty() {
177 2 : return nil
178 2 : }
179 2 : var results []base.SeqNum
180 2 : for i := l.root.next; i != &l.root; i = i.next {
181 2 : results = append(results, i.seqNum)
182 2 : }
183 2 : return results
184 : }
185 :
186 2 : func (l *snapshotList) pushBack(s *Snapshot) {
187 2 : if s.list != nil || s.prev != nil || s.next != nil {
188 0 : panic("pebble: snapshot list is inconsistent")
189 : }
190 2 : s.prev = l.root.prev
191 2 : s.prev.next = s
192 2 : s.next = &l.root
193 2 : s.next.prev = s
194 2 : s.list = l
195 : }
196 :
197 2 : func (l *snapshotList) remove(s *Snapshot) {
198 2 : if s == &l.root {
199 0 : panic("pebble: cannot remove snapshot list root node")
200 : }
201 2 : if s.list != l {
202 0 : panic("pebble: snapshot list is inconsistent")
203 : }
204 2 : s.prev.next = s.next
205 2 : s.next.prev = s.prev
206 2 : s.next = nil // avoid memory leaks
207 2 : s.prev = nil // avoid memory leaks
208 2 : s.list = nil // avoid memory leaks
209 : }
210 :
211 : // EventuallyFileOnlySnapshot (aka EFOS) provides a read-only point-in-time view
212 : // of the database state, similar to Snapshot. An EventuallyFileOnlySnapshot
213 : // induces less write amplification than Snapshot, at the cost of increased space
214 : // amplification. While a Snapshot may increase write amplification across all
215 : // flushes and compactions for the duration of its lifetime, an
216 : // EventuallyFileOnlySnapshot only incurs that cost for flushes/compactions if
217 : // memtables at the time of EFOS instantiation contained keys that the EFOS is
218 : // interested in (i.e. its protectedRanges). In that case, the EFOS prevents
219 : // elision of keys visible to it, similar to a Snapshot, until those memtables
220 : // are flushed, and once that happens, the "EventuallyFileOnlySnapshot"
221 : // transitions to a file-only snapshot state in which it pins zombies sstables
222 : // like an open Iterator would, without pinning any memtables. Callers that can
223 : // tolerate the increased space amplification of pinning zombie sstables until
224 : // the snapshot is closed may prefer EventuallyFileOnlySnapshots for their
225 : // reduced write amplification. Callers that desire the benefits of the file-only
226 : // state that requires no pinning of memtables should call
227 : // `WaitForFileOnlySnapshot()` before relying on the EFOS to keep producing iterators
228 : // with zero write-amp and zero pinning of memtables in memory.
229 : //
230 : // EventuallyFileOnlySnapshots interact with the IngestAndExcise operation in
231 : // subtle ways. The IngestAndExcise can force the transition of an EFOS to a
232 : // file-only snapshot if an excise overlaps with the EFOS bounds.
233 : type EventuallyFileOnlySnapshot struct {
234 : mu struct {
235 : // NB: If both this mutex and db.mu are being grabbed, db.mu should be
236 : // grabbed _before_ grabbing this one.
237 : sync.Mutex
238 :
239 : // Either the snap field is set below, or the version is set at any given
240 : // point of time. If a snapshot is referenced, this is not a file-only
241 : // snapshot yet, and if a version is set (and ref'd) this is a file-only
242 : // snapshot.
243 :
244 : // The wrapped regular snapshot, if not a file-only snapshot yet.
245 : snap *Snapshot
246 : // The wrapped version reference, if a file-only snapshot.
247 : vers *version
248 : }
249 :
250 : // Key ranges to watch for an excise on.
251 : protectedRanges []KeyRange
252 :
253 : // The db the snapshot was created from.
254 : db *DB
255 : seqNum base.SeqNum
256 : closed chan struct{}
257 : }
258 :
259 2 : func (d *DB) makeEventuallyFileOnlySnapshot(keyRanges []KeyRange) *EventuallyFileOnlySnapshot {
260 2 : isFileOnly := true
261 2 :
262 2 : d.mu.Lock()
263 2 : defer d.mu.Unlock()
264 2 : seqNum := d.mu.versions.visibleSeqNum.Load()
265 2 : // Check if any of the keyRanges overlap with a memtable.
266 2 : for i := range d.mu.mem.queue {
267 2 : d.mu.mem.queue[i].computePossibleOverlaps(func(bounded) shouldContinue {
268 2 : isFileOnly = false
269 2 : return stopIteration
270 2 : }, sliceAsBounded(keyRanges)...)
271 : }
272 2 : es := &EventuallyFileOnlySnapshot{
273 2 : db: d,
274 2 : seqNum: seqNum,
275 2 : protectedRanges: keyRanges,
276 2 : closed: make(chan struct{}),
277 2 : }
278 2 : if isFileOnly {
279 2 : es.mu.vers = d.mu.versions.currentVersion()
280 2 : es.mu.vers.Ref()
281 2 : } else {
282 2 : s := &Snapshot{
283 2 : db: d,
284 2 : seqNum: seqNum,
285 2 : }
286 2 : s.efos = es
287 2 : es.mu.snap = s
288 2 : d.mu.snapshots.pushBack(s)
289 2 : }
290 2 : return es
291 : }
292 :
293 : // Transitions this EventuallyFileOnlySnapshot to a file-only snapshot. Requires
294 : // earliestUnflushedSeqNum and vers to correspond to the same Version from the
295 : // current or a past acquisition of db.mu. vers must have been Ref()'d before
296 : // that mutex was released, if it was released.
297 : //
298 : // NB: The caller is expected to check for es.excised before making this
299 : // call.
300 : //
301 : // d.mu must be held when calling this method.
302 2 : func (es *EventuallyFileOnlySnapshot) transitionToFileOnlySnapshot(vers *version) error {
303 2 : es.mu.Lock()
304 2 : select {
305 1 : case <-es.closed:
306 1 : vers.UnrefLocked()
307 1 : es.mu.Unlock()
308 1 : return ErrClosed
309 2 : default:
310 : }
311 2 : if es.mu.snap == nil {
312 0 : es.mu.Unlock()
313 0 : panic("pebble: tried to transition an eventually-file-only-snapshot twice")
314 : }
315 : // The caller has already called Ref() on vers.
316 2 : es.mu.vers = vers
317 2 : // NB: The callers should have already done a check of es.excised.
318 2 : oldSnap := es.mu.snap
319 2 : es.mu.snap = nil
320 2 : es.mu.Unlock()
321 2 : return oldSnap.closeLocked()
322 : }
323 :
324 : // hasTransitioned returns true if this EFOS has transitioned to a file-only
325 : // snapshot.
326 2 : func (es *EventuallyFileOnlySnapshot) hasTransitioned() bool {
327 2 : es.mu.Lock()
328 2 : defer es.mu.Unlock()
329 2 : return es.mu.vers != nil
330 2 : }
331 :
332 : // waitForFlush waits for a flush on any memtables that need to be flushed
333 : // before this EFOS can transition to a file-only snapshot. If this EFOS is
334 : // waiting on a flush of the mutable memtable, it forces a rotation within
335 : // `dur` duration. For immutable memtables, it schedules a flush and waits for
336 : // it to finish.
337 1 : func (es *EventuallyFileOnlySnapshot) waitForFlush(ctx context.Context, dur time.Duration) error {
338 1 : es.db.mu.Lock()
339 1 : defer es.db.mu.Unlock()
340 1 :
341 1 : earliestUnflushedSeqNum := es.db.getEarliestUnflushedSeqNumLocked()
342 1 : for earliestUnflushedSeqNum < es.seqNum {
343 1 : select {
344 0 : case <-es.closed:
345 0 : return ErrClosed
346 0 : case <-ctx.Done():
347 0 : return ctx.Err()
348 1 : default:
349 : }
350 : // Check if the current mutable memtable contains keys less than seqNum.
351 : // If so, rotate it.
352 1 : if es.db.mu.mem.mutable.logSeqNum < es.seqNum && dur.Nanoseconds() > 0 {
353 1 : es.db.maybeScheduleDelayedFlush(es.db.mu.mem.mutable, dur)
354 1 : } else {
355 0 : // Find the last memtable that contains seqNums less than es.seqNum,
356 0 : // and force a flush on it.
357 0 : var mem *flushableEntry
358 0 : for i := range es.db.mu.mem.queue {
359 0 : if es.db.mu.mem.queue[i].logSeqNum < es.seqNum {
360 0 : mem = es.db.mu.mem.queue[i]
361 0 : }
362 : }
363 0 : mem.flushForced = true
364 0 : es.db.maybeScheduleFlush()
365 : }
366 1 : es.db.mu.compact.cond.Wait()
367 1 :
368 1 : earliestUnflushedSeqNum = es.db.getEarliestUnflushedSeqNumLocked()
369 : }
370 1 : return nil
371 : }
372 :
373 : // WaitForFileOnlySnapshot blocks the calling goroutine until this snapshot
374 : // has been converted into a file-only snapshot (i.e. all memtables containing
375 : // keys < seqNum are flushed). A duration can be passed in, and if nonzero,
376 : // a delayed flush will be scheduled at that duration if necessary.
377 : //
378 : // Idempotent; can be called multiple times with no side effects.
379 : func (es *EventuallyFileOnlySnapshot) WaitForFileOnlySnapshot(
380 : ctx context.Context, dur time.Duration,
381 1 : ) error {
382 1 : if es.hasTransitioned() {
383 1 : return nil
384 1 : }
385 :
386 1 : if err := es.waitForFlush(ctx, dur); err != nil {
387 0 : return err
388 0 : }
389 :
390 1 : if invariants.Enabled {
391 1 : // Since we aren't returning an error, we _must_ have transitioned to a
392 1 : // file-only snapshot by now.
393 1 : if !es.hasTransitioned() {
394 0 : panic("expected EFOS to have transitioned to file-only snapshot after flush")
395 : }
396 : }
397 1 : return nil
398 : }
399 :
400 : // Close closes the file-only snapshot and releases all referenced resources.
401 : // Not idempotent.
402 2 : func (es *EventuallyFileOnlySnapshot) Close() error {
403 2 : close(es.closed)
404 2 : es.db.mu.Lock()
405 2 : defer es.db.mu.Unlock()
406 2 : es.mu.Lock()
407 2 : defer es.mu.Unlock()
408 2 :
409 2 : if es.mu.snap != nil {
410 2 : if err := es.mu.snap.closeLocked(); err != nil {
411 0 : return err
412 0 : }
413 : }
414 2 : if es.mu.vers != nil {
415 2 : es.mu.vers.UnrefLocked()
416 2 : }
417 2 : return nil
418 : }
419 :
420 : // Get implements the Reader interface.
421 2 : func (es *EventuallyFileOnlySnapshot) Get(key []byte) (value []byte, closer io.Closer, err error) {
422 2 : // TODO(jackson): Use getInternal.
423 2 : iter, err := es.NewIter(nil)
424 2 : if err != nil {
425 0 : return nil, nil, err
426 0 : }
427 2 : if !iter.SeekPrefixGE(key) {
428 2 : if err = firstError(iter.Error(), iter.Close()); err != nil {
429 1 : return nil, nil, err
430 1 : }
431 2 : return nil, nil, ErrNotFound
432 : }
433 1 : if !es.db.equal(iter.Key(), key) {
434 1 : return nil, nil, firstError(iter.Close(), ErrNotFound)
435 1 : }
436 1 : return iter.Value(), iter, nil
437 : }
438 :
439 : // NewIter returns an iterator that is unpositioned (Iterator.Valid() will
440 : // return false). The iterator can be positioned via a call to SeekGE,
441 : // SeekLT, First or Last.
442 2 : func (es *EventuallyFileOnlySnapshot) NewIter(o *IterOptions) (*Iterator, error) {
443 2 : return es.NewIterWithContext(context.Background(), o)
444 2 : }
445 :
446 : // NewIterWithContext is like NewIter, and additionally accepts a context for
447 : // tracing.
448 : func (es *EventuallyFileOnlySnapshot) NewIterWithContext(
449 : ctx context.Context, o *IterOptions,
450 2 : ) (*Iterator, error) {
451 2 : select {
452 0 : case <-es.closed:
453 0 : panic(ErrClosed)
454 2 : default:
455 : }
456 :
457 2 : es.mu.Lock()
458 2 : defer es.mu.Unlock()
459 2 : if es.mu.vers != nil {
460 2 : sOpts := snapshotIterOpts{seqNum: es.seqNum, vers: es.mu.vers}
461 2 : return es.db.newIter(ctx, nil /* batch */, newIterOpts{snapshot: sOpts}, o), nil
462 2 : }
463 :
464 2 : sOpts := snapshotIterOpts{seqNum: es.seqNum}
465 2 : iter := es.db.newIter(ctx, nil /* batch */, newIterOpts{snapshot: sOpts}, o)
466 2 : return iter, nil
467 : }
468 :
469 : // ScanInternal scans all internal keys within the specified bounds, truncating
470 : // any rangedels and rangekeys to those bounds. For use when an external user
471 : // needs to be aware of all internal keys that make up a key range.
472 : //
473 : // See comment on db.ScanInternal for the behaviour that can be expected of
474 : // point keys deleted by range dels and keys masked by range keys.
475 : func (es *EventuallyFileOnlySnapshot) ScanInternal(
476 : ctx context.Context,
477 : category block.Category,
478 : lower, upper []byte,
479 : visitPointKey func(key *InternalKey, value LazyValue, iterInfo IteratorLevel) error,
480 : visitRangeDel func(start, end []byte, seqNum base.SeqNum) error,
481 : visitRangeKey func(start, end []byte, keys []rangekey.Key) error,
482 : visitSharedFile func(sst *SharedSSTMeta) error,
483 : visitExternalFile func(sst *ExternalFile) error,
484 1 : ) error {
485 1 : if es.db == nil {
486 0 : panic(ErrClosed)
487 : }
488 1 : var sOpts snapshotIterOpts
489 1 : opts := &scanInternalOptions{
490 1 : category: category,
491 1 : IterOptions: IterOptions{
492 1 : KeyTypes: IterKeyTypePointsAndRanges,
493 1 : LowerBound: lower,
494 1 : UpperBound: upper,
495 1 : },
496 1 : visitPointKey: visitPointKey,
497 1 : visitRangeDel: visitRangeDel,
498 1 : visitRangeKey: visitRangeKey,
499 1 : visitSharedFile: visitSharedFile,
500 1 : visitExternalFile: visitExternalFile,
501 1 : }
502 1 : es.mu.Lock()
503 1 : if es.mu.vers != nil {
504 1 : sOpts = snapshotIterOpts{
505 1 : seqNum: es.seqNum,
506 1 : vers: es.mu.vers,
507 1 : }
508 1 : } else {
509 1 : sOpts = snapshotIterOpts{
510 1 : seqNum: es.seqNum,
511 1 : }
512 1 : }
513 1 : es.mu.Unlock()
514 1 : iter, err := es.db.newInternalIter(ctx, sOpts, opts)
515 1 : if err != nil {
516 0 : return err
517 0 : }
518 1 : defer iter.close()
519 1 :
520 1 : return scanInternalImpl(ctx, lower, upper, iter, opts)
521 : }
|