Line data Source code
1 : // Copyright 2012 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "context"
9 : "io"
10 : "math"
11 : "sync"
12 : "time"
13 :
14 : "github.com/cockroachdb/pebble/internal/base"
15 : "github.com/cockroachdb/pebble/internal/invariants"
16 : "github.com/cockroachdb/pebble/internal/manifest"
17 : "github.com/cockroachdb/pebble/rangekey"
18 : "github.com/cockroachdb/pebble/sstable/block"
19 : )
20 :
21 : // Snapshot provides a read-only point-in-time view of the DB state.
22 : type Snapshot struct {
23 : // The db the snapshot was created from.
24 : db *DB
25 : seqNum base.SeqNum
26 :
27 : // Set if part of an EventuallyFileOnlySnapshot.
28 : efos *EventuallyFileOnlySnapshot
29 :
30 : // The list the snapshot is linked into.
31 : list *snapshotList
32 :
33 : // The next/prev link for the snapshotList doubly-linked list of snapshots.
34 : prev, next *Snapshot
35 : }
36 :
37 : var _ Reader = (*Snapshot)(nil)
38 :
39 : // Get gets the value for the given key. It returns ErrNotFound if the Snapshot
40 : // does not contain the key.
41 : //
42 : // The caller should not modify the contents of the returned slice, but it is
43 : // safe to modify the contents of the argument after Get returns. The returned
44 : // slice will remain valid until the returned Closer is closed. On success, the
45 : // caller MUST call closer.Close() or a memory leak will occur.
46 1 : func (s *Snapshot) Get(key []byte) ([]byte, io.Closer, error) {
47 1 : if s.db == nil {
48 0 : panic(ErrClosed)
49 : }
50 1 : return s.db.getInternal(key, nil /* batch */, s)
51 : }
52 :
53 : // NewIter returns an iterator that is unpositioned (Iterator.Valid() will
54 : // return false). The iterator can be positioned via a call to SeekGE,
55 : // SeekLT, First or Last.
56 1 : func (s *Snapshot) NewIter(o *IterOptions) (*Iterator, error) {
57 1 : return s.NewIterWithContext(context.Background(), o)
58 1 : }
59 :
60 : // NewIterWithContext is like NewIter, and additionally accepts a context for
61 : // tracing.
62 1 : func (s *Snapshot) NewIterWithContext(ctx context.Context, o *IterOptions) (*Iterator, error) {
63 1 : if s.db == nil {
64 0 : panic(ErrClosed)
65 : }
66 1 : return s.db.newIter(ctx, nil /* batch */, newIterOpts{
67 1 : snapshot: snapshotIterOpts{seqNum: s.seqNum},
68 1 : }, o), nil
69 : }
70 :
71 : // ScanInternal scans all internal keys within the specified bounds, truncating
72 : // any rangedels and rangekeys to those bounds. For use when an external user
73 : // needs to be aware of all internal keys that make up a key range.
74 : //
75 : // See comment on db.ScanInternal for the behaviour that can be expected of
76 : // point keys deleted by range dels and keys masked by range keys.
77 : func (s *Snapshot) ScanInternal(
78 : ctx context.Context,
79 : category block.Category,
80 : lower, upper []byte,
81 : visitPointKey func(key *InternalKey, value LazyValue, iterInfo IteratorLevel) error,
82 : visitRangeDel func(start, end []byte, seqNum base.SeqNum) error,
83 : visitRangeKey func(start, end []byte, keys []rangekey.Key) error,
84 : visitSharedFile func(sst *SharedSSTMeta) error,
85 : visitExternalFile func(sst *ExternalFile) error,
86 0 : ) error {
87 0 : if s.db == nil {
88 0 : panic(ErrClosed)
89 : }
90 0 : scanInternalOpts := &scanInternalOptions{
91 0 : category: category,
92 0 : visitPointKey: visitPointKey,
93 0 : visitRangeDel: visitRangeDel,
94 0 : visitRangeKey: visitRangeKey,
95 0 : visitSharedFile: visitSharedFile,
96 0 : visitExternalFile: visitExternalFile,
97 0 : IterOptions: IterOptions{
98 0 : KeyTypes: IterKeyTypePointsAndRanges,
99 0 : LowerBound: lower,
100 0 : UpperBound: upper,
101 0 : },
102 0 : }
103 0 :
104 0 : iter, err := s.db.newInternalIter(ctx, snapshotIterOpts{seqNum: s.seqNum}, scanInternalOpts)
105 0 : if err != nil {
106 0 : return err
107 0 : }
108 0 : defer iter.close()
109 0 :
110 0 : return scanInternalImpl(ctx, lower, upper, iter, scanInternalOpts)
111 : }
112 :
113 : // closeLocked is similar to Close(), except it requires that db.mu be held
114 : // by the caller.
115 1 : func (s *Snapshot) closeLocked() error {
116 1 : s.db.mu.snapshots.remove(s)
117 1 :
118 1 : // If s was the previous earliest snapshot, we might be able to reclaim
119 1 : // disk space by dropping obsolete records that were pinned by s.
120 1 : if e := s.db.mu.snapshots.earliest(); e > s.seqNum {
121 1 : // NB: maybeScheduleCompaction also picks elision-only compactions.
122 1 : s.db.maybeScheduleCompaction()
123 1 : }
124 1 : s.db = nil
125 1 : return nil
126 : }
127 :
128 : // Close closes the snapshot, releasing its resources. Close must be called.
129 : // Failure to do so will result in a tiny memory leak and a large leak of
130 : // resources on disk due to the entries the snapshot is preventing from being
131 : // deleted.
132 : //
133 : // d.mu must NOT be held by the caller.
134 1 : func (s *Snapshot) Close() error {
135 1 : db := s.db
136 1 : if db == nil {
137 0 : panic(ErrClosed)
138 : }
139 1 : db.mu.Lock()
140 1 : defer db.mu.Unlock()
141 1 : return s.closeLocked()
142 : }
143 :
144 : type snapshotList struct {
145 : root Snapshot
146 : }
147 :
148 1 : func (l *snapshotList) init() {
149 1 : l.root.next = &l.root
150 1 : l.root.prev = &l.root
151 1 : }
152 :
153 1 : func (l *snapshotList) empty() bool {
154 1 : return l.root.next == &l.root
155 1 : }
156 :
157 1 : func (l *snapshotList) count() int {
158 1 : if l.empty() {
159 1 : return 0
160 1 : }
161 0 : var count int
162 0 : for i := l.root.next; i != &l.root; i = i.next {
163 0 : count++
164 0 : }
165 0 : return count
166 : }
167 :
168 1 : func (l *snapshotList) earliest() base.SeqNum {
169 1 : v := base.SeqNum(math.MaxUint64)
170 1 : if !l.empty() {
171 1 : v = l.root.next.seqNum
172 1 : }
173 1 : return v
174 : }
175 :
176 1 : func (l *snapshotList) toSlice() []base.SeqNum {
177 1 : if l.empty() {
178 1 : return nil
179 1 : }
180 1 : var results []base.SeqNum
181 1 : for i := l.root.next; i != &l.root; i = i.next {
182 1 : results = append(results, i.seqNum)
183 1 : }
184 1 : return results
185 : }
186 :
187 1 : func (l *snapshotList) pushBack(s *Snapshot) {
188 1 : if s.list != nil || s.prev != nil || s.next != nil {
189 0 : panic("pebble: snapshot list is inconsistent")
190 : }
191 1 : s.prev = l.root.prev
192 1 : s.prev.next = s
193 1 : s.next = &l.root
194 1 : s.next.prev = s
195 1 : s.list = l
196 : }
197 :
198 1 : func (l *snapshotList) remove(s *Snapshot) {
199 1 : if s == &l.root {
200 0 : panic("pebble: cannot remove snapshot list root node")
201 : }
202 1 : if s.list != l {
203 0 : panic("pebble: snapshot list is inconsistent")
204 : }
205 1 : s.prev.next = s.next
206 1 : s.next.prev = s.prev
207 1 : s.next = nil // avoid memory leaks
208 1 : s.prev = nil // avoid memory leaks
209 1 : s.list = nil // avoid memory leaks
210 : }
211 :
212 : // EventuallyFileOnlySnapshot (aka EFOS) provides a read-only point-in-time view
213 : // of the database state, similar to Snapshot. An EventuallyFileOnlySnapshot
214 : // induces less write amplification than Snapshot, at the cost of increased space
215 : // amplification. While a Snapshot may increase write amplification across all
216 : // flushes and compactions for the duration of its lifetime, an
217 : // EventuallyFileOnlySnapshot only incurs that cost for flushes/compactions if
218 : // memtables at the time of EFOS instantiation contained keys that the EFOS is
219 : // interested in (i.e. its protectedRanges). In that case, the EFOS prevents
220 : // elision of keys visible to it, similar to a Snapshot, until those memtables
221 : // are flushed, and once that happens, the "EventuallyFileOnlySnapshot"
222 : // transitions to a file-only snapshot state in which it pins zombies sstables
223 : // like an open Iterator would, without pinning any memtables. Callers that can
224 : // tolerate the increased space amplification of pinning zombie sstables until
225 : // the snapshot is closed may prefer EventuallyFileOnlySnapshots for their
226 : // reduced write amplification. Callers that desire the benefits of the file-only
227 : // state that requires no pinning of memtables should call
228 : // `WaitForFileOnlySnapshot()` before relying on the EFOS to keep producing iterators
229 : // with zero write-amp and zero pinning of memtables in memory.
230 : //
231 : // EventuallyFileOnlySnapshots interact with the IngestAndExcise operation in
232 : // subtle ways. The IngestAndExcise can force the transition of an EFOS to a
233 : // file-only snapshot if an excise overlaps with the EFOS bounds.
234 : type EventuallyFileOnlySnapshot struct {
235 : mu struct {
236 : // NB: If both this mutex and db.mu are being grabbed, db.mu should be
237 : // grabbed _before_ grabbing this one.
238 : sync.Mutex
239 :
240 : // Either the snap field is set below, or the version is set at any given
241 : // point of time. If a snapshot is referenced, this is not a file-only
242 : // snapshot yet, and if a version is set (and ref'd) this is a file-only
243 : // snapshot.
244 :
245 : // The wrapped regular snapshot, if not a file-only snapshot yet.
246 : snap *Snapshot
247 : // The wrapped version reference, if a file-only snapshot.
248 : vers *manifest.Version
249 : }
250 :
251 : // Key ranges to watch for an excise on.
252 : protectedRanges []KeyRange
253 :
254 : // The db the snapshot was created from.
255 : db *DB
256 : seqNum base.SeqNum
257 : closed chan struct{}
258 : }
259 :
260 1 : func (d *DB) makeEventuallyFileOnlySnapshot(keyRanges []KeyRange) *EventuallyFileOnlySnapshot {
261 1 : isFileOnly := true
262 1 :
263 1 : d.mu.Lock()
264 1 : defer d.mu.Unlock()
265 1 : seqNum := d.mu.versions.visibleSeqNum.Load()
266 1 : // Check if any of the keyRanges overlap with a memtable.
267 1 : for i := range d.mu.mem.queue {
268 1 : d.mu.mem.queue[i].computePossibleOverlaps(func(bounded) shouldContinue {
269 1 : isFileOnly = false
270 1 : return stopIteration
271 1 : }, sliceAsBounded(keyRanges)...)
272 : }
273 1 : es := &EventuallyFileOnlySnapshot{
274 1 : db: d,
275 1 : seqNum: seqNum,
276 1 : protectedRanges: keyRanges,
277 1 : closed: make(chan struct{}),
278 1 : }
279 1 : if isFileOnly {
280 1 : es.mu.vers = d.mu.versions.currentVersion()
281 1 : es.mu.vers.Ref()
282 1 : } else {
283 1 : s := &Snapshot{
284 1 : db: d,
285 1 : seqNum: seqNum,
286 1 : }
287 1 : s.efos = es
288 1 : es.mu.snap = s
289 1 : d.mu.snapshots.pushBack(s)
290 1 : }
291 1 : return es
292 : }
293 :
294 : // Transitions this EventuallyFileOnlySnapshot to a file-only snapshot. Requires
295 : // earliestUnflushedSeqNum and vers to correspond to the same Version from the
296 : // current or a past acquisition of db.mu. vers must have been Ref()'d before
297 : // that mutex was released, if it was released.
298 : //
299 : // NB: The caller is expected to check for es.excised before making this
300 : // call.
301 : //
302 : // d.mu must be held when calling this method.
303 1 : func (es *EventuallyFileOnlySnapshot) transitionToFileOnlySnapshot(vers *manifest.Version) error {
304 1 : es.mu.Lock()
305 1 : select {
306 1 : case <-es.closed:
307 1 : vers.UnrefLocked()
308 1 : es.mu.Unlock()
309 1 : return ErrClosed
310 1 : default:
311 : }
312 1 : if es.mu.snap == nil {
313 0 : es.mu.Unlock()
314 0 : panic("pebble: tried to transition an eventually-file-only-snapshot twice")
315 : }
316 : // The caller has already called Ref() on vers.
317 1 : es.mu.vers = vers
318 1 : // NB: The callers should have already done a check of es.excised.
319 1 : oldSnap := es.mu.snap
320 1 : es.mu.snap = nil
321 1 : es.mu.Unlock()
322 1 : return oldSnap.closeLocked()
323 : }
324 :
325 : // hasTransitioned returns true if this EFOS has transitioned to a file-only
326 : // snapshot.
327 1 : func (es *EventuallyFileOnlySnapshot) hasTransitioned() bool {
328 1 : es.mu.Lock()
329 1 : defer es.mu.Unlock()
330 1 : return es.mu.vers != nil
331 1 : }
332 :
333 : // waitForFlush waits for a flush on any memtables that need to be flushed
334 : // before this EFOS can transition to a file-only snapshot. If this EFOS is
335 : // waiting on a flush of the mutable memtable, it forces a rotation within
336 : // `dur` duration. For immutable memtables, it schedules a flush and waits for
337 : // it to finish.
338 0 : func (es *EventuallyFileOnlySnapshot) waitForFlush(ctx context.Context, dur time.Duration) error {
339 0 : es.db.mu.Lock()
340 0 : defer es.db.mu.Unlock()
341 0 :
342 0 : earliestUnflushedSeqNum := es.db.getEarliestUnflushedSeqNumLocked()
343 0 : for earliestUnflushedSeqNum < es.seqNum {
344 0 : select {
345 0 : case <-es.closed:
346 0 : return ErrClosed
347 0 : case <-ctx.Done():
348 0 : return ctx.Err()
349 0 : default:
350 : }
351 : // Check if the current mutable memtable contains keys less than seqNum.
352 : // If so, rotate it.
353 0 : if es.db.mu.mem.mutable.logSeqNum < es.seqNum && dur.Nanoseconds() > 0 {
354 0 : es.db.maybeScheduleDelayedFlush(es.db.mu.mem.mutable, dur)
355 0 : } else {
356 0 : // Find the last memtable that contains seqNums less than es.seqNum,
357 0 : // and force a flush on it.
358 0 : var mem *flushableEntry
359 0 : for i := range es.db.mu.mem.queue {
360 0 : if es.db.mu.mem.queue[i].logSeqNum < es.seqNum {
361 0 : mem = es.db.mu.mem.queue[i]
362 0 : }
363 : }
364 0 : mem.flushForced = true
365 0 : es.db.maybeScheduleFlush()
366 : }
367 0 : es.db.mu.compact.cond.Wait()
368 0 :
369 0 : earliestUnflushedSeqNum = es.db.getEarliestUnflushedSeqNumLocked()
370 : }
371 0 : return nil
372 : }
373 :
374 : // WaitForFileOnlySnapshot blocks the calling goroutine until this snapshot
375 : // has been converted into a file-only snapshot (i.e. all memtables containing
376 : // keys < seqNum are flushed). A duration can be passed in, and if nonzero,
377 : // a delayed flush will be scheduled at that duration if necessary.
378 : //
379 : // Idempotent; can be called multiple times with no side effects.
380 : func (es *EventuallyFileOnlySnapshot) WaitForFileOnlySnapshot(
381 : ctx context.Context, dur time.Duration,
382 0 : ) error {
383 0 : if es.hasTransitioned() {
384 0 : return nil
385 0 : }
386 :
387 0 : if err := es.waitForFlush(ctx, dur); err != nil {
388 0 : return err
389 0 : }
390 :
391 0 : if invariants.Enabled {
392 0 : // Since we aren't returning an error, we _must_ have transitioned to a
393 0 : // file-only snapshot by now.
394 0 : if !es.hasTransitioned() {
395 0 : panic("expected EFOS to have transitioned to file-only snapshot after flush")
396 : }
397 : }
398 0 : return nil
399 : }
400 :
401 : // Close closes the file-only snapshot and releases all referenced resources.
402 : // Not idempotent.
403 1 : func (es *EventuallyFileOnlySnapshot) Close() error {
404 1 : close(es.closed)
405 1 : es.db.mu.Lock()
406 1 : defer es.db.mu.Unlock()
407 1 : es.mu.Lock()
408 1 : defer es.mu.Unlock()
409 1 :
410 1 : if es.mu.snap != nil {
411 1 : if err := es.mu.snap.closeLocked(); err != nil {
412 0 : return err
413 0 : }
414 : }
415 1 : if es.mu.vers != nil {
416 1 : es.mu.vers.UnrefLocked()
417 1 : }
418 1 : return nil
419 : }
420 :
421 : // Get implements the Reader interface.
422 1 : func (es *EventuallyFileOnlySnapshot) Get(key []byte) (value []byte, closer io.Closer, err error) {
423 1 : // TODO(jackson): Use getInternal.
424 1 : iter, err := es.NewIter(nil)
425 1 : if err != nil {
426 0 : return nil, nil, err
427 0 : }
428 1 : if !iter.SeekPrefixGE(key) {
429 1 : if err = firstError(iter.Error(), iter.Close()); err != nil {
430 0 : return nil, nil, err
431 0 : }
432 1 : return nil, nil, ErrNotFound
433 : }
434 1 : if !es.db.equal(iter.Key(), key) {
435 1 : return nil, nil, firstError(iter.Close(), ErrNotFound)
436 1 : }
437 1 : return iter.Value(), iter, nil
438 : }
439 :
440 : // NewIter returns an iterator that is unpositioned (Iterator.Valid() will
441 : // return false). The iterator can be positioned via a call to SeekGE,
442 : // SeekLT, First or Last.
443 1 : func (es *EventuallyFileOnlySnapshot) NewIter(o *IterOptions) (*Iterator, error) {
444 1 : return es.NewIterWithContext(context.Background(), o)
445 1 : }
446 :
447 : // NewIterWithContext is like NewIter, and additionally accepts a context for
448 : // tracing.
449 : func (es *EventuallyFileOnlySnapshot) NewIterWithContext(
450 : ctx context.Context, o *IterOptions,
451 1 : ) (*Iterator, error) {
452 1 : select {
453 0 : case <-es.closed:
454 0 : panic(ErrClosed)
455 1 : default:
456 : }
457 :
458 1 : es.mu.Lock()
459 1 : defer es.mu.Unlock()
460 1 : if es.mu.vers != nil {
461 1 : sOpts := snapshotIterOpts{seqNum: es.seqNum, vers: es.mu.vers}
462 1 : return es.db.newIter(ctx, nil /* batch */, newIterOpts{snapshot: sOpts}, o), nil
463 1 : }
464 :
465 1 : sOpts := snapshotIterOpts{seqNum: es.seqNum}
466 1 : iter := es.db.newIter(ctx, nil /* batch */, newIterOpts{snapshot: sOpts}, o)
467 1 : return iter, nil
468 : }
469 :
470 : // ScanInternal scans all internal keys within the specified bounds, truncating
471 : // any rangedels and rangekeys to those bounds. For use when an external user
472 : // needs to be aware of all internal keys that make up a key range.
473 : //
474 : // See comment on db.ScanInternal for the behaviour that can be expected of
475 : // point keys deleted by range dels and keys masked by range keys.
476 : func (es *EventuallyFileOnlySnapshot) ScanInternal(
477 : ctx context.Context,
478 : category block.Category,
479 : lower, upper []byte,
480 : visitPointKey func(key *InternalKey, value LazyValue, iterInfo IteratorLevel) error,
481 : visitRangeDel func(start, end []byte, seqNum base.SeqNum) error,
482 : visitRangeKey func(start, end []byte, keys []rangekey.Key) error,
483 : visitSharedFile func(sst *SharedSSTMeta) error,
484 : visitExternalFile func(sst *ExternalFile) error,
485 0 : ) error {
486 0 : if es.db == nil {
487 0 : panic(ErrClosed)
488 : }
489 0 : var sOpts snapshotIterOpts
490 0 : opts := &scanInternalOptions{
491 0 : category: category,
492 0 : IterOptions: IterOptions{
493 0 : KeyTypes: IterKeyTypePointsAndRanges,
494 0 : LowerBound: lower,
495 0 : UpperBound: upper,
496 0 : },
497 0 : visitPointKey: visitPointKey,
498 0 : visitRangeDel: visitRangeDel,
499 0 : visitRangeKey: visitRangeKey,
500 0 : visitSharedFile: visitSharedFile,
501 0 : visitExternalFile: visitExternalFile,
502 0 : }
503 0 : es.mu.Lock()
504 0 : if es.mu.vers != nil {
505 0 : sOpts = snapshotIterOpts{
506 0 : seqNum: es.seqNum,
507 0 : vers: es.mu.vers,
508 0 : }
509 0 : } else {
510 0 : sOpts = snapshotIterOpts{
511 0 : seqNum: es.seqNum,
512 0 : }
513 0 : }
514 0 : es.mu.Unlock()
515 0 : iter, err := es.db.newInternalIter(ctx, sOpts, opts)
516 0 : if err != nil {
517 0 : return err
518 0 : }
519 0 : defer iter.close()
520 0 :
521 0 : return scanInternalImpl(ctx, lower, upper, iter, opts)
522 : }
|