|           Line data    Source code 
       1             : // Copyright 2012 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package pebble
       6             : 
       7             : import (
       8             :         "context"
       9             :         "io"
      10             :         "math"
      11             :         "sync"
      12             :         "time"
      13             : 
      14             :         "github.com/cockroachdb/pebble/internal/invariants"
      15             :         "github.com/cockroachdb/pebble/rangekey"
      16             :         "github.com/cockroachdb/pebble/sstable"
      17             : )
      18             : 
      19             : // Snapshot provides a read-only point-in-time view of the DB state.
      20             : type Snapshot struct {
      21             :         // The db the snapshot was created from.
      22             :         db     *DB
      23             :         seqNum uint64
      24             : 
      25             :         // Set if part of an EventuallyFileOnlySnapshot.
      26             :         efos *EventuallyFileOnlySnapshot
      27             : 
      28             :         // The list the snapshot is linked into.
      29             :         list *snapshotList
      30             : 
      31             :         // The next/prev link for the snapshotList doubly-linked list of snapshots.
      32             :         prev, next *Snapshot
      33             : }
      34             : 
      35             : var _ Reader = (*Snapshot)(nil)
      36             : 
      37             : // Get gets the value for the given key. It returns ErrNotFound if the Snapshot
      38             : // does not contain the key.
      39             : //
      40             : // The caller should not modify the contents of the returned slice, but it is
      41             : // safe to modify the contents of the argument after Get returns. The returned
      42             : // slice will remain valid until the returned Closer is closed. On success, the
      43             : // caller MUST call closer.Close() or a memory leak will occur.
      44           1 : func (s *Snapshot) Get(key []byte) ([]byte, io.Closer, error) {
      45           1 :         if s.db == nil {
      46           1 :                 panic(ErrClosed)
      47             :         }
      48           1 :         return s.db.getInternal(key, nil /* batch */, s)
      49             : }
      50             : 
      51             : // NewIter returns an iterator that is unpositioned (Iterator.Valid() will
      52             : // return false). The iterator can be positioned via a call to SeekGE,
      53             : // SeekLT, First or Last.
      54           1 : func (s *Snapshot) NewIter(o *IterOptions) (*Iterator, error) {
      55           1 :         return s.NewIterWithContext(context.Background(), o)
      56           1 : }
      57             : 
      58             : // NewIterWithContext is like NewIter, and additionally accepts a context for
      59             : // tracing.
      60           1 : func (s *Snapshot) NewIterWithContext(ctx context.Context, o *IterOptions) (*Iterator, error) {
      61           1 :         if s.db == nil {
      62           1 :                 panic(ErrClosed)
      63             :         }
      64           1 :         return s.db.newIter(ctx, nil /* batch */, newIterOpts{
      65           1 :                 snapshot: snapshotIterOpts{seqNum: s.seqNum},
      66           1 :         }, o), nil
      67             : }
      68             : 
      69             : // ScanInternal scans all internal keys within the specified bounds, truncating
      70             : // any rangedels and rangekeys to those bounds. For use when an external user
      71             : // needs to be aware of all internal keys that make up a key range.
      72             : //
      73             : // See comment on db.ScanInternal for the behaviour that can be expected of
      74             : // point keys deleted by range dels and keys masked by range keys.
      75             : func (s *Snapshot) ScanInternal(
      76             :         ctx context.Context,
      77             :         categoryAndQoS sstable.CategoryAndQoS,
      78             :         lower, upper []byte,
      79             :         visitPointKey func(key *InternalKey, value LazyValue, iterInfo IteratorLevel) error,
      80             :         visitRangeDel func(start, end []byte, seqNum uint64) error,
      81             :         visitRangeKey func(start, end []byte, keys []rangekey.Key) error,
      82             :         visitSharedFile func(sst *SharedSSTMeta) error,
      83             :         visitExternalFile func(sst *ExternalFile) error,
      84           1 : ) error {
      85           1 :         if s.db == nil {
      86           0 :                 panic(ErrClosed)
      87             :         }
      88           1 :         scanInternalOpts := &scanInternalOptions{
      89           1 :                 CategoryAndQoS:    categoryAndQoS,
      90           1 :                 visitPointKey:     visitPointKey,
      91           1 :                 visitRangeDel:     visitRangeDel,
      92           1 :                 visitRangeKey:     visitRangeKey,
      93           1 :                 visitSharedFile:   visitSharedFile,
      94           1 :                 visitExternalFile: visitExternalFile,
      95           1 :                 IterOptions: IterOptions{
      96           1 :                         KeyTypes:   IterKeyTypePointsAndRanges,
      97           1 :                         LowerBound: lower,
      98           1 :                         UpperBound: upper,
      99           1 :                 },
     100           1 :         }
     101           1 : 
     102           1 :         iter, err := s.db.newInternalIter(ctx, snapshotIterOpts{seqNum: s.seqNum}, scanInternalOpts)
     103           1 :         if err != nil {
     104           0 :                 return err
     105           0 :         }
     106           1 :         defer iter.close()
     107           1 : 
     108           1 :         return scanInternalImpl(ctx, lower, upper, iter, scanInternalOpts)
     109             : }
     110             : 
     111             : // closeLocked is similar to Close(), except it requires that db.mu be held
     112             : // by the caller.
     113           1 : func (s *Snapshot) closeLocked() error {
     114           1 :         s.db.mu.snapshots.remove(s)
     115           1 : 
     116           1 :         // If s was the previous earliest snapshot, we might be able to reclaim
     117           1 :         // disk space by dropping obsolete records that were pinned by s.
     118           1 :         if e := s.db.mu.snapshots.earliest(); e > s.seqNum {
     119           1 :                 s.db.maybeScheduleCompactionPicker(pickElisionOnly)
     120           1 :         }
     121           1 :         s.db = nil
     122           1 :         return nil
     123             : }
     124             : 
     125             : // Close closes the snapshot, releasing its resources. Close must be called.
     126             : // Failure to do so will result in a tiny memory leak and a large leak of
     127             : // resources on disk due to the entries the snapshot is preventing from being
     128             : // deleted.
     129             : //
     130             : // d.mu must NOT be held by the caller.
     131           1 : func (s *Snapshot) Close() error {
     132           1 :         db := s.db
     133           1 :         if db == nil {
     134           1 :                 panic(ErrClosed)
     135             :         }
     136           1 :         db.mu.Lock()
     137           1 :         defer db.mu.Unlock()
     138           1 :         return s.closeLocked()
     139             : }
     140             : 
     141             : type snapshotList struct {
     142             :         root Snapshot
     143             : }
     144             : 
     145           1 : func (l *snapshotList) init() {
     146           1 :         l.root.next = &l.root
     147           1 :         l.root.prev = &l.root
     148           1 : }
     149             : 
     150           1 : func (l *snapshotList) empty() bool {
     151           1 :         return l.root.next == &l.root
     152           1 : }
     153             : 
     154           1 : func (l *snapshotList) count() int {
     155           1 :         if l.empty() {
     156           1 :                 return 0
     157           1 :         }
     158           0 :         var count int
     159           0 :         for i := l.root.next; i != &l.root; i = i.next {
     160           0 :                 count++
     161           0 :         }
     162           0 :         return count
     163             : }
     164             : 
     165           1 : func (l *snapshotList) earliest() uint64 {
     166           1 :         v := uint64(math.MaxUint64)
     167           1 :         if !l.empty() {
     168           1 :                 v = l.root.next.seqNum
     169           1 :         }
     170           1 :         return v
     171             : }
     172             : 
     173           1 : func (l *snapshotList) toSlice() []uint64 {
     174           1 :         if l.empty() {
     175           1 :                 return nil
     176           1 :         }
     177           1 :         var results []uint64
     178           1 :         for i := l.root.next; i != &l.root; i = i.next {
     179           1 :                 results = append(results, i.seqNum)
     180           1 :         }
     181           1 :         return results
     182             : }
     183             : 
     184           1 : func (l *snapshotList) pushBack(s *Snapshot) {
     185           1 :         if s.list != nil || s.prev != nil || s.next != nil {
     186           0 :                 panic("pebble: snapshot list is inconsistent")
     187             :         }
     188           1 :         s.prev = l.root.prev
     189           1 :         s.prev.next = s
     190           1 :         s.next = &l.root
     191           1 :         s.next.prev = s
     192           1 :         s.list = l
     193             : }
     194             : 
     195           1 : func (l *snapshotList) remove(s *Snapshot) {
     196           1 :         if s == &l.root {
     197           0 :                 panic("pebble: cannot remove snapshot list root node")
     198             :         }
     199           1 :         if s.list != l {
     200           0 :                 panic("pebble: snapshot list is inconsistent")
     201             :         }
     202           1 :         s.prev.next = s.next
     203           1 :         s.next.prev = s.prev
     204           1 :         s.next = nil // avoid memory leaks
     205           1 :         s.prev = nil // avoid memory leaks
     206           1 :         s.list = nil // avoid memory leaks
     207             : }
     208             : 
     209             : // EventuallyFileOnlySnapshot (aka EFOS) provides a read-only point-in-time view
     210             : // of the database state, similar to Snapshot. An EventuallyFileOnlySnapshot
     211             : // induces less write amplification than Snapshot, at the cost of increased space
     212             : // amplification. While a Snapshot may increase write amplification across all
     213             : // flushes and compactions for the duration of its lifetime, an
     214             : // EventuallyFileOnlySnapshot only incurs that cost for flushes/compactions if
     215             : // memtables at the time of EFOS instantiation contained keys that the EFOS is
     216             : // interested in (i.e. its protectedRanges). In that case, the EFOS prevents
     217             : // elision of keys visible to it, similar to a Snapshot, until those memtables
     218             : // are flushed, and once that happens, the "EventuallyFileOnlySnapshot"
     219             : // transitions to a file-only snapshot state in which it pins zombies sstables
     220             : // like an open Iterator would, without pinning any memtables. Callers that can
     221             : // tolerate the increased space amplification of pinning zombie sstables until
     222             : // the snapshot is closed may prefer EventuallyFileOnlySnapshots for their
     223             : // reduced write amplification. Callers that desire the benefits of the file-only
     224             : // state that requires no pinning of memtables should call
     225             : // `WaitForFileOnlySnapshot()` before relying on the EFOS to keep producing iterators
     226             : // with zero write-amp and zero pinning of memtables in memory.
     227             : //
     228             : // EventuallyFileOnlySnapshots interact with the IngestAndExcise operation in
     229             : // subtle ways. The IngestAndExcise can force the transition of an EFOS to a
     230             : // file-only snapshot if an excise overlaps with the EFOS bounds.
     231             : type EventuallyFileOnlySnapshot struct {
     232             :         mu struct {
     233             :                 // NB: If both this mutex and db.mu are being grabbed, db.mu should be
     234             :                 // grabbed _before_ grabbing this one.
     235             :                 sync.Mutex
     236             : 
     237             :                 // Either the snap field is set below, or the version is set at any given
     238             :                 // point of time. If a snapshot is referenced, this is not a file-only
     239             :                 // snapshot yet, and if a version is set (and ref'd) this is a file-only
     240             :                 // snapshot.
     241             : 
     242             :                 // The wrapped regular snapshot, if not a file-only snapshot yet.
     243             :                 snap *Snapshot
     244             :                 // The wrapped version reference, if a file-only snapshot.
     245             :                 vers *version
     246             :         }
     247             : 
     248             :         // Key ranges to watch for an excise on.
     249             :         protectedRanges []KeyRange
     250             : 
     251             :         // The db the snapshot was created from.
     252             :         db     *DB
     253             :         seqNum uint64
     254             :         closed chan struct{}
     255             : }
     256             : 
     257           1 : func (d *DB) makeEventuallyFileOnlySnapshot(keyRanges []KeyRange) *EventuallyFileOnlySnapshot {
     258           1 :         isFileOnly := true
     259           1 : 
     260           1 :         d.mu.Lock()
     261           1 :         defer d.mu.Unlock()
     262           1 :         seqNum := d.mu.versions.visibleSeqNum.Load()
     263           1 :         // Check if any of the keyRanges overlap with a memtable.
     264           1 :         for i := range d.mu.mem.queue {
     265           1 :                 d.mu.mem.queue[i].computePossibleOverlaps(func(bounded) shouldContinue {
     266           1 :                         isFileOnly = false
     267           1 :                         return stopIteration
     268           1 :                 }, sliceAsBounded(keyRanges)...)
     269             :         }
     270           1 :         es := &EventuallyFileOnlySnapshot{
     271           1 :                 db:              d,
     272           1 :                 seqNum:          seqNum,
     273           1 :                 protectedRanges: keyRanges,
     274           1 :                 closed:          make(chan struct{}),
     275           1 :         }
     276           1 :         if isFileOnly {
     277           1 :                 es.mu.vers = d.mu.versions.currentVersion()
     278           1 :                 es.mu.vers.Ref()
     279           1 :         } else {
     280           1 :                 s := &Snapshot{
     281           1 :                         db:     d,
     282           1 :                         seqNum: seqNum,
     283           1 :                 }
     284           1 :                 s.efos = es
     285           1 :                 es.mu.snap = s
     286           1 :                 d.mu.snapshots.pushBack(s)
     287           1 :         }
     288           1 :         return es
     289             : }
     290             : 
     291             : // Transitions this EventuallyFileOnlySnapshot to a file-only snapshot. Requires
     292             : // earliestUnflushedSeqNum and vers to correspond to the same Version from the
     293             : // current or a past acquisition of db.mu. vers must have been Ref()'d before
     294             : // that mutex was released, if it was released.
     295             : //
     296             : // NB: The caller is expected to check for es.excised before making this
     297             : // call.
     298             : //
     299             : // d.mu must be held when calling this method.
     300           1 : func (es *EventuallyFileOnlySnapshot) transitionToFileOnlySnapshot(vers *version) error {
     301           1 :         es.mu.Lock()
     302           1 :         select {
     303           0 :         case <-es.closed:
     304           0 :                 vers.UnrefLocked()
     305           0 :                 es.mu.Unlock()
     306           0 :                 return ErrClosed
     307           1 :         default:
     308             :         }
     309           1 :         if es.mu.snap == nil {
     310           0 :                 es.mu.Unlock()
     311           0 :                 panic("pebble: tried to transition an eventually-file-only-snapshot twice")
     312             :         }
     313             :         // The caller has already called Ref() on vers.
     314           1 :         es.mu.vers = vers
     315           1 :         // NB: The callers should have already done a check of es.excised.
     316           1 :         oldSnap := es.mu.snap
     317           1 :         es.mu.snap = nil
     318           1 :         es.mu.Unlock()
     319           1 :         return oldSnap.closeLocked()
     320             : }
     321             : 
     322             : // hasTransitioned returns true if this EFOS has transitioned to a file-only
     323             : // snapshot.
     324           1 : func (es *EventuallyFileOnlySnapshot) hasTransitioned() bool {
     325           1 :         es.mu.Lock()
     326           1 :         defer es.mu.Unlock()
     327           1 :         return es.mu.vers != nil
     328           1 : }
     329             : 
// waitForFlush blocks until the DB's earliest unflushed sequence number is at
// least es.seqNum. That is, it waits until every memtable that may hold keys
// below es.seqNum has been flushed, so the EFOS can become file-only.
//
// On each loop iteration it nudges a flush along:
//   - If the mutable memtable still holds such keys and dur > 0, a delayed
//     flush of it is scheduled to happen within dur.
//   - Otherwise, the last memtable in the queue whose logSeqNum is below
//     es.seqNum is marked flushForced, and a flush is scheduled.
//
// It returns ErrClosed if es has been closed, or ctx.Err() if ctx is done.
// Both are checked only at the top of each iteration, not while blocked in
// Wait.
//
// Acquires es.db.mu for the duration of the call. NOTE(review): while blocked
// on es.db.mu.compact.cond it presumably gives up db.mu (sync.Cond.Wait
// semantics); confirm the cond's Locker is db.mu.
func (es *EventuallyFileOnlySnapshot) waitForFlush(ctx context.Context, dur time.Duration) error {
	es.db.mu.Lock()
	defer es.db.mu.Unlock()

	earliestUnflushedSeqNum := es.db.getEarliestUnflushedSeqNumLocked()
	for earliestUnflushedSeqNum < es.seqNum {
		// Give up early if the EFOS was closed or the caller's context ended.
		select {
		case <-es.closed:
			return ErrClosed
		case <-ctx.Done():
			return ctx.Err()
		default:
		}
		// Check if the current mutable memtable contains keys less than seqNum.
		// If so, rotate it.
		if es.db.mu.mem.mutable.logSeqNum < es.seqNum && dur.Nanoseconds() > 0 {
			es.db.maybeScheduleDelayedFlush(es.db.mu.mem.mutable, dur)
		} else {
			// Find the last memtable that contains seqNums less than es.seqNum,
			// and force a flush on it.
			// NOTE(review): mem is assumed non-nil because the loop condition
			// implies some queued memtable has logSeqNum < es.seqNum; confirm
			// against getEarliestUnflushedSeqNumLocked.
			var mem *flushableEntry
			for i := range es.db.mu.mem.queue {
				if es.db.mu.mem.queue[i].logSeqNum < es.seqNum {
					mem = es.db.mu.mem.queue[i]
				}
			}
			mem.flushForced = true
			es.db.maybeScheduleFlush()
		}
		// Sleep until signalled, then re-evaluate how far flushing has got.
		es.db.mu.compact.cond.Wait()

		earliestUnflushedSeqNum = es.db.getEarliestUnflushedSeqNumLocked()
	}
	return nil
}
     370             : 
     371             : // WaitForFileOnlySnapshot blocks the calling goroutine until this snapshot
     372             : // has been converted into a file-only snapshot (i.e. all memtables containing
     373             : // keys < seqNum are flushed). A duration can be passed in, and if nonzero,
     374             : // a delayed flush will be scheduled at that duration if necessary.
     375             : //
     376             : // Idempotent; can be called multiple times with no side effects.
     377             : func (es *EventuallyFileOnlySnapshot) WaitForFileOnlySnapshot(
     378             :         ctx context.Context, dur time.Duration,
     379           1 : ) error {
     380           1 :         if es.hasTransitioned() {
     381           1 :                 return nil
     382           1 :         }
     383             : 
     384           1 :         if err := es.waitForFlush(ctx, dur); err != nil {
     385           0 :                 return err
     386           0 :         }
     387             : 
     388           1 :         if invariants.Enabled {
     389           1 :                 // Since we aren't returning an error, we _must_ have transitioned to a
     390           1 :                 // file-only snapshot by now.
     391           1 :                 if !es.hasTransitioned() {
     392           0 :                         panic("expected EFOS to have transitioned to file-only snapshot after flush")
     393             :                 }
     394             :         }
     395           1 :         return nil
     396             : }
     397             : 
     398             : // Close closes the file-only snapshot and releases all referenced resources.
     399             : // Not idempotent.
     400           1 : func (es *EventuallyFileOnlySnapshot) Close() error {
     401           1 :         close(es.closed)
     402           1 :         es.db.mu.Lock()
     403           1 :         defer es.db.mu.Unlock()
     404           1 :         es.mu.Lock()
     405           1 :         defer es.mu.Unlock()
     406           1 : 
     407           1 :         if es.mu.snap != nil {
     408           1 :                 if err := es.mu.snap.closeLocked(); err != nil {
     409           0 :                         return err
     410           0 :                 }
     411             :         }
     412           1 :         if es.mu.vers != nil {
     413           1 :                 es.mu.vers.UnrefLocked()
     414           1 :         }
     415           1 :         return nil
     416             : }
     417             : 
     418             : // Get implements the Reader interface.
     419           1 : func (es *EventuallyFileOnlySnapshot) Get(key []byte) (value []byte, closer io.Closer, err error) {
     420           1 :         // TODO(jackson): Use getInternal.
     421           1 :         iter, err := es.NewIter(nil)
     422           1 :         if err != nil {
     423           0 :                 return nil, nil, err
     424           0 :         }
     425           1 :         if !iter.SeekPrefixGE(key) {
     426           1 :                 if err = firstError(iter.Error(), iter.Close()); err != nil {
     427           1 :                         return nil, nil, err
     428           1 :                 }
     429           1 :                 return nil, nil, ErrNotFound
     430             :         }
     431           1 :         if !es.db.equal(iter.Key(), key) {
     432           0 :                 return nil, nil, firstError(iter.Close(), ErrNotFound)
     433           0 :         }
     434           1 :         return iter.Value(), iter, nil
     435             : }
     436             : 
     437             : // NewIter returns an iterator that is unpositioned (Iterator.Valid() will
     438             : // return false). The iterator can be positioned via a call to SeekGE,
     439             : // SeekLT, First or Last.
     440           1 : func (es *EventuallyFileOnlySnapshot) NewIter(o *IterOptions) (*Iterator, error) {
     441           1 :         return es.NewIterWithContext(context.Background(), o)
     442           1 : }
     443             : 
     444             : // NewIterWithContext is like NewIter, and additionally accepts a context for
     445             : // tracing.
     446             : func (es *EventuallyFileOnlySnapshot) NewIterWithContext(
     447             :         ctx context.Context, o *IterOptions,
     448           1 : ) (*Iterator, error) {
     449           1 :         select {
     450           0 :         case <-es.closed:
     451           0 :                 panic(ErrClosed)
     452           1 :         default:
     453             :         }
     454             : 
     455           1 :         es.mu.Lock()
     456           1 :         defer es.mu.Unlock()
     457           1 :         if es.mu.vers != nil {
     458           1 :                 sOpts := snapshotIterOpts{seqNum: es.seqNum, vers: es.mu.vers}
     459           1 :                 return es.db.newIter(ctx, nil /* batch */, newIterOpts{snapshot: sOpts}, o), nil
     460           1 :         }
     461             : 
     462           1 :         sOpts := snapshotIterOpts{seqNum: es.seqNum}
     463           1 :         iter := es.db.newIter(ctx, nil /* batch */, newIterOpts{snapshot: sOpts}, o)
     464           1 :         return iter, nil
     465             : }
     466             : 
     467             : // ScanInternal scans all internal keys within the specified bounds, truncating
     468             : // any rangedels and rangekeys to those bounds. For use when an external user
     469             : // needs to be aware of all internal keys that make up a key range.
     470             : //
     471             : // See comment on db.ScanInternal for the behaviour that can be expected of
     472             : // point keys deleted by range dels and keys masked by range keys.
     473             : func (es *EventuallyFileOnlySnapshot) ScanInternal(
     474             :         ctx context.Context,
     475             :         categoryAndQoS sstable.CategoryAndQoS,
     476             :         lower, upper []byte,
     477             :         visitPointKey func(key *InternalKey, value LazyValue, iterInfo IteratorLevel) error,
     478             :         visitRangeDel func(start, end []byte, seqNum uint64) error,
     479             :         visitRangeKey func(start, end []byte, keys []rangekey.Key) error,
     480             :         visitSharedFile func(sst *SharedSSTMeta) error,
     481             :         visitExternalFile func(sst *ExternalFile) error,
     482           1 : ) error {
     483           1 :         if es.db == nil {
     484           0 :                 panic(ErrClosed)
     485             :         }
     486           1 :         var sOpts snapshotIterOpts
     487           1 :         opts := &scanInternalOptions{
     488           1 :                 CategoryAndQoS: categoryAndQoS,
     489           1 :                 IterOptions: IterOptions{
     490           1 :                         KeyTypes:   IterKeyTypePointsAndRanges,
     491           1 :                         LowerBound: lower,
     492           1 :                         UpperBound: upper,
     493           1 :                 },
     494           1 :                 visitPointKey:     visitPointKey,
     495           1 :                 visitRangeDel:     visitRangeDel,
     496           1 :                 visitRangeKey:     visitRangeKey,
     497           1 :                 visitSharedFile:   visitSharedFile,
     498           1 :                 visitExternalFile: visitExternalFile,
     499           1 :         }
     500           1 :         es.mu.Lock()
     501           1 :         if es.mu.vers != nil {
     502           1 :                 sOpts = snapshotIterOpts{
     503           1 :                         seqNum: es.seqNum,
     504           1 :                         vers:   es.mu.vers,
     505           1 :                 }
     506           1 :         } else {
     507           1 :                 sOpts = snapshotIterOpts{
     508           1 :                         seqNum: es.seqNum,
     509           1 :                 }
     510           1 :         }
     511           1 :         es.mu.Unlock()
     512           1 :         iter, err := es.db.newInternalIter(ctx, sOpts, opts)
     513           1 :         if err != nil {
     514           0 :                 return err
     515           0 :         }
     516           1 :         defer iter.close()
     517           1 : 
     518           1 :         return scanInternalImpl(ctx, lower, upper, iter, opts)
     519             : }
 |