Line data Source code
1 : // Copyright 2012 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "context"
9 : "io"
10 : "math"
11 : "sync"
12 : "sync/atomic"
13 : "time"
14 :
15 : "github.com/cockroachdb/errors"
16 : "github.com/cockroachdb/pebble/internal/invariants"
17 : "github.com/cockroachdb/pebble/rangekey"
18 : "github.com/cockroachdb/pebble/sstable"
19 : )
20 :
// ErrSnapshotExcised reports that an excise overlapping one of an
// EventuallyFileOnlySnapshot's KeyRanges was applied before that EFOS had
// transitioned to a file-only snapshot. WaitForFileOnlySnapshot returns it,
// as do the EFOS read paths that observe the excised flag.
var ErrSnapshotExcised = errors.New(
	"pebble: snapshot excised before conversion to file-only snapshot",
)
25 :
26 : // Snapshot provides a read-only point-in-time view of the DB state.
27 : type Snapshot struct {
28 : // The db the snapshot was created from.
29 : db *DB
30 : seqNum uint64
31 :
32 : // Set if part of an EventuallyFileOnlySnapshot.
33 : efos *EventuallyFileOnlySnapshot
34 :
35 : // The list the snapshot is linked into.
36 : list *snapshotList
37 :
38 : // The next/prev link for the snapshotList doubly-linked list of snapshots.
39 : prev, next *Snapshot
40 : }
41 :
42 : var _ Reader = (*Snapshot)(nil)
43 :
44 : // Get gets the value for the given key. It returns ErrNotFound if the Snapshot
45 : // does not contain the key.
46 : //
47 : // The caller should not modify the contents of the returned slice, but it is
48 : // safe to modify the contents of the argument after Get returns. The returned
49 : // slice will remain valid until the returned Closer is closed. On success, the
50 : // caller MUST call closer.Close() or a memory leak will occur.
51 2 : func (s *Snapshot) Get(key []byte) ([]byte, io.Closer, error) {
52 2 : if s.db == nil {
53 1 : panic(ErrClosed)
54 : }
55 2 : return s.db.getInternal(key, nil /* batch */, s)
56 : }
57 :
58 : // NewIter returns an iterator that is unpositioned (Iterator.Valid() will
59 : // return false). The iterator can be positioned via a call to SeekGE,
60 : // SeekLT, First or Last.
61 2 : func (s *Snapshot) NewIter(o *IterOptions) (*Iterator, error) {
62 2 : return s.NewIterWithContext(context.Background(), o)
63 2 : }
64 :
65 : // NewIterWithContext is like NewIter, and additionally accepts a context for
66 : // tracing.
67 2 : func (s *Snapshot) NewIterWithContext(ctx context.Context, o *IterOptions) (*Iterator, error) {
68 2 : if s.db == nil {
69 1 : panic(ErrClosed)
70 : }
71 2 : return s.db.newIter(ctx, nil /* batch */, newIterOpts{
72 2 : snapshot: snapshotIterOpts{seqNum: s.seqNum},
73 2 : }, o), nil
74 : }
75 :
76 : // ScanInternal scans all internal keys within the specified bounds, truncating
77 : // any rangedels and rangekeys to those bounds. For use when an external user
78 : // needs to be aware of all internal keys that make up a key range.
79 : //
80 : // See comment on db.ScanInternal for the behaviour that can be expected of
81 : // point keys deleted by range dels and keys masked by range keys.
82 : func (s *Snapshot) ScanInternal(
83 : ctx context.Context,
84 : categoryAndQoS sstable.CategoryAndQoS,
85 : lower, upper []byte,
86 : visitPointKey func(key *InternalKey, value LazyValue, iterInfo IteratorLevel) error,
87 : visitRangeDel func(start, end []byte, seqNum uint64) error,
88 : visitRangeKey func(start, end []byte, keys []rangekey.Key) error,
89 : visitSharedFile func(sst *SharedSSTMeta) error,
90 1 : ) error {
91 1 : if s.db == nil {
92 0 : panic(ErrClosed)
93 : }
94 1 : scanInternalOpts := &scanInternalOptions{
95 1 : CategoryAndQoS: categoryAndQoS,
96 1 : visitPointKey: visitPointKey,
97 1 : visitRangeDel: visitRangeDel,
98 1 : visitRangeKey: visitRangeKey,
99 1 : visitSharedFile: visitSharedFile,
100 1 : skipSharedLevels: visitSharedFile != nil,
101 1 : IterOptions: IterOptions{
102 1 : KeyTypes: IterKeyTypePointsAndRanges,
103 1 : LowerBound: lower,
104 1 : UpperBound: upper,
105 1 : },
106 1 : }
107 1 :
108 1 : iter := s.db.newInternalIter(ctx, snapshotIterOpts{seqNum: s.seqNum}, scanInternalOpts)
109 1 : defer iter.close()
110 1 :
111 1 : return scanInternalImpl(ctx, lower, upper, iter, scanInternalOpts)
112 : }
113 :
114 : // closeLocked is similar to Close(), except it requires that db.mu be held
115 : // by the caller.
116 2 : func (s *Snapshot) closeLocked() error {
117 2 : s.db.mu.snapshots.remove(s)
118 2 :
119 2 : // If s was the previous earliest snapshot, we might be able to reclaim
120 2 : // disk space by dropping obsolete records that were pinned by s.
121 2 : if e := s.db.mu.snapshots.earliest(); e > s.seqNum {
122 2 : s.db.maybeScheduleCompactionPicker(pickElisionOnly)
123 2 : }
124 2 : s.db = nil
125 2 : return nil
126 : }
127 :
128 : // Close closes the snapshot, releasing its resources. Close must be called.
129 : // Failure to do so will result in a tiny memory leak and a large leak of
130 : // resources on disk due to the entries the snapshot is preventing from being
131 : // deleted.
132 : //
133 : // d.mu must NOT be held by the caller.
134 2 : func (s *Snapshot) Close() error {
135 2 : db := s.db
136 2 : if db == nil {
137 1 : panic(ErrClosed)
138 : }
139 2 : db.mu.Lock()
140 2 : defer db.mu.Unlock()
141 2 : return s.closeLocked()
142 : }
143 :
144 : type snapshotList struct {
145 : root Snapshot
146 : }
147 :
148 2 : func (l *snapshotList) init() {
149 2 : l.root.next = &l.root
150 2 : l.root.prev = &l.root
151 2 : }
152 :
153 2 : func (l *snapshotList) empty() bool {
154 2 : return l.root.next == &l.root
155 2 : }
156 :
157 2 : func (l *snapshotList) count() int {
158 2 : if l.empty() {
159 2 : return 0
160 2 : }
161 1 : var count int
162 1 : for i := l.root.next; i != &l.root; i = i.next {
163 1 : count++
164 1 : }
165 1 : return count
166 : }
167 :
168 2 : func (l *snapshotList) earliest() uint64 {
169 2 : v := uint64(math.MaxUint64)
170 2 : if !l.empty() {
171 2 : v = l.root.next.seqNum
172 2 : }
173 2 : return v
174 : }
175 :
176 2 : func (l *snapshotList) toSlice() []uint64 {
177 2 : if l.empty() {
178 2 : return nil
179 2 : }
180 2 : var results []uint64
181 2 : for i := l.root.next; i != &l.root; i = i.next {
182 2 : results = append(results, i.seqNum)
183 2 : }
184 2 : return results
185 : }
186 :
187 2 : func (l *snapshotList) pushBack(s *Snapshot) {
188 2 : if s.list != nil || s.prev != nil || s.next != nil {
189 0 : panic("pebble: snapshot list is inconsistent")
190 : }
191 2 : s.prev = l.root.prev
192 2 : s.prev.next = s
193 2 : s.next = &l.root
194 2 : s.next.prev = s
195 2 : s.list = l
196 : }
197 :
198 2 : func (l *snapshotList) remove(s *Snapshot) {
199 2 : if s == &l.root {
200 0 : panic("pebble: cannot remove snapshot list root node")
201 : }
202 2 : if s.list != l {
203 0 : panic("pebble: snapshot list is inconsistent")
204 : }
205 2 : s.prev.next = s.next
206 2 : s.next.prev = s.prev
207 2 : s.next = nil // avoid memory leaks
208 2 : s.prev = nil // avoid memory leaks
209 2 : s.list = nil // avoid memory leaks
210 : }
211 :
212 : // EventuallyFileOnlySnapshot (aka EFOS) provides a read-only point-in-time view
213 : // of the database state, similar to Snapshot. An EventuallyFileOnlySnapshot
214 : // induces less write amplification than Snapshot, at the cost of increased space
215 : // amplification. While a Snapshot may increase write amplification across all
216 : // flushes and compactions for the duration of its lifetime, an
217 : // EventuallyFileOnlySnapshot only incurs that cost for flushes/compactions if
218 : // memtables at the time of EFOS instantiation contained keys that the EFOS is
219 : // interested in (i.e. its protectedRanges). In that case, the EFOS prevents
220 : // elision of keys visible to it, similar to a Snapshot, until those memtables
221 : // are flushed, and once that happens, the "EventuallyFileOnlySnapshot"
222 : // transitions to a file-only snapshot state in which it pins zombies sstables
223 : // like an open Iterator would, without pinning any memtables. Callers that can
224 : // tolerate the increased space amplification of pinning zombie sstables until
225 : // the snapshot is closed may prefer EventuallyFileOnlySnapshots for their
226 : // reduced write amplification. Callers that desire the benefits of the file-only
227 : // state that requires no pinning of memtables should call
228 : // `WaitForFileOnlySnapshot()` (and possibly re-mint an EFOS if it returns
229 : // ErrSnapshotExcised) before relying on the EFOS to keep producing iterators
230 : // with zero write-amp and zero pinning of memtables in memory.
231 : //
232 : // EventuallyFileOnlySnapshots interact with the IngestAndExcise operation in
233 : // subtle ways. No new iterators can be created once
234 : // EventuallyFileOnlySnapshot.excised is set to true.
235 : type EventuallyFileOnlySnapshot struct {
236 : mu struct {
237 : // NB: If both this mutex and db.mu are being grabbed, db.mu should be
238 : // grabbed _before_ grabbing this one.
239 : sync.Mutex
240 :
241 : // Either the snap field is set below, or the version is set at any given
242 : // point of time. If a snapshot is referenced, this is not a file-only
243 : // snapshot yet, and if a version is set (and ref'd) this is a file-only
244 : // snapshot.
245 :
246 : // The wrapped regular snapshot, if not a file-only snapshot yet.
247 : snap *Snapshot
248 : // The wrapped version reference, if a file-only snapshot.
249 : vers *version
250 : }
251 :
252 : // Key ranges to watch for an excise on.
253 : protectedRanges []KeyRange
254 : // excised, if true, signals that the above ranges were excised during the
255 : // lifetime of this snapshot.
256 : excised atomic.Bool
257 :
258 : // The db the snapshot was created from.
259 : db *DB
260 : seqNum uint64
261 :
262 : closed chan struct{}
263 : }
264 :
265 : func (d *DB) makeEventuallyFileOnlySnapshot(
266 : keyRanges []KeyRange, internalKeyRanges []internalKeyRange,
267 2 : ) *EventuallyFileOnlySnapshot {
268 2 : isFileOnly := true
269 2 :
270 2 : d.mu.Lock()
271 2 : defer d.mu.Unlock()
272 2 : seqNum := d.mu.versions.visibleSeqNum.Load()
273 2 : // Check if any of the keyRanges overlap with a memtable.
274 2 : for i := range d.mu.mem.queue {
275 2 : mem := d.mu.mem.queue[i]
276 2 : if ingestMemtableOverlaps(d.cmp, mem, internalKeyRanges) {
277 2 : isFileOnly = false
278 2 : break
279 : }
280 : }
281 2 : es := &EventuallyFileOnlySnapshot{
282 2 : db: d,
283 2 : seqNum: seqNum,
284 2 : protectedRanges: keyRanges,
285 2 : closed: make(chan struct{}),
286 2 : }
287 2 : if isFileOnly {
288 2 : es.mu.vers = d.mu.versions.currentVersion()
289 2 : es.mu.vers.Ref()
290 2 : } else {
291 2 : s := &Snapshot{
292 2 : db: d,
293 2 : seqNum: seqNum,
294 2 : }
295 2 : s.efos = es
296 2 : es.mu.snap = s
297 2 : d.mu.snapshots.pushBack(s)
298 2 : }
299 2 : return es
300 : }
301 :
302 : // Transitions this EventuallyFileOnlySnapshot to a file-only snapshot. Requires
303 : // earliestUnflushedSeqNum and vers to correspond to the same Version from the
304 : // current or a past acquisition of db.mu. vers must have been Ref()'d before
305 : // that mutex was released, if it was released.
306 : //
307 : // NB: The caller is expected to check for es.excised before making this
308 : // call.
309 : //
310 : // d.mu must be held when calling this method.
311 2 : func (es *EventuallyFileOnlySnapshot) transitionToFileOnlySnapshot(vers *version) error {
312 2 : es.mu.Lock()
313 2 : select {
314 1 : case <-es.closed:
315 1 : vers.UnrefLocked()
316 1 : es.mu.Unlock()
317 1 : return ErrClosed
318 2 : default:
319 : }
320 2 : if es.mu.snap == nil {
321 0 : es.mu.Unlock()
322 0 : panic("pebble: tried to transition an eventually-file-only-snapshot twice")
323 : }
324 : // The caller has already called Ref() on vers.
325 2 : es.mu.vers = vers
326 2 : // NB: The callers should have already done a check of es.excised.
327 2 : oldSnap := es.mu.snap
328 2 : es.mu.snap = nil
329 2 : es.mu.Unlock()
330 2 : return oldSnap.closeLocked()
331 : }
332 :
333 : // hasTransitioned returns true if this EFOS has transitioned to a file-only
334 : // snapshot.
335 1 : func (es *EventuallyFileOnlySnapshot) hasTransitioned() bool {
336 1 : es.mu.Lock()
337 1 : defer es.mu.Unlock()
338 1 : return es.mu.vers != nil
339 1 : }
340 :
341 : // waitForFlush waits for a flush on any memtables that need to be flushed
342 : // before this EFOS can transition to a file-only snapshot. If this EFOS is
343 : // waiting on a flush of the mutable memtable, it forces a rotation within
344 : // `dur` duration. For immutable memtables, it schedules a flush and waits for
345 : // it to finish.
346 1 : func (es *EventuallyFileOnlySnapshot) waitForFlush(ctx context.Context, dur time.Duration) error {
347 1 : es.db.mu.Lock()
348 1 : defer es.db.mu.Unlock()
349 1 :
350 1 : earliestUnflushedSeqNum := es.db.getEarliestUnflushedSeqNumLocked()
351 1 : for earliestUnflushedSeqNum < es.seqNum {
352 1 : select {
353 0 : case <-es.closed:
354 0 : return ErrClosed
355 0 : case <-ctx.Done():
356 0 : return ctx.Err()
357 1 : default:
358 : }
359 : // Check if the current mutable memtable contains keys less than seqNum.
360 : // If so, rotate it.
361 1 : if es.db.mu.mem.mutable.logSeqNum < es.seqNum && dur.Nanoseconds() > 0 {
362 1 : es.db.maybeScheduleDelayedFlush(es.db.mu.mem.mutable, dur)
363 1 : } else {
364 0 : // Find the last memtable that contains seqNums less than es.seqNum,
365 0 : // and force a flush on it.
366 0 : var mem *flushableEntry
367 0 : for i := range es.db.mu.mem.queue {
368 0 : if es.db.mu.mem.queue[i].logSeqNum < es.seqNum {
369 0 : mem = es.db.mu.mem.queue[i]
370 0 : }
371 : }
372 0 : mem.flushForced = true
373 0 : es.db.maybeScheduleFlush()
374 : }
375 1 : es.db.mu.compact.cond.Wait()
376 1 :
377 1 : earliestUnflushedSeqNum = es.db.getEarliestUnflushedSeqNumLocked()
378 : }
379 1 : if es.excised.Load() {
380 1 : return ErrSnapshotExcised
381 1 : }
382 1 : return nil
383 : }
384 :
385 : // WaitForFileOnlySnapshot blocks the calling goroutine until this snapshot
386 : // has been converted into a file-only snapshot (i.e. all memtables containing
387 : // keys < seqNum are flushed). A duration can be passed in, and if nonzero,
388 : // a delayed flush will be scheduled at that duration if necessary.
389 : //
390 : // Idempotent; can be called multiple times with no side effects.
391 : func (es *EventuallyFileOnlySnapshot) WaitForFileOnlySnapshot(
392 : ctx context.Context, dur time.Duration,
393 1 : ) error {
394 1 : if es.hasTransitioned() {
395 1 : return nil
396 1 : }
397 :
398 1 : if err := es.waitForFlush(ctx, dur); err != nil {
399 1 : return err
400 1 : }
401 :
402 1 : if invariants.Enabled {
403 1 : // Since we aren't returning an error, we _must_ have transitioned to a
404 1 : // file-only snapshot by now.
405 1 : if !es.hasTransitioned() {
406 0 : panic("expected EFOS to have transitioned to file-only snapshot after flush")
407 : }
408 : }
409 1 : return nil
410 : }
411 :
412 : // Close closes the file-only snapshot and releases all referenced resources.
413 : // Not idempotent.
414 2 : func (es *EventuallyFileOnlySnapshot) Close() error {
415 2 : close(es.closed)
416 2 : es.db.mu.Lock()
417 2 : defer es.db.mu.Unlock()
418 2 : es.mu.Lock()
419 2 : defer es.mu.Unlock()
420 2 :
421 2 : if es.mu.snap != nil {
422 2 : if err := es.mu.snap.closeLocked(); err != nil {
423 0 : return err
424 0 : }
425 : }
426 2 : if es.mu.vers != nil {
427 2 : es.mu.vers.UnrefLocked()
428 2 : }
429 2 : return nil
430 : }
431 :
432 : // Get implements the Reader interface.
433 1 : func (es *EventuallyFileOnlySnapshot) Get(key []byte) (value []byte, closer io.Closer, err error) {
434 1 : // TODO(jackson): Use getInternal.
435 1 : iter, err := es.NewIter(nil)
436 1 : if err != nil {
437 0 : return nil, nil, err
438 0 : }
439 1 : var valid bool
440 1 : if es.db.opts.Comparer.Split != nil {
441 1 : valid = iter.SeekPrefixGE(key)
442 1 : } else {
443 0 : valid = iter.SeekGE(key)
444 0 : }
445 1 : if !valid {
446 1 : if err = firstError(iter.Error(), iter.Close()); err != nil {
447 0 : return nil, nil, err
448 0 : }
449 1 : return nil, nil, ErrNotFound
450 : }
451 1 : if !es.db.equal(iter.Key(), key) {
452 1 : return nil, nil, firstError(iter.Close(), ErrNotFound)
453 1 : }
454 1 : return iter.Value(), iter, nil
455 : }
456 :
457 : // NewIter returns an iterator that is unpositioned (Iterator.Valid() will
458 : // return false). The iterator can be positioned via a call to SeekGE,
459 : // SeekLT, First or Last.
460 2 : func (es *EventuallyFileOnlySnapshot) NewIter(o *IterOptions) (*Iterator, error) {
461 2 : return es.NewIterWithContext(context.Background(), o)
462 2 : }
463 :
464 : // NewIterWithContext is like NewIter, and additionally accepts a context for
465 : // tracing.
466 : func (es *EventuallyFileOnlySnapshot) NewIterWithContext(
467 : ctx context.Context, o *IterOptions,
468 2 : ) (*Iterator, error) {
469 2 : select {
470 0 : case <-es.closed:
471 0 : panic(ErrClosed)
472 2 : default:
473 : }
474 :
475 2 : es.mu.Lock()
476 2 : defer es.mu.Unlock()
477 2 : if es.mu.vers != nil {
478 2 : sOpts := snapshotIterOpts{seqNum: es.seqNum, vers: es.mu.vers}
479 2 : return es.db.newIter(ctx, nil /* batch */, newIterOpts{snapshot: sOpts}, o), nil
480 2 : }
481 :
482 2 : if es.excised.Load() {
483 1 : return nil, ErrSnapshotExcised
484 1 : }
485 2 : sOpts := snapshotIterOpts{seqNum: es.seqNum}
486 2 : iter := es.db.newIter(ctx, nil /* batch */, newIterOpts{snapshot: sOpts}, o)
487 2 :
488 2 : // If excised is true, then keys relevant to the snapshot might not be
489 2 : // present in the readState being used by the iterator. Error out.
490 2 : if es.excised.Load() {
491 0 : iter.Close()
492 0 : return nil, ErrSnapshotExcised
493 0 : }
494 2 : return iter, nil
495 : }
496 :
497 : // ScanInternal scans all internal keys within the specified bounds, truncating
498 : // any rangedels and rangekeys to those bounds. For use when an external user
499 : // needs to be aware of all internal keys that make up a key range.
500 : //
501 : // See comment on db.ScanInternal for the behaviour that can be expected of
502 : // point keys deleted by range dels and keys masked by range keys.
503 : func (es *EventuallyFileOnlySnapshot) ScanInternal(
504 : ctx context.Context,
505 : categoryAndQoS sstable.CategoryAndQoS,
506 : lower, upper []byte,
507 : visitPointKey func(key *InternalKey, value LazyValue, iterInfo IteratorLevel) error,
508 : visitRangeDel func(start, end []byte, seqNum uint64) error,
509 : visitRangeKey func(start, end []byte, keys []rangekey.Key) error,
510 : visitSharedFile func(sst *SharedSSTMeta) error,
511 1 : ) error {
512 1 : if es.db == nil {
513 0 : panic(ErrClosed)
514 : }
515 1 : if es.excised.Load() {
516 0 : return ErrSnapshotExcised
517 0 : }
518 1 : var sOpts snapshotIterOpts
519 1 : es.mu.Lock()
520 1 : if es.mu.vers != nil {
521 1 : sOpts = snapshotIterOpts{
522 1 : seqNum: es.seqNum,
523 1 : vers: es.mu.vers,
524 1 : }
525 1 : } else {
526 1 : sOpts = snapshotIterOpts{
527 1 : seqNum: es.seqNum,
528 1 : }
529 1 : }
530 1 : es.mu.Unlock()
531 1 : opts := &scanInternalOptions{
532 1 : CategoryAndQoS: categoryAndQoS,
533 1 : IterOptions: IterOptions{
534 1 : KeyTypes: IterKeyTypePointsAndRanges,
535 1 : LowerBound: lower,
536 1 : UpperBound: upper,
537 1 : },
538 1 : visitPointKey: visitPointKey,
539 1 : visitRangeDel: visitRangeDel,
540 1 : visitRangeKey: visitRangeKey,
541 1 : visitSharedFile: visitSharedFile,
542 1 : skipSharedLevels: visitSharedFile != nil,
543 1 : }
544 1 : iter := es.db.newInternalIter(ctx, sOpts, opts)
545 1 : defer iter.close()
546 1 :
547 1 : // If excised is true, then keys relevant to the snapshot might not be
548 1 : // present in the readState being used by the iterator. Error out.
549 1 : if es.excised.Load() {
550 0 : return ErrSnapshotExcised
551 0 : }
552 :
553 1 : return scanInternalImpl(ctx, lower, upper, iter, opts)
554 : }
|