Line data Source code
1 : // Copyright 2012 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "context"
9 : "io"
10 : "math"
11 : "sync"
12 : "sync/atomic"
13 : "time"
14 :
15 : "github.com/cockroachdb/errors"
16 : "github.com/cockroachdb/pebble/internal/invariants"
17 : "github.com/cockroachdb/pebble/rangekey"
18 : )
19 :
// ErrSnapshotExcised is returned from WaitForFileOnlySnapshot if an excise
// overlapping with one of the EventuallyFileOnlySnapshot's KeyRanges gets
// applied before the transition of that EFOS to a file-only snapshot. The
// EFOS iterator constructors (NewIter/NewIterWithContext and ScanInternal) may
// also return it once the EFOS has been marked as excised.
var ErrSnapshotExcised = errors.New("pebble: snapshot excised before conversion to file-only snapshot")
24 :
// Snapshot provides a read-only point-in-time view of the DB state.
//
// Reads through a Snapshot are performed at seqNum. Once the snapshot is
// closed (closeLocked clears db), its read methods panic with ErrClosed.
type Snapshot struct {
	// The db the snapshot was created from. Cleared by closeLocked; the read
	// methods use a nil db to detect use after Close.
	db *DB
	// The sequence number the snapshot reads at.
	seqNum uint64

	// Set if part of an EventuallyFileOnlySnapshot.
	efos *EventuallyFileOnlySnapshot

	// The list the snapshot is linked into; nil while not linked into any list.
	list *snapshotList

	// The next/prev link for the snapshotList doubly-linked list of snapshots.
	prev, next *Snapshot
}
40 :
// Compile-time assertion that *Snapshot implements the Reader interface.
var _ Reader = (*Snapshot)(nil)
42 :
43 : // Get gets the value for the given key. It returns ErrNotFound if the Snapshot
44 : // does not contain the key.
45 : //
46 : // The caller should not modify the contents of the returned slice, but it is
47 : // safe to modify the contents of the argument after Get returns. The returned
48 : // slice will remain valid until the returned Closer is closed. On success, the
49 : // caller MUST call closer.Close() or a memory leak will occur.
50 1 : func (s *Snapshot) Get(key []byte) ([]byte, io.Closer, error) {
51 1 : if s.db == nil {
52 1 : panic(ErrClosed)
53 : }
54 1 : return s.db.getInternal(key, nil /* batch */, s)
55 : }
56 :
57 : // NewIter returns an iterator that is unpositioned (Iterator.Valid() will
58 : // return false). The iterator can be positioned via a call to SeekGE,
59 : // SeekLT, First or Last.
60 1 : func (s *Snapshot) NewIter(o *IterOptions) (*Iterator, error) {
61 1 : return s.NewIterWithContext(context.Background(), o)
62 1 : }
63 :
64 : // NewIterWithContext is like NewIter, and additionally accepts a context for
65 : // tracing.
66 1 : func (s *Snapshot) NewIterWithContext(ctx context.Context, o *IterOptions) (*Iterator, error) {
67 1 : if s.db == nil {
68 1 : panic(ErrClosed)
69 : }
70 1 : return s.db.newIter(ctx, nil /* batch */, snapshotIterOpts{seqNum: s.seqNum}, o), nil
71 : }
72 :
73 : // ScanInternal scans all internal keys within the specified bounds, truncating
74 : // any rangedels and rangekeys to those bounds. For use when an external user
75 : // needs to be aware of all internal keys that make up a key range.
76 : //
77 : // See comment on db.ScanInternal for the behaviour that can be expected of
78 : // point keys deleted by range dels and keys masked by range keys.
79 : func (s *Snapshot) ScanInternal(
80 : ctx context.Context,
81 : lower, upper []byte,
82 : visitPointKey func(key *InternalKey, value LazyValue, iterInfo IteratorLevel) error,
83 : visitRangeDel func(start, end []byte, seqNum uint64) error,
84 : visitRangeKey func(start, end []byte, keys []rangekey.Key) error,
85 : visitSharedFile func(sst *SharedSSTMeta) error,
86 1 : ) error {
87 1 : if s.db == nil {
88 0 : panic(ErrClosed)
89 : }
90 1 : scanInternalOpts := &scanInternalOptions{
91 1 : visitPointKey: visitPointKey,
92 1 : visitRangeDel: visitRangeDel,
93 1 : visitRangeKey: visitRangeKey,
94 1 : visitSharedFile: visitSharedFile,
95 1 : skipSharedLevels: visitSharedFile != nil,
96 1 : IterOptions: IterOptions{
97 1 : KeyTypes: IterKeyTypePointsAndRanges,
98 1 : LowerBound: lower,
99 1 : UpperBound: upper,
100 1 : },
101 1 : }
102 1 :
103 1 : iter := s.db.newInternalIter(snapshotIterOpts{seqNum: s.seqNum}, scanInternalOpts)
104 1 : defer iter.close()
105 1 :
106 1 : return scanInternalImpl(ctx, lower, upper, iter, scanInternalOpts)
107 : }
108 :
109 : // closeLocked is similar to Close(), except it requires that db.mu be held
110 : // by the caller.
111 1 : func (s *Snapshot) closeLocked() error {
112 1 : s.db.mu.snapshots.remove(s)
113 1 :
114 1 : // If s was the previous earliest snapshot, we might be able to reclaim
115 1 : // disk space by dropping obsolete records that were pinned by s.
116 1 : if e := s.db.mu.snapshots.earliest(); e > s.seqNum {
117 1 : s.db.maybeScheduleCompactionPicker(pickElisionOnly)
118 1 : }
119 1 : s.db = nil
120 1 : return nil
121 : }
122 :
123 : // Close closes the snapshot, releasing its resources. Close must be called.
124 : // Failure to do so will result in a tiny memory leak and a large leak of
125 : // resources on disk due to the entries the snapshot is preventing from being
126 : // deleted.
127 : //
128 : // d.mu must NOT be held by the caller.
129 1 : func (s *Snapshot) Close() error {
130 1 : db := s.db
131 1 : if db == nil {
132 1 : panic(ErrClosed)
133 : }
134 1 : db.mu.Lock()
135 1 : defer db.mu.Unlock()
136 1 : return s.closeLocked()
137 : }
138 :
// snapshotList is a circular doubly-linked list of snapshots. root is a
// sentinel node rather than a real snapshot: root.next is the first element
// and root.prev is the last. The list must be initialized with init before use;
// the zero value has nil links.
type snapshotList struct {
	root Snapshot
}
142 :
143 1 : func (l *snapshotList) init() {
144 1 : l.root.next = &l.root
145 1 : l.root.prev = &l.root
146 1 : }
147 :
148 1 : func (l *snapshotList) empty() bool {
149 1 : return l.root.next == &l.root
150 1 : }
151 :
152 1 : func (l *snapshotList) count() int {
153 1 : if l.empty() {
154 1 : return 0
155 1 : }
156 1 : var count int
157 1 : for i := l.root.next; i != &l.root; i = i.next {
158 1 : count++
159 1 : }
160 1 : return count
161 : }
162 :
163 1 : func (l *snapshotList) earliest() uint64 {
164 1 : v := uint64(math.MaxUint64)
165 1 : if !l.empty() {
166 1 : v = l.root.next.seqNum
167 1 : }
168 1 : return v
169 : }
170 :
171 1 : func (l *snapshotList) toSlice() []uint64 {
172 1 : if l.empty() {
173 1 : return nil
174 1 : }
175 1 : var results []uint64
176 1 : for i := l.root.next; i != &l.root; i = i.next {
177 1 : results = append(results, i.seqNum)
178 1 : }
179 1 : return results
180 : }
181 :
182 1 : func (l *snapshotList) pushBack(s *Snapshot) {
183 1 : if s.list != nil || s.prev != nil || s.next != nil {
184 0 : panic("pebble: snapshot list is inconsistent")
185 : }
186 1 : s.prev = l.root.prev
187 1 : s.prev.next = s
188 1 : s.next = &l.root
189 1 : s.next.prev = s
190 1 : s.list = l
191 : }
192 :
193 1 : func (l *snapshotList) remove(s *Snapshot) {
194 1 : if s == &l.root {
195 0 : panic("pebble: cannot remove snapshot list root node")
196 : }
197 1 : if s.list != l {
198 0 : panic("pebble: snapshot list is inconsistent")
199 : }
200 1 : s.prev.next = s.next
201 1 : s.next.prev = s.prev
202 1 : s.next = nil // avoid memory leaks
203 1 : s.prev = nil // avoid memory leaks
204 1 : s.list = nil // avoid memory leaks
205 : }
206 :
// EventuallyFileOnlySnapshot (aka EFOS) provides a read-only point-in-time view
// of the database state, similar to Snapshot. An EventuallyFileOnlySnapshot
// induces less write amplification than Snapshot, at the cost of increased space
// amplification. While a Snapshot may increase write amplification across all
// flushes and compactions for the duration of its lifetime, an
// EventuallyFileOnlySnapshot only incurs that cost for flushes/compactions if
// memtables at the time of EFOS instantiation contained keys that the EFOS is
// interested in (i.e. its protectedRanges). In that case, the EFOS prevents
// elision of keys visible to it, similar to a Snapshot, until those memtables
// are flushed, and once that happens, the "EventuallyFileOnlySnapshot"
// transitions to a file-only snapshot state in which it pins zombie sstables
// like an open Iterator would, without pinning any memtables. Callers that can
// tolerate the increased space amplification of pinning zombie sstables until
// the snapshot is closed may prefer EventuallyFileOnlySnapshots for their
// reduced write amplification. Callers that desire the benefits of the file-only
// state that requires no pinning of memtables should call
// `WaitForFileOnlySnapshot()` (and possibly re-mint an EFOS if it returns
// ErrSnapshotExcised) before relying on the EFOS to keep producing iterators
// with zero write-amp and zero pinning of memtables in memory.
//
// EventuallyFileOnlySnapshots interact with the IngestAndExcise operation in
// subtle ways. Unlike Snapshots, EFOS guarantees that their read-only
// point-in-time view is unaltered by the excision. However, if a concurrent
// excise were to happen on one of the protectedRanges, WaitForFileOnlySnapshot()
// would return ErrSnapshotExcised and the EFOS would maintain a reference to the
// underlying readState (and by extension, zombie memtables) for its lifetime.
// This could lead to increased memory utilization, which is why callers should
// call WaitForFileOnlySnapshot() if they expect an EFOS to be long-lived.
type EventuallyFileOnlySnapshot struct {
	mu struct {
		// NB: If both this mutex and db.mu are being grabbed, db.mu should be
		// grabbed _before_ grabbing this one.
		sync.Mutex

		// Either the {snap,readState} fields are set below, or the version is set at
		// any given point of time. If a snapshot is referenced, this is not a
		// file-only snapshot yet, and if a version is set (and ref'd) this is a
		// file-only snapshot. After an excise, releaseReadState may drop the
		// readState reference while snap is still set.

		// The wrapped regular snapshot, if not a file-only snapshot yet. The
		// readState has already been ref()d once if it's set.
		snap      *Snapshot
		readState *readState
		// The wrapped version reference, if a file-only snapshot.
		vers *version
	}

	// Key ranges to watch for an excise on.
	protectedRanges []KeyRange
	// excised, if true, signals that the above ranges were excised during the
	// lifetime of this snapshot.
	excised atomic.Bool

	// The db the snapshot was created from. Never cleared, even by Close.
	db *DB
	// The sequence number the snapshot reads at.
	seqNum uint64

	// closed is closed by Close. transitionToFileOnlySnapshot, waitForFlush and
	// NewIterWithContext select on it to detect use after Close.
	closed chan struct{}
}
266 :
267 : func (d *DB) makeEventuallyFileOnlySnapshot(
268 : keyRanges []KeyRange, internalKeyRanges []internalKeyRange,
269 1 : ) *EventuallyFileOnlySnapshot {
270 1 : isFileOnly := true
271 1 :
272 1 : d.mu.Lock()
273 1 : defer d.mu.Unlock()
274 1 : seqNum := d.mu.versions.visibleSeqNum.Load()
275 1 : // Check if any of the keyRanges overlap with a memtable.
276 1 : for i := range d.mu.mem.queue {
277 1 : mem := d.mu.mem.queue[i]
278 1 : if ingestMemtableOverlaps(d.cmp, mem, internalKeyRanges) {
279 1 : isFileOnly = false
280 1 : break
281 : }
282 : }
283 1 : es := &EventuallyFileOnlySnapshot{
284 1 : db: d,
285 1 : seqNum: seqNum,
286 1 : protectedRanges: keyRanges,
287 1 : closed: make(chan struct{}),
288 1 : }
289 1 : if isFileOnly {
290 1 : es.mu.vers = d.mu.versions.currentVersion()
291 1 : es.mu.vers.Ref()
292 1 : } else {
293 1 : s := &Snapshot{
294 1 : db: d,
295 1 : seqNum: seqNum,
296 1 : }
297 1 : s.efos = es
298 1 : es.mu.snap = s
299 1 : es.mu.readState = d.loadReadState()
300 1 : d.mu.snapshots.pushBack(s)
301 1 : }
302 1 : return es
303 : }
304 :
305 : // Transitions this EventuallyFileOnlySnapshot to a file-only snapshot. Requires
306 : // earliestUnflushedSeqNum and vers to correspond to the same Version from the
307 : // current or a past acquisition of db.mu. vers must have been Ref()'d before
308 : // that mutex was released, if it was released.
309 : //
310 : // NB: The caller is expected to check for es.excised before making this
311 : // call.
312 : //
313 : // d.mu must be held when calling this method.
314 1 : func (es *EventuallyFileOnlySnapshot) transitionToFileOnlySnapshot(vers *version) error {
315 1 : es.mu.Lock()
316 1 : select {
317 0 : case <-es.closed:
318 0 : vers.UnrefLocked()
319 0 : es.mu.Unlock()
320 0 : return ErrClosed
321 1 : default:
322 : }
323 1 : if es.mu.snap == nil {
324 0 : es.mu.Unlock()
325 0 : panic("pebble: tried to transition an eventually-file-only-snapshot twice")
326 : }
327 : // The caller has already called Ref() on vers.
328 1 : es.mu.vers = vers
329 1 : // NB: The callers should have already done a check of es.excised.
330 1 : oldSnap := es.mu.snap
331 1 : oldReadState := es.mu.readState
332 1 : es.mu.snap = nil
333 1 : es.mu.readState = nil
334 1 : es.mu.Unlock()
335 1 : // It's okay to close a snapshot even if iterators are already open on it.
336 1 : oldReadState.unrefLocked()
337 1 : return oldSnap.closeLocked()
338 : }
339 :
340 : // releaseReadState is called to release reference to a readState when
341 : // es.excised == true. This is to free up memory as quickly as possible; all
342 : // other snapshot resources are kept around until Close() is called. Safe for
343 : // idempotent calls.
344 : //
345 : // d.mu must be held when calling this method.
346 1 : func (es *EventuallyFileOnlySnapshot) releaseReadState() {
347 1 : if !es.excised.Load() {
348 0 : panic("pebble: releasing read state of eventually-file-only-snapshot but was not excised")
349 : }
350 1 : es.mu.Lock()
351 1 : defer es.mu.Unlock()
352 1 : if es.mu.readState != nil {
353 1 : es.mu.readState.unrefLocked()
354 1 : es.db.maybeScheduleObsoleteTableDeletionLocked()
355 1 : }
356 : }
357 :
358 : // hasTransitioned returns true if this EFOS has transitioned to a file-only
359 : // snapshot.
360 1 : func (es *EventuallyFileOnlySnapshot) hasTransitioned() bool {
361 1 : es.mu.Lock()
362 1 : defer es.mu.Unlock()
363 1 : return es.mu.vers != nil
364 1 : }
365 :
// waitForFlush waits for a flush on any memtables that need to be flushed
// before this EFOS can transition to a file-only snapshot. If this EFOS is
// waiting on a flush of the mutable memtable, it forces a rotation within
// `dur` duration. For immutable memtables, it schedules a flush and waits for
// it to finish.
//
// It acquires db.mu for the duration of the call and blocks in
// db.mu.compact.cond.Wait between checks. NOTE(review): this presumes the cond
// is tied to db.mu so the mutex is released while waiting; confirm.
//
// Returns:
//   - ErrClosed if es is closed, or ctx.Err() if ctx is done. Both are only
//     checked at the top of each loop iteration, not while blocked in Wait.
//   - ErrSnapshotExcised if, once no unflushed memtable holds sequence numbers
//     below es.seqNum, es has been marked as excised.
//   - nil otherwise.
func (es *EventuallyFileOnlySnapshot) waitForFlush(ctx context.Context, dur time.Duration) error {
	es.db.mu.Lock()
	defer es.db.mu.Unlock()

	earliestUnflushedSeqNum := es.db.getEarliestUnflushedSeqNumLocked()
	for earliestUnflushedSeqNum < es.seqNum {
		select {
		case <-es.closed:
			return ErrClosed
		case <-ctx.Done():
			return ctx.Err()
		default:
		}
		// Check if the current mutable memtable contains keys less than seqNum.
		// If so, rotate it.
		if es.db.mu.mem.mutable.logSeqNum < es.seqNum && dur.Nanoseconds() > 0 {
			es.db.maybeScheduleDelayedFlush(es.db.mu.mem.mutable, dur)
		} else {
			// Reached when no delayed flush applies: either the mutable memtable
			// is not the one holding old seqNums, or dur is zero.
			//
			// Find the last memtable that contains seqNums less than es.seqNum,
			// and force a flush on it.
			//
			// NOTE(review): mem is assumed non-nil here, because the loop
			// condition implies some queued memtable has logSeqNum < es.seqNum.
			var mem *flushableEntry
			for i := range es.db.mu.mem.queue {
				if es.db.mu.mem.queue[i].logSeqNum < es.seqNum {
					mem = es.db.mu.mem.queue[i]
				}
			}
			mem.flushForced = true
			es.db.maybeScheduleFlush()
		}
		es.db.mu.compact.cond.Wait()

		earliestUnflushedSeqNum = es.db.getEarliestUnflushedSeqNumLocked()
	}
	if es.excised.Load() {
		return ErrSnapshotExcised
	}
	return nil
}
409 :
410 : // WaitForFileOnlySnapshot blocks the calling goroutine until this snapshot
411 : // has been converted into a file-only snapshot (i.e. all memtables containing
412 : // keys < seqNum are flushed). A duration can be passed in, and if nonzero,
413 : // a delayed flush will be scheduled at that duration if necessary.
414 : //
415 : // Idempotent; can be called multiple times with no side effects.
416 : func (es *EventuallyFileOnlySnapshot) WaitForFileOnlySnapshot(
417 : ctx context.Context, dur time.Duration,
418 1 : ) error {
419 1 : if es.hasTransitioned() {
420 1 : return nil
421 1 : }
422 :
423 1 : if err := es.waitForFlush(ctx, dur); err != nil {
424 1 : return err
425 1 : }
426 :
427 1 : if invariants.Enabled {
428 1 : // Since we aren't returning an error, we _must_ have transitioned to a
429 1 : // file-only snapshot by now.
430 1 : if !es.hasTransitioned() {
431 0 : panic("expected EFOS to have transitioned to file-only snapshot after flush")
432 : }
433 : }
434 1 : return nil
435 : }
436 :
437 : // Close closes the file-only snapshot and releases all referenced resources.
438 : // Not idempotent.
439 1 : func (es *EventuallyFileOnlySnapshot) Close() error {
440 1 : close(es.closed)
441 1 : es.db.mu.Lock()
442 1 : defer es.db.mu.Unlock()
443 1 : es.mu.Lock()
444 1 : defer es.mu.Unlock()
445 1 :
446 1 : if es.mu.snap != nil {
447 1 : if err := es.mu.snap.closeLocked(); err != nil {
448 0 : return err
449 0 : }
450 : }
451 1 : if es.mu.readState != nil {
452 1 : es.mu.readState.unrefLocked()
453 1 : es.db.maybeScheduleObsoleteTableDeletionLocked()
454 1 : }
455 1 : if es.mu.vers != nil {
456 1 : es.mu.vers.UnrefLocked()
457 1 : }
458 1 : return nil
459 : }
460 :
461 : // Get implements the Reader interface.
462 0 : func (es *EventuallyFileOnlySnapshot) Get(key []byte) (value []byte, closer io.Closer, err error) {
463 0 : // TODO(jackson): Use getInternal.
464 0 : iter, err := es.NewIter(nil)
465 0 : if err != nil {
466 0 : return nil, nil, err
467 0 : }
468 0 : var valid bool
469 0 : if es.db.opts.Comparer.Split != nil {
470 0 : valid = iter.SeekPrefixGE(key)
471 0 : } else {
472 0 : valid = iter.SeekGE(key)
473 0 : }
474 0 : if !valid {
475 0 : if err = firstError(iter.Error(), iter.Close()); err != nil {
476 0 : return nil, nil, err
477 0 : }
478 0 : return nil, nil, ErrNotFound
479 : }
480 0 : if !es.db.equal(iter.Key(), key) {
481 0 : return nil, nil, firstError(iter.Close(), ErrNotFound)
482 0 : }
483 0 : return iter.Value(), iter, nil
484 : }
485 :
486 : // NewIter returns an iterator that is unpositioned (Iterator.Valid() will
487 : // return false). The iterator can be positioned via a call to SeekGE,
488 : // SeekLT, First or Last.
489 1 : func (es *EventuallyFileOnlySnapshot) NewIter(o *IterOptions) (*Iterator, error) {
490 1 : return es.NewIterWithContext(context.Background(), o)
491 1 : }
492 :
493 : // NewIterWithContext is like NewIter, and additionally accepts a context for
494 : // tracing.
495 : func (es *EventuallyFileOnlySnapshot) NewIterWithContext(
496 : ctx context.Context, o *IterOptions,
497 1 : ) (*Iterator, error) {
498 1 : select {
499 0 : case <-es.closed:
500 0 : panic(ErrClosed)
501 1 : default:
502 : }
503 :
504 1 : es.mu.Lock()
505 1 : defer es.mu.Unlock()
506 1 : if es.mu.vers != nil {
507 1 : sOpts := snapshotIterOpts{seqNum: es.seqNum, vers: es.mu.vers}
508 1 : return es.db.newIter(ctx, nil /* batch */, sOpts, o), nil
509 1 : }
510 :
511 1 : if es.excised.Load() {
512 1 : return nil, ErrSnapshotExcised
513 1 : }
514 1 : sOpts := snapshotIterOpts{seqNum: es.seqNum, readState: es.mu.readState}
515 1 : return es.db.newIter(ctx, nil /* batch */, sOpts, o), nil
516 : }
517 :
518 : // ScanInternal scans all internal keys within the specified bounds, truncating
519 : // any rangedels and rangekeys to those bounds. For use when an external user
520 : // needs to be aware of all internal keys that make up a key range.
521 : //
522 : // See comment on db.ScanInternal for the behaviour that can be expected of
523 : // point keys deleted by range dels and keys masked by range keys.
524 : func (es *EventuallyFileOnlySnapshot) ScanInternal(
525 : ctx context.Context,
526 : lower, upper []byte,
527 : visitPointKey func(key *InternalKey, value LazyValue, iterInfo IteratorLevel) error,
528 : visitRangeDel func(start, end []byte, seqNum uint64) error,
529 : visitRangeKey func(start, end []byte, keys []rangekey.Key) error,
530 : visitSharedFile func(sst *SharedSSTMeta) error,
531 1 : ) error {
532 1 : if es.db == nil {
533 0 : panic(ErrClosed)
534 : }
535 1 : if es.excised.Load() {
536 0 : return ErrSnapshotExcised
537 0 : }
538 1 : var sOpts snapshotIterOpts
539 1 : es.mu.Lock()
540 1 : if es.mu.vers != nil {
541 1 : sOpts = snapshotIterOpts{
542 1 : seqNum: es.seqNum,
543 1 : vers: es.mu.vers,
544 1 : }
545 1 : } else {
546 1 : sOpts = snapshotIterOpts{
547 1 : seqNum: es.seqNum,
548 1 : readState: es.mu.readState,
549 1 : }
550 1 : }
551 1 : es.mu.Unlock()
552 1 : opts := &scanInternalOptions{
553 1 : IterOptions: IterOptions{
554 1 : KeyTypes: IterKeyTypePointsAndRanges,
555 1 : LowerBound: lower,
556 1 : UpperBound: upper,
557 1 : },
558 1 : visitPointKey: visitPointKey,
559 1 : visitRangeDel: visitRangeDel,
560 1 : visitRangeKey: visitRangeKey,
561 1 : visitSharedFile: visitSharedFile,
562 1 : skipSharedLevels: visitSharedFile != nil,
563 1 : }
564 1 : iter := es.db.newInternalIter(sOpts, opts)
565 1 : defer iter.close()
566 1 :
567 1 : return scanInternalImpl(ctx, lower, upper, iter, opts)
568 : }
|