LCOV - code coverage report
Current view: top level - pebble - db.go (source / functions) Hit Total Coverage
Test: 2023-12-18 08:16Z ab4952c5 - meta test only.lcov Lines: 1236 1808 68.4 %
Date: 2023-12-18 08:17:00 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2012 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : // Package pebble provides an ordered key/value store.
       6             : package pebble // import "github.com/cockroachdb/pebble"
       7             : 
       8             : import (
       9             :         "context"
      10             :         "fmt"
      11             :         "io"
      12             :         "os"
      13             :         "strconv"
      14             :         "sync"
      15             :         "sync/atomic"
      16             :         "time"
      17             : 
      18             :         "github.com/cockroachdb/errors"
      19             :         "github.com/cockroachdb/pebble/internal/arenaskl"
      20             :         "github.com/cockroachdb/pebble/internal/base"
      21             :         "github.com/cockroachdb/pebble/internal/invalidating"
      22             :         "github.com/cockroachdb/pebble/internal/invariants"
      23             :         "github.com/cockroachdb/pebble/internal/keyspan"
      24             :         "github.com/cockroachdb/pebble/internal/manifest"
      25             :         "github.com/cockroachdb/pebble/internal/manual"
      26             :         "github.com/cockroachdb/pebble/objstorage"
      27             :         "github.com/cockroachdb/pebble/objstorage/remote"
      28             :         "github.com/cockroachdb/pebble/rangekey"
      29             :         "github.com/cockroachdb/pebble/record"
      30             :         "github.com/cockroachdb/pebble/sstable"
      31             :         "github.com/cockroachdb/pebble/vfs"
      32             :         "github.com/cockroachdb/pebble/vfs/atomicfs"
      33             :         "github.com/cockroachdb/tokenbucket"
      34             :         "github.com/prometheus/client_golang/prometheus"
      35             : )
      36             : 
      37             : const (
      38             :         // minTableCacheSize is the minimum size of the table cache, for a single db.
      39             :         minTableCacheSize = 64
      40             : 
      41             :         // numNonTableCacheFiles is an approximation for the number of files
      42             :         // that we don't use for table caches, for a given db.
      43             :         numNonTableCacheFiles = 10
      44             : )
      45             : 
      46             : var (
      47             :         // ErrNotFound is returned when a get operation does not find the requested
      48             :         // key.
      49             :         ErrNotFound = base.ErrNotFound
      50             :         // ErrClosed is panicked when an operation is performed on a closed snapshot or
      51             :         // DB. Use errors.Is(err, ErrClosed) to check for this error.
      52             :         ErrClosed = errors.New("pebble: closed")
      53             :         // ErrReadOnly is returned when a write operation is performed on a read-only
      54             :         // database.
      55             :         ErrReadOnly = errors.New("pebble: read-only")
      56             :         // errNoSplit indicates that the user is trying to perform a range key
      57             :         // operation but the configured Comparer does not provide a Split
      58             :         // implementation.
      59             :         errNoSplit = errors.New("pebble: Comparer.Split required for range key operations")
      60             : )
      61             : 
      62             : // Reader is a readable key/value store.
      63             : //
      64             : // It is safe to call Get and NewIter from concurrent goroutines.
      65             : type Reader interface {
      66             :         // Get gets the value for the given key. It returns ErrNotFound if the DB
      67             :         // does not contain the key.
      68             :         //
      69             :         // The caller should not modify the contents of the returned slice, but it is
      70             :         // safe to modify the contents of the argument after Get returns. The
      71             :         // returned slice will remain valid until the returned Closer is closed. On
      72             :         // success, the caller MUST call closer.Close() or a memory leak will occur.
      73             :         Get(key []byte) (value []byte, closer io.Closer, err error)
      74             : 
      75             :         // NewIter returns an iterator that is unpositioned (Iterator.Valid() will
      76             :         // return false). The iterator can be positioned via a call to SeekGE,
      77             :         // SeekLT, First or Last.
      78             :         NewIter(o *IterOptions) (*Iterator, error)
      79             : 
      80             :         // NewIterWithContext is like NewIter, and additionally accepts a context
      81             :         // for tracing.
      82             :         NewIterWithContext(ctx context.Context, o *IterOptions) (*Iterator, error)
      83             : 
      84             :         // Close closes the Reader. It may or may not close any underlying io.Reader
      85             :         // or io.Writer, depending on how the DB was created.
      86             :         //
      87             :         // It is not safe to close a DB until all outstanding iterators are closed.
      88             :         // It is valid to call Close multiple times. Other methods should not be
      89             :         // called after the DB has been closed.
      90             :         Close() error
      91             : }
      92             : 
      93             : // Writer is a writable key/value store.
      94             : //
      95             : // Goroutine safety is dependent on the specific implementation.
      96             : type Writer interface {
      97             :         // Apply the operations contained in the batch to the DB.
      98             :         //
      99             :         // It is safe to modify the contents of the arguments after Apply returns.
     100             :         Apply(batch *Batch, o *WriteOptions) error
     101             : 
     102             :         // Delete deletes the value for the given key. Deletes are blind and will
     103             :         // succeed even if the given key does not exist.
     104             :         //
     105             :         // It is safe to modify the contents of the arguments after Delete returns.
     106             :         Delete(key []byte, o *WriteOptions) error
     107             : 
     108             :         // DeleteSized behaves identically to Delete, but takes an additional
     109             :         // argument indicating the size of the value being deleted. DeleteSized
     110             :         // should be preferred when the caller has the expectation that there exists
     111             :         // a single internal KV pair for the key (eg, the key has not been
     112             :         // overwritten recently), and the caller knows the size of its value.
     113             :         //
     114             :         // DeleteSized will record the value size within the tombstone and use it to
     115             :         // inform compaction-picking heuristics which strive to reduce space
     116             :         // amplification in the LSM. This "calling your shot" mechanic allows the
     117             :         // storage engine to more accurately estimate and reduce space
     118             :         // amplification.
     119             :         //
     120             :         // It is safe to modify the contents of the arguments after DeleteSized
     121             :         // returns.
     122             :         DeleteSized(key []byte, valueSize uint32, _ *WriteOptions) error
     123             : 
     124             :         // SingleDelete is similar to Delete in that it deletes the value for the given key. Like Delete,
     125             :         // it is a blind operation that will succeed even if the given key does not exist.
     126             :         //
     127             :         // WARNING: Undefined (non-deterministic) behavior will result if a key is overwritten and
     128             :         // then deleted using SingleDelete. The record may appear deleted immediately, but be
     129             :         // resurrected at a later time after compactions have been performed. Or the record may
     130             :         // be deleted permanently. A Delete operation lays down a "tombstone" which shadows all
     131             :         // previous versions of a key. The SingleDelete operation is akin to "anti-matter" and will
     132             :         // only delete the most recently written version for a key. These different semantics allow
     133             :         // the DB to avoid propagating a SingleDelete operation during a compaction as soon as the
     134             :         // corresponding Set operation is encountered. These semantics require extreme care to handle
     135             :         // properly. Only use if you have a workload where the performance gain is critical and you
     136             :         // can guarantee that a record is written once and then deleted once.
     137             :         //
     138             :         // SingleDelete is internally transformed into a Delete if the most recent record for a key is either
     139             :         // a Merge or Delete record.
     140             :         //
     141             :         // It is safe to modify the contents of the arguments after SingleDelete returns.
     142             :         SingleDelete(key []byte, o *WriteOptions) error
     143             : 
     144             :         // DeleteRange deletes all of the point keys (and values) in the range
     145             :         // [start,end) (inclusive on start, exclusive on end). DeleteRange does NOT
     146             :         // delete overlapping range keys (eg, keys set via RangeKeySet).
     147             :         //
     148             :         // It is safe to modify the contents of the arguments after DeleteRange
     149             :         // returns.
     150             :         DeleteRange(start, end []byte, o *WriteOptions) error
     151             : 
     152             :         // LogData adds the specified data to the batch. The data will be written to the
     153             :         // WAL, but not added to memtables or sstables. Log data is never indexed,
     154             :         // which makes it useful for testing WAL performance.
     155             :         //
     156             :         // It is safe to modify the contents of the argument after LogData returns.
     157             :         LogData(data []byte, opts *WriteOptions) error
     158             : 
     159             :         // Merge merges the value for the given key. The details of the merge are
     160             :         // dependent upon the configured merge operation.
     161             :         //
     162             :         // It is safe to modify the contents of the arguments after Merge returns.
     163             :         Merge(key, value []byte, o *WriteOptions) error
     164             : 
     165             :         // Set sets the value for the given key. It overwrites any previous value
     166             :         // for that key; a DB is not a multi-map.
     167             :         //
     168             :         // It is safe to modify the contents of the arguments after Set returns.
     169             :         Set(key, value []byte, o *WriteOptions) error
     170             : 
     171             :         // RangeKeySet sets a range key mapping the key range [start, end) at the MVCC
     172             :         // timestamp suffix to value. The suffix is optional. If any portion of the key
     173             :         // range [start, end) is already set by a range key with the same suffix value,
     174             :         // RangeKeySet overrides it.
     175             :         //
     176             :         // It is safe to modify the contents of the arguments after RangeKeySet returns.
     177             :         RangeKeySet(start, end, suffix, value []byte, opts *WriteOptions) error
     178             : 
     179             :         // RangeKeyUnset removes a range key mapping the key range [start, end) at the
     180             :         // MVCC timestamp suffix. The suffix may be omitted to remove an unsuffixed
     181             :         // range key. RangeKeyUnset only removes portions of range keys that fall within
     182             :         // the [start, end) key span, and only range keys with suffixes that exactly
     183             :         // match the unset suffix.
     184             :         //
     185             :         // It is safe to modify the contents of the arguments after RangeKeyUnset
     186             :         // returns.
     187             :         RangeKeyUnset(start, end, suffix []byte, opts *WriteOptions) error
     188             : 
     189             :         // RangeKeyDelete deletes all of the range keys in the range [start,end)
     190             :         // (inclusive on start, exclusive on end). It does not delete point keys (for
     191             :         // that use DeleteRange). RangeKeyDelete removes all range keys within the
     192             :         // bounds, including those with or without suffixes.
     193             :         //
     194             :         // It is safe to modify the contents of the arguments after RangeKeyDelete
     195             :         // returns.
     196             :         RangeKeyDelete(start, end []byte, opts *WriteOptions) error
     197             : }
     198             : 
     199             : // CPUWorkHandle represents a handle used by the CPUWorkPermissionGranter API.
     200             : type CPUWorkHandle interface {
     201             :         // Permitted indicates whether Pebble can use additional CPU resources.
     202             :         Permitted() bool
     203             : }
     204             : 
     205             : // CPUWorkPermissionGranter is used to request permission to opportunistically
     206             : // use additional CPUs to speed up internal background work.
     207             : type CPUWorkPermissionGranter interface {
     208             :         // GetPermission returns a handle regardless of whether permission is granted
     209             :         // or not. In the latter case, the handle is only useful for recording
     210             :         // the CPU time actually spent on this calling goroutine.
     211             :         GetPermission(time.Duration) CPUWorkHandle
     212             :         // CPUWorkDone must be called regardless of whether CPUWorkHandle.Permitted
     213             :         // returns true or false.
     214             :         CPUWorkDone(CPUWorkHandle)
     215             : }
     216             : 
     217             : // Use a default implementation for the CPU work granter to avoid excessive nil
     218             : // checks in the code.
     219             : type defaultCPUWorkHandle struct{}
     220             : 
     221           1 : func (d defaultCPUWorkHandle) Permitted() bool {
     222           1 :         return false
     223           1 : }
     224             : 
     225             : type defaultCPUWorkGranter struct{}
     226             : 
     227           1 : func (d defaultCPUWorkGranter) GetPermission(_ time.Duration) CPUWorkHandle {
     228           1 :         return defaultCPUWorkHandle{}
     229           1 : }
     230             : 
     231           1 : func (d defaultCPUWorkGranter) CPUWorkDone(_ CPUWorkHandle) {}
     232             : 
     233             : // DB provides a concurrent, persistent ordered key/value store.
     234             : //
     235             : // A DB's basic operations (Get, Set, Delete) should be self-explanatory. Get
     236             : // will return ErrNotFound if the requested key is not in the store; callers are
     237             : // free to ignore this error. Delete is blind and succeeds even if the key is absent.
     238             : //
     239             : // A DB also allows for iterating over the key/value pairs in key order. If d
     240             : // is a DB, the code below prints all key/value pairs whose keys are 'greater
     241             : // than or equal to' k:
     242             : //
     243             : //      iter := d.NewIter(readOptions)
     244             : //      for iter.SeekGE(k); iter.Valid(); iter.Next() {
     245             : //              fmt.Printf("key=%q value=%q\n", iter.Key(), iter.Value())
     246             : //      }
     247             : //      return iter.Close()
     248             : //
     249             : // The Options struct holds the optional parameters for the DB, including a
     250             : // Comparer to define a 'less than' relationship over keys. It is always valid
     251             : // to pass a nil *Options, which means to use the default parameter values. Any
     252             : // zero field of a non-nil *Options also means to use the default value for
     253             : // that parameter. Thus, the code below uses a custom Comparer, but the default
     254             : // values for every other parameter:
     255             : //
     256             : //      db := pebble.Open(&Options{
     257             : //              Comparer: myComparer,
     258             : //      })
     259             : type DB struct {
     260             :         // The count and size of referenced memtables. This includes memtables
     261             :         // present in DB.mu.mem.queue, as well as memtables that have been flushed
     262             :         // but are still referenced by an inuse readState, as well as up to one
     263             :         // memTable waiting to be reused and stored in d.memTableRecycle.
     264             :         memTableCount    atomic.Int64
     265             :         memTableReserved atomic.Int64 // number of bytes reserved in the cache for memtables
     266             :         // memTableRecycle holds a pointer to an obsolete memtable. The next
     267             :         // memtable allocation will reuse this memtable if it has not already been
     268             :         // recycled.
     269             :         memTableRecycle atomic.Pointer[memTable]
     270             : 
     271             :         // The size of the current log file (i.e. db.mu.log.queue[len(queue)-1]).
     272             :         logSize atomic.Uint64
     273             : 
     274             :         // The number of bytes available on disk.
     275             :         diskAvailBytes atomic.Uint64
     276             : 
     277             :         cacheID        uint64
     278             :         dirname        string
     279             :         walDirname     string
     280             :         opts           *Options
     281             :         cmp            Compare
     282             :         equal          Equal
     283             :         merge          Merge
     284             :         split          Split
     285             :         abbreviatedKey AbbreviatedKey
     286             :         // The threshold for determining when a batch is "large" and will skip being
     287             :         // inserted into a memtable.
     288             :         largeBatchThreshold uint64
     289             :         // The current OPTIONS file number.
     290             :         optionsFileNum base.DiskFileNum
     291             :         // The on-disk size of the current OPTIONS file.
     292             :         optionsFileSize uint64
     293             : 
     294             :         // objProvider is used to access and manage SSTs.
     295             :         objProvider objstorage.Provider
     296             : 
     297             :         fileLock *Lock
     298             :         dataDir  vfs.File
     299             :         walDir   vfs.File
     300             : 
     301             :         tableCache           *tableCacheContainer
     302             :         newIters             tableNewIters
     303             :         tableNewRangeKeyIter keyspan.TableNewSpanIter
     304             : 
     305             :         commit *commitPipeline
     306             : 
     307             :         // readState provides access to the state needed for reading without needing
     308             :         // to acquire DB.mu.
     309             :         readState struct {
     310             :                 sync.RWMutex
     311             :                 val *readState
     312             :         }
     313             :         // logRecycler holds a set of log file numbers that are available for
     314             :         // reuse. Writing to a recycled log file is faster than to a new log file on
     315             :         // some common filesystems (xfs, and ext3/4) due to avoiding metadata
     316             :         // updates.
     317             :         logRecycler logRecycler
     318             : 
     319             :         closed   *atomic.Value
     320             :         closedCh chan struct{}
     321             : 
     322             :         cleanupManager *cleanupManager
     323             : 
     324             :         // During an iterator close, we may asynchronously schedule read compactions.
     325             :         // We want to wait for those goroutines to finish, before closing the DB.
     326             :         // compactionSchedulers.Wait() should not be called while the DB.mu is held.
     327             :         compactionSchedulers sync.WaitGroup
     328             : 
     329             :         // The main mutex protecting internal DB state. This mutex encompasses many
     330             :         // fields because those fields need to be accessed and updated atomically. In
     331             :         // particular, the current version, log.*, mem.*, and snapshot list need to
     332             :         // be accessed and updated atomically during compaction.
     333             :         //
     334             :         // Care is taken to avoid holding DB.mu during IO operations. Accomplishing
     335             :         // this sometimes requires releasing DB.mu in a method that was called with
     336             :         // it held. See versionSet.logAndApply() and DB.makeRoomForWrite() for
     337             :         // examples. This is a common pattern, so be careful about expectations that
     338             :         // DB.mu will be held continuously across a set of calls.
     339             :         mu struct {
     340             :                 sync.Mutex
     341             : 
     342             :                 formatVers struct {
     343             :                         // vers is the database's current format major version.
     344             :                         // Backwards-incompatible features are gated behind new
     345             :                         // format major versions and not enabled until a database's
     346             :                         // version is ratcheted upwards.
     347             :                         //
     348             :                         // Although this is under the `mu` prefix, readers may read vers
     349             :                         // atomically without holding d.mu. Writers must only write to this
     350             :                         // value through finalizeFormatVersUpgrade which requires d.mu is
     351             :                         // held.
     352             :                         vers atomic.Uint64
     353             :                         // marker is the atomic marker for the format major version.
     354             :                         // When a database's version is ratcheted upwards, the
     355             :                         // marker is moved in order to atomically record the new
     356             :                         // version.
     357             :                         marker *atomicfs.Marker
     358             :                         // ratcheting when set to true indicates that the database is
     359             :                         // currently in the process of ratcheting the format major version
     360             :                         // to vers + 1. As a part of ratcheting the format major version,
     361             :                         // migrations may drop and re-acquire the mutex.
     362             :                         ratcheting bool
     363             :                 }
     364             : 
     365             :                 // The ID of the next job. Job IDs are passed to event listener
     366             :                 // notifications and act as a mechanism for tying together the events and
     367             :                 // log messages for a single job such as a flush, compaction, or file
     368             :                 // ingestion. Job IDs are not serialized to disk or used for correctness.
     369             :                 nextJobID int
     370             : 
     371             :                 // The collection of immutable versions and state about the log and visible
     372             :                 // sequence numbers. Use the pointer here to ensure the atomic fields in
     373             :                 // version set are aligned properly.
     374             :                 versions *versionSet
     375             : 
     376             :                 log struct {
     377             :                         // The queue of logs, containing both flushed and unflushed logs. The
     378             :                         // flushed logs will be a prefix, the unflushed logs a suffix. The
     379             :         // delimiter between flushed and unflushed logs is
     380             :                         // versionSet.minUnflushedLogNum.
     381             :                         queue []fileInfo
     382             :                         // The number of input bytes to the log. This is the raw size of the
     383             :                         // batches written to the WAL, without the overhead of the record
     384             :                         // envelopes.
     385             :                         bytesIn uint64
     386             :                         // The LogWriter is protected by commitPipeline.mu. This allows log
     387             :                         // writes to be performed without holding DB.mu, but requires both
     388             :                         // commitPipeline.mu and DB.mu to be held when rotating the WAL/memtable
     389             :                         // (i.e. makeRoomForWrite).
     390             :                         *record.LogWriter
     391             :                         // Can be nil.
     392             :                         metrics struct {
     393             :                                 fsyncLatency prometheus.Histogram
     394             :                                 record.LogWriterMetrics
     395             :                         }
     396             :                         registerLogWriterForTesting func(w *record.LogWriter)
     397             :                 }
     398             : 
     399             :                 mem struct {
     400             :                         // The current mutable memTable.
     401             :                         mutable *memTable
     402             :                         // Queue of flushables (the mutable memtable is at end). Elements are
     403             :                         // added to the end of the slice and removed from the beginning. Once an
     404             :                         // index is set it is never modified making a fixed slice immutable and
     405             :                         // safe for concurrent reads.
     406             :                         queue flushableList
     407             :                         // nextSize is the size of the next memtable. The memtable size starts at
     408             :                         // min(256KB,Options.MemTableSize) and doubles each time a new memtable
     409             :                         // is allocated up to Options.MemTableSize. This reduces the memory
     410             :                         // footprint of memtables when lots of DB instances are used concurrently
     411             :                         // in test environments.
     412             :                         nextSize uint64
     413             :                 }
     414             : 
     415             :                 compact struct {
     416             :                         // Condition variable used to signal when a flush or compaction has
     417             :                         // completed. Used by the write-stall mechanism to wait for the stall
     418             :                         // condition to clear. See DB.makeRoomForWrite().
     419             :                         cond sync.Cond
     420             :                         // True when a flush is in progress.
     421             :                         flushing bool
     422             :                         // The number of ongoing compactions.
     423             :                         compactingCount int
     424             :                         // The list of deletion hints, suggesting ranges for delete-only
     425             :                         // compactions.
     426             :                         deletionHints []deleteCompactionHint
     427             :                         // The list of manual compactions. The next manual compaction to perform
     428             :                         // is at the start of the list. New entries are added to the end.
     429             :                         manual []*manualCompaction
     430             :                         // downloads is the list of suggested download tasks. The next download to
     431             :                         // perform is at the start of the list. New entries are added to the end.
     432             :                         downloads []*downloadSpan
     433             :                         // inProgress is the set of in-progress flushes and compactions.
     434             :                         // It's used in the calculation of some metrics and to initialize L0
     435             :                         // sublevels' state. Some of the compactions contained within this
     436             :                         // map may have already committed an edit to the version but are
     437             :                         // lingering performing cleanup, like deleting obsolete files.
     438             :                         inProgress map[*compaction]struct{}
     439             : 
     440             :                         // rescheduleReadCompaction indicates to an iterator that a read compaction
     441             :                         // should be scheduled.
     442             :                         rescheduleReadCompaction bool
     443             : 
     444             :                         // readCompactions is a readCompactionQueue which keeps track of the
     445             :                         // compactions which we might have to perform.
     446             :                         readCompactions readCompactionQueue
     447             : 
     448             :                         // The cumulative duration of all completed compactions since Open.
     449             :                         // Does not include flushes.
     450             :                         duration time.Duration
     451             :                         // Flush throughput metric.
     452             :                         flushWriteThroughput ThroughputMetric
     453             :                         // The idle start time for the flush "loop", i.e., when the flushing
     454             :                         // bool above transitions to false.
     455             :                         noOngoingFlushStartTime time.Time
     456             :                 }
     457             : 
     458             :                 // Non-zero when file cleaning is disabled. The disabled count acts as a
     459             :                 // reference count to prohibit file cleaning. See
     460             :                 // DB.{disable,Enable}FileDeletions().
     461             :                 disableFileDeletions int
     462             : 
     463             :                 snapshots struct {
     464             :                         // The list of active snapshots.
     465             :                         snapshotList
     466             : 
     467             :                         // The cumulative count and size of snapshot-pinned keys written to
     468             :                         // sstables.
     469             :                         cumulativePinnedCount uint64
     470             :                         cumulativePinnedSize  uint64
     471             :                 }
     472             : 
     473             :                 tableStats struct {
     474             :                         // Condition variable used to signal the completion of a
     475             :                         // job to collect table stats.
     476             :                         cond sync.Cond
     477             :                         // True when a stat collection operation is in progress.
     478             :                         loading bool
     479             :                         // True if stat collection has loaded statistics for all tables
     480             :                         // other than those listed explicitly in pending. This flag starts
     481             :                         // as false when a database is opened and flips to true once stat
     482             :                         // collection has caught up.
     483             :                         loadedInitial bool
     484             :                         // A slice of files for which stats have not been computed.
     485             :                         // Compactions, ingests, flushes append files to be processed. An
     486             :                         // active stat collection goroutine clears the list and processes
     487             :                         // them.
     488             :                         pending []manifest.NewFileEntry
     489             :                 }
     490             : 
     491             :                 tableValidation struct {
     492             :                         // cond is a condition variable used to signal the completion of a
     493             :                         // job to validate one or more sstables.
     494             :                         cond sync.Cond
     495             :                         // pending is a slice of metadata for sstables waiting to be
     496             :                         // validated. Only physical sstables should be added to the pending
     497             :                         // queue.
     498             :                         pending []newFileEntry
     499             :                         // validating is set to true when validation is running.
     500             :                         validating bool
     501             :                 }
     502             :         }
     503             : 
     504             :         // Normally equal to time.Now() but may be overridden in tests.
     505             :         timeNow func() time.Time
     506             :         // the time at database Open; may be used to compute metrics like effective
     507             :         // compaction concurrency
     508             :         openedAt time.Time
     509             : }
     510             : 
     511             : var _ Reader = (*DB)(nil)
     512             : var _ Writer = (*DB)(nil)
     513             : 
     514             : // TestOnlyWaitForCleaning MUST only be used in tests.
     515           0 : func (d *DB) TestOnlyWaitForCleaning() {
     516           0 :         d.cleanupManager.Wait()
     517           0 : }
     518             : 
     519             : // Get gets the value for the given key. It returns ErrNotFound if the DB does
     520             : // not contain the key.
     521             : //
     522             : // The caller should not modify the contents of the returned slice, but it is
     523             : // safe to modify the contents of the argument after Get returns. The returned
     524             : // slice will remain valid until the returned Closer is closed. On success, the
     525             : // caller MUST call closer.Close() or a memory leak will occur.
     526           1 : func (d *DB) Get(key []byte) ([]byte, io.Closer, error) {
     527           1 :         return d.getInternal(key, nil /* batch */, nil /* snapshot */)
     528           1 : }
     529             : 
     530             : type getIterAlloc struct {
     531             :         dbi    Iterator
     532             :         keyBuf []byte
     533             :         get    getIter
     534             : }
     535             : 
     536             : var getIterAllocPool = sync.Pool{
     537           1 :         New: func() interface{} {
     538           1 :                 return &getIterAlloc{}
     539           1 :         },
     540             : }
     541             : 
     542           1 : func (d *DB) getInternal(key []byte, b *Batch, s *Snapshot) ([]byte, io.Closer, error) {
     543           1 :         if err := d.closed.Load(); err != nil {
     544           0 :                 panic(err)
     545             :         }
     546             : 
     547             :         // Grab and reference the current readState. This prevents the underlying
     548             :         // files in the associated version from being deleted if there is a current
     549             :         // compaction. The readState is unref'd by Iterator.Close().
     550           1 :         readState := d.loadReadState()
     551           1 : 
     552           1 :         // Determine the seqnum to read at after grabbing the read state (current and
     553           1 :         // memtables) above.
     554           1 :         var seqNum uint64
     555           1 :         if s != nil {
     556           1 :                 seqNum = s.seqNum
     557           1 :         } else {
     558           1 :                 seqNum = d.mu.versions.visibleSeqNum.Load()
     559           1 :         }
     560             : 
     561           1 :         buf := getIterAllocPool.Get().(*getIterAlloc)
     562           1 : 
     563           1 :         get := &buf.get
     564           1 :         *get = getIter{
     565           1 :                 logger:   d.opts.Logger,
     566           1 :                 comparer: d.opts.Comparer,
     567           1 :                 newIters: d.newIters,
     568           1 :                 snapshot: seqNum,
     569           1 :                 key:      key,
     570           1 :                 batch:    b,
     571           1 :                 mem:      readState.memtables,
     572           1 :                 l0:       readState.current.L0SublevelFiles,
     573           1 :                 version:  readState.current,
     574           1 :         }
     575           1 : 
     576           1 :         // Strip off memtables which cannot possibly contain the seqNum being read
     577           1 :         // at.
     578           1 :         for len(get.mem) > 0 {
     579           1 :                 n := len(get.mem)
     580           1 :                 if logSeqNum := get.mem[n-1].logSeqNum; logSeqNum < seqNum {
     581           1 :                         break
     582             :                 }
     583           1 :                 get.mem = get.mem[:n-1]
     584             :         }
     585             : 
     586           1 :         i := &buf.dbi
     587           1 :         pointIter := get
     588           1 :         *i = Iterator{
     589           1 :                 ctx:          context.Background(),
     590           1 :                 getIterAlloc: buf,
     591           1 :                 iter:         pointIter,
     592           1 :                 pointIter:    pointIter,
     593           1 :                 merge:        d.merge,
     594           1 :                 comparer:     *d.opts.Comparer,
     595           1 :                 readState:    readState,
     596           1 :                 keyBuf:       buf.keyBuf,
     597           1 :         }
     598           1 : 
     599           1 :         if !i.First() {
     600           1 :                 err := i.Close()
     601           1 :                 if err != nil {
     602           0 :                         return nil, nil, err
     603           0 :                 }
     604           1 :                 return nil, nil, ErrNotFound
     605             :         }
     606           1 :         return i.Value(), i, nil
     607             : }
     608             : 
     609             : // Set sets the value for the given key. It overwrites any previous value
     610             : // for that key; a DB is not a multi-map.
     611             : //
     612             : // It is safe to modify the contents of the arguments after Set returns.
     613           1 : func (d *DB) Set(key, value []byte, opts *WriteOptions) error {
     614           1 :         b := newBatch(d)
     615           1 :         _ = b.Set(key, value, opts)
     616           1 :         if err := d.Apply(b, opts); err != nil {
     617           0 :                 return err
     618           0 :         }
     619             :         // Only release the batch on success.
     620           1 :         b.release()
     621           1 :         return nil
     622             : }
     623             : 
     624             : // Delete deletes the value for the given key. Deletes are blind all will
     625             : // succeed even if the given key does not exist.
     626             : //
     627             : // It is safe to modify the contents of the arguments after Delete returns.
     628           1 : func (d *DB) Delete(key []byte, opts *WriteOptions) error {
     629           1 :         b := newBatch(d)
     630           1 :         _ = b.Delete(key, opts)
     631           1 :         if err := d.Apply(b, opts); err != nil {
     632           0 :                 return err
     633           0 :         }
     634             :         // Only release the batch on success.
     635           1 :         b.release()
     636           1 :         return nil
     637             : }
     638             : 
     639             : // DeleteSized behaves identically to Delete, but takes an additional
     640             : // argument indicating the size of the value being deleted. DeleteSized
     641             : // should be preferred when the caller has the expectation that there exists
     642             : // a single internal KV pair for the key (eg, the key has not been
     643             : // overwritten recently), and the caller knows the size of its value.
     644             : //
     645             : // DeleteSized will record the value size within the tombstone and use it to
     646             : // inform compaction-picking heuristics which strive to reduce space
     647             : // amplification in the LSM. This "calling your shot" mechanic allows the
     648             : // storage engine to more accurately estimate and reduce space amplification.
     649             : //
     650             : // It is safe to modify the contents of the arguments after DeleteSized
     651             : // returns.
     652           1 : func (d *DB) DeleteSized(key []byte, valueSize uint32, opts *WriteOptions) error {
     653           1 :         b := newBatch(d)
     654           1 :         _ = b.DeleteSized(key, valueSize, opts)
     655           1 :         if err := d.Apply(b, opts); err != nil {
     656           0 :                 return err
     657           0 :         }
     658             :         // Only release the batch on success.
     659           1 :         b.release()
     660           1 :         return nil
     661             : }
     662             : 
     663             : // SingleDelete adds an action to the batch that single deletes the entry for key.
     664             : // See Writer.SingleDelete for more details on the semantics of SingleDelete.
     665             : //
     666             : // It is safe to modify the contents of the arguments after SingleDelete returns.
     667           1 : func (d *DB) SingleDelete(key []byte, opts *WriteOptions) error {
     668           1 :         b := newBatch(d)
     669           1 :         _ = b.SingleDelete(key, opts)
     670           1 :         if err := d.Apply(b, opts); err != nil {
     671           0 :                 return err
     672           0 :         }
     673             :         // Only release the batch on success.
     674           1 :         b.release()
     675           1 :         return nil
     676             : }
     677             : 
     678             : // DeleteRange deletes all of the keys (and values) in the range [start,end)
     679             : // (inclusive on start, exclusive on end).
     680             : //
     681             : // It is safe to modify the contents of the arguments after DeleteRange
     682             : // returns.
     683           1 : func (d *DB) DeleteRange(start, end []byte, opts *WriteOptions) error {
     684           1 :         b := newBatch(d)
     685           1 :         _ = b.DeleteRange(start, end, opts)
     686           1 :         if err := d.Apply(b, opts); err != nil {
     687           0 :                 return err
     688           0 :         }
     689             :         // Only release the batch on success.
     690           1 :         b.release()
     691           1 :         return nil
     692             : }
     693             : 
     694             : // Merge adds an action to the DB that merges the value at key with the new
     695             : // value. The details of the merge are dependent upon the configured merge
     696             : // operator.
     697             : //
     698             : // It is safe to modify the contents of the arguments after Merge returns.
     699           1 : func (d *DB) Merge(key, value []byte, opts *WriteOptions) error {
     700           1 :         b := newBatch(d)
     701           1 :         _ = b.Merge(key, value, opts)
     702           1 :         if err := d.Apply(b, opts); err != nil {
     703           0 :                 return err
     704           0 :         }
     705             :         // Only release the batch on success.
     706           1 :         b.release()
     707           1 :         return nil
     708             : }
     709             : 
     710             : // LogData adds the specified to the batch. The data will be written to the
     711             : // WAL, but not added to memtables or sstables. Log data is never indexed,
     712             : // which makes it useful for testing WAL performance.
     713             : //
     714             : // It is safe to modify the contents of the argument after LogData returns.
     715           0 : func (d *DB) LogData(data []byte, opts *WriteOptions) error {
     716           0 :         b := newBatch(d)
     717           0 :         _ = b.LogData(data, opts)
     718           0 :         if err := d.Apply(b, opts); err != nil {
     719           0 :                 return err
     720           0 :         }
     721             :         // Only release the batch on success.
     722           0 :         b.release()
     723           0 :         return nil
     724             : }
     725             : 
     726             : // RangeKeySet sets a range key mapping the key range [start, end) at the MVCC
     727             : // timestamp suffix to value. The suffix is optional. If any portion of the key
     728             : // range [start, end) is already set by a range key with the same suffix value,
     729             : // RangeKeySet overrides it.
     730             : //
     731             : // It is safe to modify the contents of the arguments after RangeKeySet returns.
     732           1 : func (d *DB) RangeKeySet(start, end, suffix, value []byte, opts *WriteOptions) error {
     733           1 :         b := newBatch(d)
     734           1 :         _ = b.RangeKeySet(start, end, suffix, value, opts)
     735           1 :         if err := d.Apply(b, opts); err != nil {
     736           0 :                 return err
     737           0 :         }
     738             :         // Only release the batch on success.
     739           1 :         b.release()
     740           1 :         return nil
     741             : }
     742             : 
     743             : // RangeKeyUnset removes a range key mapping the key range [start, end) at the
     744             : // MVCC timestamp suffix. The suffix may be omitted to remove an unsuffixed
     745             : // range key. RangeKeyUnset only removes portions of range keys that fall within
     746             : // the [start, end) key span, and only range keys with suffixes that exactly
     747             : // match the unset suffix.
     748             : //
     749             : // It is safe to modify the contents of the arguments after RangeKeyUnset
     750             : // returns.
     751           1 : func (d *DB) RangeKeyUnset(start, end, suffix []byte, opts *WriteOptions) error {
     752           1 :         b := newBatch(d)
     753           1 :         _ = b.RangeKeyUnset(start, end, suffix, opts)
     754           1 :         if err := d.Apply(b, opts); err != nil {
     755           0 :                 return err
     756           0 :         }
     757             :         // Only release the batch on success.
     758           1 :         b.release()
     759           1 :         return nil
     760             : }
     761             : 
     762             : // RangeKeyDelete deletes all of the range keys in the range [start,end)
     763             : // (inclusive on start, exclusive on end). It does not delete point keys (for
     764             : // that use DeleteRange). RangeKeyDelete removes all range keys within the
     765             : // bounds, including those with or without suffixes.
     766             : //
     767             : // It is safe to modify the contents of the arguments after RangeKeyDelete
     768             : // returns.
     769           1 : func (d *DB) RangeKeyDelete(start, end []byte, opts *WriteOptions) error {
     770           1 :         b := newBatch(d)
     771           1 :         _ = b.RangeKeyDelete(start, end, opts)
     772           1 :         if err := d.Apply(b, opts); err != nil {
     773           0 :                 return err
     774           0 :         }
     775             :         // Only release the batch on success.
     776           1 :         b.release()
     777           1 :         return nil
     778             : }
     779             : 
     780             : // Apply the operations contained in the batch to the DB. If the batch is large
     781             : // the contents of the batch may be retained by the database. If that occurs
     782             : // the batch contents will be cleared preventing the caller from attempting to
     783             : // reuse them.
     784             : //
     785             : // It is safe to modify the contents of the arguments after Apply returns.
     786           1 : func (d *DB) Apply(batch *Batch, opts *WriteOptions) error {
     787           1 :         return d.applyInternal(batch, opts, false)
     788           1 : }
     789             : 
     790             : // ApplyNoSyncWait must only be used when opts.Sync is true and the caller
     791             : // does not want to wait for the WAL fsync to happen. The method will return
     792             : // once the mutation is applied to the memtable and is visible (note that a
     793             : // mutation is visible before the WAL sync even in the wait case, so we have
     794             : // not weakened the durability semantics). The caller must call Batch.SyncWait
     795             : // to wait for the WAL fsync. The caller must not Close the batch without
     796             : // first calling Batch.SyncWait.
     797             : //
     798             : // RECOMMENDATION: Prefer using Apply unless you really understand why you
     799             : // need ApplyNoSyncWait.
     800             : // EXPERIMENTAL: API/feature subject to change. Do not yet use outside
     801             : // CockroachDB.
     802           1 : func (d *DB) ApplyNoSyncWait(batch *Batch, opts *WriteOptions) error {
     803           1 :         if !opts.Sync {
     804           0 :                 return errors.Errorf("cannot request asynchonous apply when WriteOptions.Sync is false")
     805           0 :         }
     806           1 :         return d.applyInternal(batch, opts, true)
     807             : }
     808             : 
     809             : // REQUIRES: noSyncWait => opts.Sync
     810           1 : func (d *DB) applyInternal(batch *Batch, opts *WriteOptions, noSyncWait bool) error {
     811           1 :         if err := d.closed.Load(); err != nil {
     812           0 :                 panic(err)
     813             :         }
     814           1 :         if batch.committing {
     815           0 :                 panic("pebble: batch already committing")
     816             :         }
     817           1 :         if batch.applied.Load() {
     818           0 :                 panic("pebble: batch already applied")
     819             :         }
     820           1 :         if d.opts.ReadOnly {
     821           0 :                 return ErrReadOnly
     822           0 :         }
     823           1 :         if batch.db != nil && batch.db != d {
     824           0 :                 panic(fmt.Sprintf("pebble: batch db mismatch: %p != %p", batch.db, d))
     825             :         }
     826             : 
     827           1 :         sync := opts.GetSync()
     828           1 :         if sync && d.opts.DisableWAL {
     829           0 :                 return errors.New("pebble: WAL disabled")
     830           0 :         }
     831             : 
     832           1 :         if batch.minimumFormatMajorVersion != FormatMostCompatible {
     833           1 :                 if fmv := d.FormatMajorVersion(); fmv < batch.minimumFormatMajorVersion {
     834           0 :                         panic(fmt.Sprintf(
     835           0 :                                 "pebble: batch requires at least format major version %d (current: %d)",
     836           0 :                                 batch.minimumFormatMajorVersion, fmv,
     837           0 :                         ))
     838             :                 }
     839             :         }
     840             : 
     841           1 :         if batch.countRangeKeys > 0 {
     842           1 :                 if d.split == nil {
     843           0 :                         return errNoSplit
     844           0 :                 }
     845             :         }
     846           1 :         batch.committing = true
     847           1 : 
     848           1 :         if batch.db == nil {
     849           0 :                 if err := batch.refreshMemTableSize(); err != nil {
     850           0 :                         return err
     851           0 :                 }
     852             :         }
     853           1 :         if batch.memTableSize >= d.largeBatchThreshold {
     854           1 :                 var err error
     855           1 :                 batch.flushable, err = newFlushableBatch(batch, d.opts.Comparer)
     856           1 :                 if err != nil {
     857           0 :                         return err
     858           0 :                 }
     859             :         }
     860           1 :         if err := d.commit.Commit(batch, sync, noSyncWait); err != nil {
     861           0 :                 // There isn't much we can do on an error here. The commit pipeline will be
     862           0 :                 // horked at this point.
     863           0 :                 d.opts.Logger.Fatalf("pebble: fatal commit error: %v", err)
     864           0 :         }
     865             :         // If this is a large batch, we need to clear the batch contents as the
     866             :         // flushable batch may still be present in the flushables queue.
     867             :         //
     868             :         // TODO(peter): Currently large batches are written to the WAL. We could
     869             :         // skip the WAL write and instead wait for the large batch to be flushed to
     870             :         // an sstable. For a 100 MB batch, this might actually be faster. For a 1
     871             :         // GB batch this is almost certainly faster.
     872           1 :         if batch.flushable != nil {
     873           1 :                 batch.data = nil
     874           1 :         }
     875           1 :         return nil
     876             : }
     877             : 
     878           1 : func (d *DB) commitApply(b *Batch, mem *memTable) error {
     879           1 :         if b.flushable != nil {
     880           1 :                 // This is a large batch which was already added to the immutable queue.
     881           1 :                 return nil
     882           1 :         }
     883           1 :         err := mem.apply(b, b.SeqNum())
     884           1 :         if err != nil {
     885           0 :                 return err
     886           0 :         }
     887             : 
     888             :         // If the batch contains range tombstones and the database is configured
     889             :         // to flush range deletions, schedule a delayed flush so that disk space
     890             :         // may be reclaimed without additional writes or an explicit flush.
     891           1 :         if b.countRangeDels > 0 && d.opts.FlushDelayDeleteRange > 0 {
     892           1 :                 d.mu.Lock()
     893           1 :                 d.maybeScheduleDelayedFlush(mem, d.opts.FlushDelayDeleteRange)
     894           1 :                 d.mu.Unlock()
     895           1 :         }
     896             : 
     897             :         // If the batch contains range keys and the database is configured to flush
     898             :         // range keys, schedule a delayed flush so that the range keys are cleared
     899             :         // from the memtable.
     900           1 :         if b.countRangeKeys > 0 && d.opts.FlushDelayRangeKey > 0 {
     901           1 :                 d.mu.Lock()
     902           1 :                 d.maybeScheduleDelayedFlush(mem, d.opts.FlushDelayRangeKey)
     903           1 :                 d.mu.Unlock()
     904           1 :         }
     905             : 
     906           1 :         if mem.writerUnref() {
     907           1 :                 d.mu.Lock()
     908           1 :                 d.maybeScheduleFlush()
     909           1 :                 d.mu.Unlock()
     910           1 :         }
     911           1 :         return nil
     912             : }
     913             : 
     914           1 : func (d *DB) commitWrite(b *Batch, syncWG *sync.WaitGroup, syncErr *error) (*memTable, error) {
     915           1 :         var size int64
     916           1 :         repr := b.Repr()
     917           1 : 
     918           1 :         if b.flushable != nil {
     919           1 :                 // We have a large batch. Such batches are special in that they don't get
     920           1 :                 // added to the memtable, and are instead inserted into the queue of
     921           1 :                 // memtables. The call to makeRoomForWrite with this batch will force the
     922           1 :                 // current memtable to be flushed. We want the large batch to be part of
     923           1 :                 // the same log, so we add it to the WAL here, rather than after the call
     924           1 :                 // to makeRoomForWrite().
     925           1 :                 //
     926           1 :                 // Set the sequence number since it was not set to the correct value earlier
     927           1 :                 // (see comment in newFlushableBatch()).
     928           1 :                 b.flushable.setSeqNum(b.SeqNum())
     929           1 :                 if !d.opts.DisableWAL {
     930           1 :                         var err error
     931           1 :                         size, err = d.mu.log.SyncRecord(repr, syncWG, syncErr)
     932           1 :                         if err != nil {
     933           0 :                                 panic(err)
     934             :                         }
     935             :                 }
     936             :         }
     937             : 
     938           1 :         d.mu.Lock()
     939           1 : 
     940           1 :         var err error
     941           1 :         if !b.ingestedSSTBatch {
     942           1 :                 // Batches which contain keys of kind InternalKeyKindIngestSST will
     943           1 :                 // never be applied to the memtable, so we don't need to make room for
     944           1 :                 // write. For the other cases, switch out the memtable if there was not
     945           1 :                 // enough room to store the batch.
     946           1 :                 err = d.makeRoomForWrite(b)
     947           1 :         }
     948             : 
     949           1 :         if err == nil && !d.opts.DisableWAL {
     950           1 :                 d.mu.log.bytesIn += uint64(len(repr))
     951           1 :         }
     952             : 
     953             :         // Grab a reference to the memtable while holding DB.mu. Note that for
     954             :         // non-flushable batches (b.flushable == nil) makeRoomForWrite() added a
     955             :         // reference to the memtable which will prevent it from being flushed until
     956             :         // we unreference it. This reference is dropped in DB.commitApply().
     957           1 :         mem := d.mu.mem.mutable
     958           1 : 
     959           1 :         d.mu.Unlock()
     960           1 :         if err != nil {
     961           0 :                 return nil, err
     962           0 :         }
     963             : 
     964           1 :         if d.opts.DisableWAL {
     965           1 :                 return mem, nil
     966           1 :         }
     967             : 
     968           1 :         if b.flushable == nil {
     969           1 :                 size, err = d.mu.log.SyncRecord(repr, syncWG, syncErr)
     970           1 :                 if err != nil {
     971           0 :                         panic(err)
     972             :                 }
     973             :         }
     974             : 
     975           1 :         d.logSize.Store(uint64(size))
     976           1 :         return mem, err
     977             : }
     978             : 
                       // iterAlloc bundles an Iterator together with the scratch buffers and the
                       // per-level iterator storage it uses, so that one object obtained from
                       // iterAllocPool provides all of them (see newIter and newInternalIter).
     979             : type iterAlloc struct {
                               // dbi is the Iterator itself; newIter and finishInitializingIter
                               // operate on &buf.dbi.
     980             :         dbi                 Iterator
                               // keyBuf, boundsBuf and prefixOrFullSeekKey are copied by newIter
                               // into the Iterator fields of the same names.
     981             :         keyBuf              []byte
     982             :         boundsBuf           [2][]byte
     983             :         prefixOrFullSeekKey []byte
                               // newInternalIter points scanInternalIterator.mergingIter at merging.
     984             :         merging             mergingIter
                               // mlevels and levels are the initial backing storage for the merging
                               // levels and level iterators built by Iterator.constructPointIter,
                               // which allocates larger slices when these arrays are too small.
     985             :         mlevels             [3 + numLevels]mergingIterLevel
     986             :         levels              [3 + numLevels]levelIter
                               // NOTE(review): levelsPositioned is not referenced by the code in
                               // this part of the file; confirm its role against its users.
     987             :         levelsPositioned    [3 + numLevels]bool
     988             : }
     989             : 
                       // iterAllocPool is the sync.Pool from which newIter and newInternalIter
                       // obtain their iterAlloc. New supplies a zero-valued iterAlloc when the pool
                       // has nothing to hand out.
     990             : var iterAllocPool = sync.Pool{
     991           1 :         New: func() interface{} {
     992           1 :                 return &iterAlloc{}
     993           1 :         },
     994             : }
     995             : 
     996             : // snapshotIterOpts denotes snapshot-related iterator options when calling
     997             : // newIter or newInternalIter. These are the possible cases:
     998             : //   - No snapshot: All fields are zero values.
     999             : //   - Classic snapshot: Only `seqNum` is set. The latest readState will be used
    1000             : //     and the specified seqNum will be used as the snapshot seqNum.
    1001             : //   - EventuallyFileOnlySnapshot (EFOS) behaving as a classic snapshot. Only
    1002             : //     the `seqNum` is set. The latest readState will be used
    1003             : //     and the specified seqNum will be used as the snapshot seqNum.
    1004             : //   - EFOS in file-only state: Only `seqNum` and `vers` are set. All the
    1005             : //     relevant SSTs are referenced by the *version.
    1006             : //   - EFOS that has been excised but is in alwaysCreateIters mode (tests only).
    1007             : //     Only `seqNum` and `readState` are set.
    1008             : type snapshotIterOpts struct {
                               // seqNum is the sequence number to read at. When it is zero the
                               // iterator constructors read at the visible sequence number loaded
                               // from d.mu.versions.visibleSeqNum instead.
    1009             :         seqNum    uint64
                               // vers, when non-nil, is referenced (Ref) by the iterator
                               // constructors and no readState is acquired.
    1010             :         vers      *version
                               // readState, when non-nil and vers is nil, is referenced (ref) in
                               // place of the latest readState returned by loadReadState.
    1011             :         readState *readState
    1012             : }
    1013             : 
                       // batchIterOpts denotes batch-related iterator options when calling newIter.
    1014             : type batchIterOpts struct {
                               // batchOnly makes newIter skip acquiring a readState or version and
                               // leave the table iterator constructors unset. newIter panics if
                               // batchOnly is set while the batch is nil or snapshot.vers is non-nil.
    1015             :         batchOnly bool
    1016             : }
                       
                       // newIterOpts groups the snapshot and batch options accepted by newIter.
    1017             : type newIterOpts struct {
    1018             :         snapshot snapshotIterOpts
    1019             :         batch    batchIterOpts
    1020             : }
    1021             : 
    1022             : // newIter constructs a new iterator, merging in batch iterators as an extra
    1023             : // level.
                       //
                       // Unless internalOpts.batch.batchOnly is set, newIter takes a reference on
                       // exactly one of: the snapshot version, the snapshot readState, or the latest
                       // readState (in that order of preference), and reads at the snapshot seqNum or,
                       // when that is zero, at the currently visible sequence number.
                       //
                       // newIter panics if d.closed holds a non-nil error, if range keys are requested
                       // while the format major version is below FormatRangeKeys, if range key masking
                       // is requested without IterKeyTypePointsAndRanges, or if
                       // OnlyReadGuaranteedDurable is combined with a batch or a non-zero snapshot
                       // seqNum.
    1024             : func (d *DB) newIter(
    1025             :         ctx context.Context, batch *Batch, internalOpts newIterOpts, o *IterOptions,
    1026           1 : ) *Iterator {
                               // A batch-only iterator needs a batch and must not carry a snapshot
                               // version.
    1027           1 :         if internalOpts.batch.batchOnly {
    1028           0 :                 if batch == nil {
    1029           0 :                         panic("batchOnly is true, but batch is nil")
    1030             :                 }
    1031           0 :                 if internalOpts.snapshot.vers != nil {
    1032           0 :                         panic("batchOnly is true, but snapshotIterOpts is initialized")
    1033             :                 }
    1034             :         }
    1035           1 :         if err := d.closed.Load(); err != nil {
    1036           0 :                 panic(err)
    1037             :         }
    1038           1 :         seqNum := internalOpts.snapshot.seqNum
                               // NOTE(review): o.rangeKeys() is called without the o != nil guard
                               // used by the checks below, so it is presumably safe on a nil
                               // receiver; confirm against its definition.
    1039           1 :         if o.rangeKeys() {
    1040           1 :                 if d.FormatMajorVersion() < FormatRangeKeys {
    1041           0 :                         panic(fmt.Sprintf(
    1042           0 :                                 "pebble: range keys require at least format major version %d (current: %d)",
    1043           0 :                                 FormatRangeKeys, d.FormatMajorVersion(),
    1044           0 :                         ))
    1045             :                 }
    1046             :         }
    1047           1 :         if o != nil && o.RangeKeyMasking.Suffix != nil && o.KeyTypes != IterKeyTypePointsAndRanges {
    1048           0 :                 panic("pebble: range key masking requires IterKeyTypePointsAndRanges")
    1049             :         }
    1050           1 :         if (batch != nil || seqNum != 0) && (o != nil && o.OnlyReadGuaranteedDurable) {
    1051           0 :                 // We could add support for OnlyReadGuaranteedDurable on snapshots if
    1052           0 :                 // there was a need: this would require checking that the sequence number
    1053           0 :                 // of the snapshot has been flushed, by comparing with
    1054           0 :                 // DB.mem.queue[0].logSeqNum.
    1055           0 :                 panic("OnlyReadGuaranteedDurable is not supported for batches or snapshots")
    1056             :         }
                               // For a batch-only iterator the three variables below stay at their
                               // zero values.
    1057           1 :         var readState *readState
    1058           1 :         var newIters tableNewIters
    1059           1 :         var newIterRangeKey keyspan.TableNewSpanIter
    1060           1 :         if !internalOpts.batch.batchOnly {
    1061           1 :                 // Grab and reference the current readState. This prevents the underlying
    1062           1 :                 // files in the associated version from being deleted if there is a current
    1063           1 :                 // compaction. The readState is unref'd by Iterator.Close().
    1064           1 :                 if internalOpts.snapshot.vers == nil {
    1065           1 :                         if internalOpts.snapshot.readState != nil {
    1066           1 :                                 readState = internalOpts.snapshot.readState
    1067           1 :                                 readState.ref()
    1068           1 :                         } else {
    1069           1 :                                 // NB: loadReadState() calls readState.ref().
    1070           1 :                                 readState = d.loadReadState()
    1071           1 :                         }
    1072           1 :                 } else {
    1073           1 :                         // vers != nil
    1074           1 :                         internalOpts.snapshot.vers.Ref()
    1075           1 :                 }
    1076             : 
    1077             :                 // Determine the seqnum to read at after grabbing the read state (current and
    1078             :                 // memtables) above.
    1079           1 :                 if seqNum == 0 {
    1080           1 :                         seqNum = d.mu.versions.visibleSeqNum.Load()
    1081           1 :                 }
    1082           1 :                 newIters = d.newIters
    1083           1 :                 newIterRangeKey = d.tableNewRangeKeyIter
    1084             :         }
    1085             : 
    1086             :         // Bundle various structures under a single umbrella in order to allocate
    1087             :         // them together.
    1088           1 :         buf := iterAllocPool.Get().(*iterAlloc)
    1089           1 :         dbi := &buf.dbi
                               // Overwrite the whole Iterator; only the buffers carried by buf are
                               // taken over from its previous contents.
    1090           1 :         *dbi = Iterator{
    1091           1 :                 ctx:                 ctx,
    1092           1 :                 alloc:               buf,
    1093           1 :                 merge:               d.merge,
    1094           1 :                 comparer:            *d.opts.Comparer,
    1095           1 :                 readState:           readState,
    1096           1 :                 version:             internalOpts.snapshot.vers,
    1097           1 :                 keyBuf:              buf.keyBuf,
    1098           1 :                 prefixOrFullSeekKey: buf.prefixOrFullSeekKey,
    1099           1 :                 boundsBuf:           buf.boundsBuf,
    1100           1 :                 batch:               batch,
    1101           1 :                 newIters:            newIters,
    1102           1 :                 newIterRangeKey:     newIterRangeKey,
    1103           1 :                 seqNum:              seqNum,
    1104           1 :                 batchOnlyIter:       internalOpts.batch.batchOnly,
    1105           1 :         }
                               // With a nil o the iterator keeps zero-valued options and no bounds
                               // are processed.
    1106           1 :         if o != nil {
    1107           1 :                 dbi.opts = *o
    1108           1 :                 dbi.processBounds(o.LowerBound, o.UpperBound)
    1109           1 :         }
                               // The logger and the lazy-combined-iteration override always come
                               // from the DB options, whatever o contained.
    1110           1 :         dbi.opts.logger = d.opts.Logger
    1111           1 :         if d.opts.private.disableLazyCombinedIteration {
    1112           1 :                 dbi.opts.disableLazyCombinedIteration = true
    1113           1 :         }
    1114           1 :         if batch != nil {
    1115           1 :                 dbi.batchSeqNum = dbi.batch.nextSeqNum()
    1116           1 :         }
    1117           1 :         return finishInitializingIter(ctx, buf)
    1118             : }
    1119             : 
    1120             : // finishInitializingIter is a helper for doing the non-trivial initialization
    1121             : // of an Iterator. It's invoked to perform the initial initialization of an
    1122             : // Iterator during NewIter or Clone, and to perform reinitialization due to a
    1123             : // change in IterOptions by a call to Iterator.SetOptions.
                       //
                       // It works on buf.dbi, sets dbi.iter to the point iterator, the lazy combined
                       // iterator or the interleaving iterator depending on the requested key types,
                       // and returns &buf.dbi.
    1124           1 : func finishInitializingIter(ctx context.Context, buf *iterAlloc) *Iterator {
    1125           1 :         // Short-hand.
    1126           1 :         dbi := &buf.dbi
                               // Without a readState there are no memtables to read from.
    1127           1 :         var memtables flushableList
    1128           1 :         if dbi.readState != nil {
    1129           1 :                 memtables = dbi.readState.memtables
    1130           1 :         }
    1131           1 :         if dbi.opts.OnlyReadGuaranteedDurable {
                                       // Such iterators ignore every memtable.
    1132           0 :                 memtables = nil
    1133           1 :         } else {
    1134           1 :                 // We only need to read from memtables which contain sequence numbers older
    1135           1 :                 // than seqNum. Trim off newer memtables.
                                       // The scan runs from the end of the list and stops at the first
                                       // memtable whose logSeqNum is below seqNum.
    1136           1 :                 for i := len(memtables) - 1; i >= 0; i-- {
    1137           1 :                         if logSeqNum := memtables[i].logSeqNum; logSeqNum < dbi.seqNum {
    1138           1 :                                 break
    1139             :                         }
    1140           1 :                         memtables = memtables[:i]
    1141             :                 }
    1142             :         }
    1143             : 
    1144           1 :         if dbi.opts.pointKeys() {
    1145           1 :                 // Construct the point iterator, initializing dbi.pointIter to point to
    1146           1 :                 // dbi.merging. If this is called during a SetOptions call and this
    1147           1 :                 // Iterator has already initialized dbi.merging, constructPointIter is a
    1148           1 :                 // noop and an initialized pointIter already exists in dbi.pointIter.
    1149           1 :                 dbi.constructPointIter(ctx, memtables, buf)
    1150           1 :                 dbi.iter = dbi.pointIter
    1151           1 :         } else {
                                       // Point keys were not requested.
    1152           1 :                 dbi.iter = emptyIter
    1153           1 :         }
    1154             : 
    1155           1 :         if dbi.opts.rangeKeys() {
    1156           1 :                 dbi.rangeKeyMasking.init(dbi, dbi.comparer.Compare, dbi.comparer.Split)
    1157           1 : 
    1158           1 :                 // When iterating over both point and range keys, don't create the
    1159           1 :                 // range-key iterator stack immediately if we can avoid it. This
    1160           1 :                 // optimization takes advantage of the expected sparseness of range
    1161           1 :                 // keys, and configures the point-key iterator to dynamically switch to
    1162           1 :                 // combined iteration when it observes a file containing range keys.
    1163           1 :                 //
    1164           1 :                 // Lazy combined iteration is not possible if a batch or a memtable
    1165           1 :                 // contains any range keys.
                                       // It is also skipped when range key state already exists
                                       // (dbi.rangeKey != nil) or when it has been disabled via the options.
    1166           1 :                 useLazyCombinedIteration := dbi.rangeKey == nil &&
    1167           1 :                         dbi.opts.KeyTypes == IterKeyTypePointsAndRanges &&
    1168           1 :                         (dbi.batch == nil || dbi.batch.countRangeKeys == 0) &&
    1169           1 :                         !dbi.opts.disableLazyCombinedIteration
    1170           1 :                 if useLazyCombinedIteration {
    1171           1 :                         // The user requested combined iteration, and there's no indexed
    1172           1 :                         // batch currently containing range keys that would prevent lazy
    1173           1 :                         // combined iteration. Check the memtables to see if they contain
    1174           1 :                         // any range keys.
    1175           1 :                         for i := range memtables {
    1176           1 :                                 if memtables[i].containsRangeKeys() {
    1177           1 :                                         useLazyCombinedIteration = false
    1178           1 :                                         break
    1179             :                                 }
    1180             :                         }
    1181             :                 }
    1182             : 
    1183           1 :                 if useLazyCombinedIteration {
                                               // initialized: false leaves the switch to combined iteration
                                               // pending.
    1184           1 :                         dbi.lazyCombinedIter = lazyCombinedIter{
    1185           1 :                                 parent:    dbi,
    1186           1 :                                 pointIter: dbi.pointIter,
    1187           1 :                                 combinedIterState: combinedIterState{
    1188           1 :                                         initialized: false,
    1189           1 :                                 },
    1190           1 :                         }
    1191           1 :                         dbi.iter = &dbi.lazyCombinedIter
                                               // NOTE(review): presumably a no-op wrapper outside invariants
                                               // builds; confirm against the invalidating package.
    1192           1 :                         dbi.iter = invalidating.MaybeWrapIfInvariants(dbi.iter)
    1193           1 :                 } else {
                                               // The range key iterator stack is built eagerly below, so the
                                               // combined state is marked as already initialized.
    1194           1 :                         dbi.lazyCombinedIter.combinedIterState = combinedIterState{
    1195           1 :                                 initialized: true,
    1196           1 :                         }
                                               // Build the range key state on first use; otherwise only its
                                               // bounds are refreshed from the current options.
    1197           1 :                         if dbi.rangeKey == nil {
    1198           1 :                                 dbi.rangeKey = iterRangeKeyStateAllocPool.Get().(*iteratorRangeKeyState)
    1199           1 :                                 dbi.rangeKey.init(dbi.comparer.Compare, dbi.comparer.Split, &dbi.opts)
    1200           1 :                                 dbi.constructRangeKeyIter()
    1201           1 :                         } else {
    1202           1 :                                 dbi.rangeKey.iterConfig.SetBounds(dbi.opts.LowerBound, dbi.opts.UpperBound)
    1203           1 :                         }
    1204             : 
    1205             :                         // Wrap the point iterator (currently dbi.iter) with an interleaving
    1206             :                         // iterator that interleaves range keys pulled from
    1207             :                         // dbi.rangeKey.rangeKeyIter.
    1208             :                         //
    1209             :                         // NB: The interleaving iterator is always reinitialized, even if
    1210             :                         // dbi already had an initialized range key iterator, in case the point
    1211             :                         // iterator changed or the range key masking suffix changed.
    1212           1 :                         dbi.rangeKey.iiter.Init(&dbi.comparer, dbi.iter, dbi.rangeKey.rangeKeyIter,
    1213           1 :                                 keyspan.InterleavingIterOpts{
    1214           1 :                                         Mask:       &dbi.rangeKeyMasking,
    1215           1 :                                         LowerBound: dbi.opts.LowerBound,
    1216           1 :                                         UpperBound: dbi.opts.UpperBound,
    1217           1 :                                 })
    1218           1 :                         dbi.iter = &dbi.rangeKey.iiter
    1219             :                 }
    1220           1 :         } else {
    1221           1 :                 // !dbi.opts.rangeKeys()
    1222           1 :                 //
    1223           1 :                 // Reset the combined iterator state. The initialized=true ensures the
    1224           1 :                 // iterator doesn't unnecessarily try to switch to combined iteration.
    1225           1 :                 dbi.lazyCombinedIter.combinedIterState = combinedIterState{initialized: true}
    1226           1 :         }
    1227           1 :         return dbi
    1228             : }
    1229             : 
    1230             : // ScanInternal scans all internal keys within the specified bounds, truncating
    1231             : // any rangedels and rangekeys to those bounds if they span past them. For use
    1232             : // when an external user needs to be aware of all internal keys that make up a
    1233             : // key range.
    1234             : //
    1235             : // Keys deleted by range deletions must not be returned or exposed by this
    1236             : // method, while the range deletion deleting that key must be exposed using
    1237             : // visitRangeDel. Keys that would be masked by range key masking (if an
    1238             : // appropriate prefix were set) should be exposed, alongside the range key
    1239             : // that would have masked it. This method also collapses all point keys into
    1240             : // one InternalKey; so only one internal key at most per user key is returned
    1241             : // to visitPointKey.
    1242             : //
    1243             : // If visitSharedFile is not nil, ScanInternal iterates in skip-shared iteration
    1244             : // mode. In this iteration mode, sstables in levels L5 and L6 are skipped, and
    1245             : // their metadatas truncated to [lower, upper) and passed into visitSharedFile.
    1246             : // ErrInvalidSkipSharedIteration is returned if visitSharedFile is not nil and an
    1247             : // sstable in L5 or L6 is found that is not in shared storage according to
    1248             : // provider.IsShared, or an sstable in those levels contains a newer key than the
    1249             : // snapshot sequence number (only applicable for snapshot.ScanInternal). Examples
    1250             : // of when this could happen could be if Pebble started writing sstables before a
    1251             : // creator ID was set (as creator IDs are necessary to enable shared storage)
    1252             : // resulting in some lower level SSTs being on non-shared storage. Skip-shared
    1253             : // iteration is invalid in those cases.
    1254             : func (d *DB) ScanInternal(
    1255             :         ctx context.Context,
    1256             :         categoryAndQoS sstable.CategoryAndQoS,
    1257             :         lower, upper []byte,
    1258             :         visitPointKey func(key *InternalKey, value LazyValue, iterInfo IteratorLevel) error,
    1259             :         visitRangeDel func(start, end []byte, seqNum uint64) error,
    1260             :         visitRangeKey func(start, end []byte, keys []rangekey.Key) error,
    1261             :         visitSharedFile func(sst *SharedSSTMeta) error,
    1262           1 : ) error {
    1263           1 :         scanInternalOpts := &scanInternalOptions{
    1264           1 :                 CategoryAndQoS:   categoryAndQoS,
    1265           1 :                 visitPointKey:    visitPointKey,
    1266           1 :                 visitRangeDel:    visitRangeDel,
    1267           1 :                 visitRangeKey:    visitRangeKey,
    1268           1 :                 visitSharedFile:  visitSharedFile,
                                       // Skip-shared iteration is enabled exactly when a visitSharedFile
                                       // callback was supplied.
    1269           1 :                 skipSharedLevels: visitSharedFile != nil,
                                       // Both point and range keys are scanned, limited to [lower, upper).
    1270           1 :                 IterOptions: IterOptions{
    1271           1 :                         KeyTypes:   IterKeyTypePointsAndRanges,
    1272           1 :                         LowerBound: lower,
    1273           1 :                         UpperBound: upper,
    1274           1 :                 },
    1275           1 :         }
                               // The zero snapshotIterOpts means no snapshot: newInternalIter uses the
                               // latest readState and the currently visible sequence number.
    1276           1 :         iter, err := d.newInternalIter(ctx, snapshotIterOpts{} /* snapshot */, scanInternalOpts)
    1277           1 :         if err != nil {
    1278           0 :                 return err
    1279           0 :         }
    1280           1 :         defer iter.close()
    1281           1 :         return scanInternalImpl(ctx, lower, upper, iter, scanInternalOpts)
    1282             : }
    1283             : 
    1284             : // newInternalIter constructs and returns a new scanInternalIterator on this db.
    1285             : // If o.skipSharedLevels is true, levels below sharedLevelsStart are *not* added
    1286             : // to the internal iterator.
    1287             : //
    1288             : // TODO(bilal): This method has a lot of similarities with db.newIter as well as
    1289             : // finishInitializingIter. Both pairs of methods should be refactored to reduce
    1290             : // this duplication.
                       //
                       // sOpts selects what is read: sOpts.vers if set, else sOpts.readState if set,
                       // else the latest readState. o must be non-nil because it is dereferenced
                       // unconditionally. newInternalIter panics if d.closed holds a non-nil error and
                       // returns whatever error finishInitializingInternalIter reports.
    1291             : func (d *DB) newInternalIter(
    1292             :         ctx context.Context, sOpts snapshotIterOpts, o *scanInternalOptions,
    1293           1 : ) (*scanInternalIterator, error) {
    1294           1 :         if err := d.closed.Load(); err != nil {
    1295           0 :                 panic(err)
    1296             :         }
    1297             :         // Grab and reference the current readState. This prevents the underlying
    1298             :         // files in the associated version from being deleted if there is a current
    1299             :         // compaction. The readState is unref'd by Iterator.Close().
                               // NOTE(review): the iterator built here is a scanInternalIterator, so
                               // the release presumably happens in its close method; confirm.
    1300           1 :         var readState *readState
    1301           1 :         if sOpts.vers == nil {
    1302           1 :                 if sOpts.readState != nil {
    1303           0 :                         readState = sOpts.readState
    1304           0 :                         readState.ref()
    1305           1 :                 } else {
    1306           1 :                         readState = d.loadReadState()
    1307           1 :                 }
    1308             :         }
                               // A supplied version is referenced in place of a readState.
    1309           1 :         if sOpts.vers != nil {
    1310           0 :                 sOpts.vers.Ref()
    1311           0 :         }
    1312             : 
    1313             :         // Determine the seqnum to read at after grabbing the read state (current and
    1314             :         // memtables) above.
    1315           1 :         seqNum := sOpts.seqNum
    1316           1 :         if seqNum == 0 {
    1317           1 :                 seqNum = d.mu.versions.visibleSeqNum.Load()
    1318           1 :         }
    1319             : 
    1320             :         // Bundle various structures under a single umbrella in order to allocate
    1321             :         // them together.
    1322           1 :         buf := iterAllocPool.Get().(*iterAlloc)
    1323           1 :         dbi := &scanInternalIterator{
    1324           1 :                 ctx:             ctx,
    1325           1 :                 db:              d,
    1326           1 :                 comparer:        d.opts.Comparer,
    1327           1 :                 merge:           d.opts.Merger.Merge,
    1328           1 :                 readState:       readState,
    1329           1 :                 version:         sOpts.vers,
    1330           1 :                 alloc:           buf,
    1331           1 :                 newIters:        d.newIters,
    1332           1 :                 newIterRangeKey: d.tableNewRangeKeyIter,
    1333           1 :                 seqNum:          seqNum,
    1334           1 :                 mergingIter:     &buf.merging,
    1335           1 :         }
                               // Copy the options by value, then apply the DB-level logger and
                               // lazy-combined-iteration override on the copy.
    1336           1 :         dbi.opts = *o
    1337           1 :         dbi.opts.logger = d.opts.Logger
    1338           1 :         if d.opts.private.disableLazyCombinedIteration {
    1339           1 :                 dbi.opts.disableLazyCombinedIteration = true
    1340           1 :         }
                               // NOTE(review): if finishInitializingInternalIter fails, nothing in this
                               // function releases buf or the reference taken above; confirm whether
                               // another path is expected to do so.
    1341           1 :         return finishInitializingInternalIter(buf, dbi)
    1342             : }
    1343             : 
                       // finishInitializingInternalIter completes the construction of the
                       // scanInternalIterator i: it trims the memtables to those relevant at i.seqNum,
                       // builds the point iterator and the range key iterator stack, and makes i.iter
                       // an interleaving iterator over both. It returns a nil iterator and an error
                       // if constructing the range key iterator fails.
    1344             : func finishInitializingInternalIter(
    1345             :         buf *iterAlloc, i *scanInternalIterator,
    1346           1 : ) (*scanInternalIterator, error) {
    1347           1 :         // Without a readState there are no memtables to read from.
    1348           1 :         var memtables flushableList
    1349           1 :         if i.readState != nil {
    1350           1 :                 memtables = i.readState.memtables
    1351           1 :         }
    1352             :         // We only need to read from memtables which contain sequence numbers older
    1353             :         // than seqNum. Trim off newer memtables.
                               // The scan runs from the end of the list and stops at the first memtable
                               // whose logSeqNum is below seqNum.
    1354           1 :         for j := len(memtables) - 1; j >= 0; j-- {
    1355           1 :                 if logSeqNum := memtables[j].logSeqNum; logSeqNum < i.seqNum {
    1356           1 :                         break
    1357             :                 }
    1358           1 :                 memtables = memtables[:j]
    1359             :         }
    1360           1 :         i.initializeBoundBufs(i.opts.LowerBound, i.opts.UpperBound)
    1361           1 : 
    1362           1 :         i.constructPointIter(i.opts.CategoryAndQoS, memtables, buf)
    1363           1 : 
    1364           1 :         // For internal iterators, we skip the lazy combined iteration optimization
    1365           1 :         // entirely, and create the range key iterator stack directly.
    1366           1 :         i.rangeKey = iterRangeKeyStateAllocPool.Get().(*iteratorRangeKeyState)
    1367           1 :         i.rangeKey.init(i.comparer.Compare, i.comparer.Split, &i.opts.IterOptions)
                               // NOTE(review): this error return does not hand i.rangeKey or buf back
                               // to their pools; confirm whether that is intended.
    1368           1 :         if err := i.constructRangeKeyIter(); err != nil {
    1369           0 :                 return nil, err
    1370           0 :         }
    1371             : 
    1372             :         // Wrap the point iterator (currently i.iter) with an interleaving
    1373             :         // iterator that interleaves range keys pulled from
    1374             :         // i.rangeKey.rangeKeyIter.
                               // No Mask is configured here, unlike in finishInitializingIter.
    1375           1 :         i.rangeKey.iiter.Init(i.comparer, i.iter, i.rangeKey.rangeKeyIter,
    1376           1 :                 keyspan.InterleavingIterOpts{
    1377           1 :                         LowerBound: i.opts.LowerBound,
    1378           1 :                         UpperBound: i.opts.UpperBound,
    1379           1 :                 })
    1380           1 :         i.iter = &i.rangeKey.iiter
    1381           1 : 
    1382           1 :         return i, nil
    1383             : }
    1384             : 
    1385             : func (i *Iterator) constructPointIter(
    1386             :         ctx context.Context, memtables flushableList, buf *iterAlloc,
    1387           1 : ) {
    1388           1 :         if i.pointIter != nil {
    1389           1 :                 // Already have one.
    1390           1 :                 return
    1391           1 :         }
    1392           1 :         internalOpts := internalIterOpts{stats: &i.stats.InternalStats}
    1393           1 :         if i.opts.RangeKeyMasking.Filter != nil {
    1394           1 :                 internalOpts.boundLimitedFilter = &i.rangeKeyMasking
    1395           1 :         }
    1396             : 
    1397             :         // Merging levels and levels from iterAlloc.
    1398           1 :         mlevels := buf.mlevels[:0]
    1399           1 :         levels := buf.levels[:0]
    1400           1 : 
    1401           1 :         // We compute the number of levels needed ahead of time and reallocate a slice if
    1402           1 :         // the array from the iterAlloc isn't large enough. Doing this allocation once
    1403           1 :         // should improve the performance.
    1404           1 :         numMergingLevels := 0
    1405           1 :         numLevelIters := 0
    1406           1 :         if i.batch != nil {
    1407           1 :                 numMergingLevels++
    1408           1 :         }
    1409             : 
    1410           1 :         var current *version
    1411           1 :         if !i.batchOnlyIter {
    1412           1 :                 numMergingLevels += len(memtables)
    1413           1 : 
    1414           1 :                 current = i.version
    1415           1 :                 if current == nil {
    1416           1 :                         current = i.readState.current
    1417           1 :                 }
    1418           1 :                 numMergingLevels += len(current.L0SublevelFiles)
    1419           1 :                 numLevelIters += len(current.L0SublevelFiles)
    1420           1 :                 for level := 1; level < len(current.Levels); level++ {
    1421           1 :                         if current.Levels[level].Empty() {
    1422           1 :                                 continue
    1423             :                         }
    1424           1 :                         numMergingLevels++
    1425           1 :                         numLevelIters++
    1426             :                 }
    1427             :         }
    1428             : 
    1429           1 :         if numMergingLevels > cap(mlevels) {
    1430           1 :                 mlevels = make([]mergingIterLevel, 0, numMergingLevels)
    1431           1 :         }
    1432           1 :         if numLevelIters > cap(levels) {
    1433           1 :                 levels = make([]levelIter, 0, numLevelIters)
    1434           1 :         }
    1435             : 
    1436             :         // Top-level is the batch, if any.
    1437           1 :         if i.batch != nil {
    1438           1 :                 if i.batch.index == nil {
    1439           0 :                         // This isn't an indexed batch. We shouldn't have gotten this far.
    1440           0 :                         panic(errors.AssertionFailedf("creating an iterator over an unindexed batch"))
    1441           1 :                 } else {
    1442           1 :                         i.batch.initInternalIter(&i.opts, &i.batchPointIter)
    1443           1 :                         i.batch.initRangeDelIter(&i.opts, &i.batchRangeDelIter, i.batchSeqNum)
    1444           1 :                         // Only include the batch's rangedel iterator if it's non-empty.
    1445           1 :                         // This requires some subtle logic in the case a rangedel is later
    1446           1 :                         // written to the batch and the view of the batch is refreshed
    1447           1 :                         // during a call to SetOptions—in this case, we need to reconstruct
    1448           1 :                         // the point iterator to add the batch rangedel iterator.
    1449           1 :                         var rangeDelIter keyspan.FragmentIterator
    1450           1 :                         if i.batchRangeDelIter.Count() > 0 {
    1451           0 :                                 rangeDelIter = &i.batchRangeDelIter
    1452           0 :                         }
    1453           1 :                         mlevels = append(mlevels, mergingIterLevel{
    1454           1 :                                 iter:         &i.batchPointIter,
    1455           1 :                                 rangeDelIter: rangeDelIter,
    1456           1 :                         })
    1457             :                 }
    1458             :         }
    1459             : 
    1460           1 :         if !i.batchOnlyIter {
    1461           1 :                 // Next are the memtables.
    1462           1 :                 for j := len(memtables) - 1; j >= 0; j-- {
    1463           1 :                         mem := memtables[j]
    1464           1 :                         mlevels = append(mlevels, mergingIterLevel{
    1465           1 :                                 iter:         mem.newIter(&i.opts),
    1466           1 :                                 rangeDelIter: mem.newRangeDelIter(&i.opts),
    1467           1 :                         })
    1468           1 :                 }
    1469             : 
    1470             :                 // Next are the file levels: L0 sub-levels followed by lower levels.
    1471           1 :                 mlevelsIndex := len(mlevels)
    1472           1 :                 levelsIndex := len(levels)
    1473           1 :                 mlevels = mlevels[:numMergingLevels]
    1474           1 :                 levels = levels[:numLevelIters]
    1475           1 :                 i.opts.snapshotForHideObsoletePoints = buf.dbi.seqNum
    1476           1 :                 addLevelIterForFiles := func(files manifest.LevelIterator, level manifest.Level) {
    1477           1 :                         li := &levels[levelsIndex]
    1478           1 : 
    1479           1 :                         li.init(ctx, i.opts, &i.comparer, i.newIters, files, level, internalOpts)
    1480           1 :                         li.initRangeDel(&mlevels[mlevelsIndex].rangeDelIter)
    1481           1 :                         li.initBoundaryContext(&mlevels[mlevelsIndex].levelIterBoundaryContext)
    1482           1 :                         li.initCombinedIterState(&i.lazyCombinedIter.combinedIterState)
    1483           1 :                         mlevels[mlevelsIndex].levelIter = li
    1484           1 :                         mlevels[mlevelsIndex].iter = invalidating.MaybeWrapIfInvariants(li)
    1485           1 : 
    1486           1 :                         levelsIndex++
    1487           1 :                         mlevelsIndex++
    1488           1 :                 }
    1489             : 
    1490             :                 // Add level iterators for the L0 sublevels, iterating from newest to
    1491             :                 // oldest.
    1492           1 :                 for i := len(current.L0SublevelFiles) - 1; i >= 0; i-- {
    1493           1 :                         addLevelIterForFiles(current.L0SublevelFiles[i].Iter(), manifest.L0Sublevel(i))
    1494           1 :                 }
    1495             : 
    1496             :                 // Add level iterators for the non-empty non-L0 levels.
    1497           1 :                 for level := 1; level < len(current.Levels); level++ {
    1498           1 :                         if current.Levels[level].Empty() {
    1499           1 :                                 continue
    1500             :                         }
    1501           1 :                         addLevelIterForFiles(current.Levels[level].Iter(), manifest.Level(level))
    1502             :                 }
    1503             :         }
    1504           1 :         buf.merging.init(&i.opts, &i.stats.InternalStats, i.comparer.Compare, i.comparer.Split, mlevels...)
    1505           1 :         if len(mlevels) <= cap(buf.levelsPositioned) {
    1506           1 :                 buf.merging.levelsPositioned = buf.levelsPositioned[:len(mlevels)]
    1507           1 :         }
    1508           1 :         buf.merging.snapshot = i.seqNum
    1509           1 :         buf.merging.batchSnapshot = i.batchSeqNum
    1510           1 :         buf.merging.combinedIterState = &i.lazyCombinedIter.combinedIterState
    1511           1 :         i.pointIter = invalidating.MaybeWrapIfInvariants(&buf.merging)
    1512           1 :         i.merging = &buf.merging
    1513             : }
    1514             : 
    1515             : // NewBatch returns a new empty write-only batch. Any reads on the batch will
    1516             : // return an error. If the batch is committed it will be applied to the DB. Use NewIndexedBatch when reads on the batch are needed.
    1517           1 : func (d *DB) NewBatch() *Batch {
    1518           1 :         return newBatch(d)
    1519           1 : }
    1520             : 
    1521             : // NewBatchWithSize is mostly identical to NewBatch, but it will allocate the
    1522             : // memory space specified by size for the internal slice in advance.
    1523           0 : func (d *DB) NewBatchWithSize(size int) *Batch {
    1524           0 :         return newBatchWithSize(d, size)
    1525           0 : }
    1526             : 
    1527             : // NewIndexedBatch returns a new empty read-write batch. Any reads on the batch
    1528             : // will read from both the batch and the DB. If the batch is committed it will
    1529             : // be applied to the DB. An indexed batch is slower than a non-indexed batch
    1530             : // for insert operations. If you do not need to perform reads on the batch, use
    1531             : // NewBatch instead.
    1532           1 : func (d *DB) NewIndexedBatch() *Batch {
    1533           1 :         return newIndexedBatch(d, d.opts.Comparer)
    1534           1 : }
    1535             : 
    1536             : // NewIndexedBatchWithSize is mostly identical to NewIndexedBatch, but it will
    1537             : // allocate the memory space specified by size for the internal slice in advance.
    1538           0 : func (d *DB) NewIndexedBatchWithSize(size int) *Batch {
    1539           0 :         return newIndexedBatchWithSize(d, d.opts.Comparer, size)
    1540           0 : }
    1541             : 
    1542             : // NewIter returns an iterator that is unpositioned (Iterator.Valid() will
    1543             : // return false). The iterator can be positioned via a call to SeekGE, SeekLT,
    1544             : // First or Last. The iterator provides a point-in-time view of the current DB
    1545             : // state. This view is maintained by preventing file deletions and preventing
    1546             : // memtables referenced by the iterator from being deleted. Using an iterator
    1547             : // to maintain a long-lived point-in-time view of the DB state can lead to an
    1548             : // apparent memory and disk usage leak. Use snapshots (see NewSnapshot) for
    1549             : // long-lived point-in-time views, which avoid these problems. NewIter calls NewIterWithContext with context.Background().
    1550           1 : func (d *DB) NewIter(o *IterOptions) (*Iterator, error) {
    1551           1 :         return d.NewIterWithContext(context.Background(), o)
    1552           1 : }
    1553             : 
    1554             : // NewIterWithContext is like NewIter, and additionally accepts a context for
    1555             : // tracing. The error it returns is always nil.
    1556           1 : func (d *DB) NewIterWithContext(ctx context.Context, o *IterOptions) (*Iterator, error) {
    1557           1 :         return d.newIter(ctx, nil /* batch */, newIterOpts{}, o), nil
    1558           1 : }
    1559             : 
    1560             : // NewSnapshot returns a point-in-time view of the current DB state. Iterators
    1561             : // created with this handle will all observe a stable snapshot of the current
    1562             : // DB state. The caller must call Snapshot.Close() when the snapshot is no
    1563             : // longer needed. Snapshots are not persisted across DB restarts (close ->
    1564             : // open). Unlike the implicit snapshot maintained by an iterator, a snapshot
    1565             : // will not prevent memtables from being released or sstables from being
    1566             : // deleted. Instead, a snapshot prevents deletion of sequence numbers
    1567             : // referenced by the snapshot. NewSnapshot panics if the DB has been closed.
    1568           1 : func (d *DB) NewSnapshot() *Snapshot {
    1569           1 :         if err := d.closed.Load(); err != nil {
    1570           0 :                 panic(err)
    1571             :         }
    1572             : 
    1573           1 :         d.mu.Lock()
    1574           1 :         s := &Snapshot{
    1575           1 :                 db:     d,
    1576           1 :                 seqNum: d.mu.versions.visibleSeqNum.Load(),
    1577           1 :         }
    1578           1 :         d.mu.snapshots.pushBack(s)
    1579           1 :         d.mu.Unlock()
    1580           1 :         return s
    1581             : }
    1582             : 
    1583             : // NewEventuallyFileOnlySnapshot returns a point-in-time view of the current DB
    1584             : // state, similar to NewSnapshot, but with consistency constrained to the
    1585             : // provided set of key ranges. See the comment at EventuallyFileOnlySnapshot for
    1586             : // its semantics. It panics if the DB has been closed, or if any range's Start sorts before the previous range's End.
    1587           1 : func (d *DB) NewEventuallyFileOnlySnapshot(keyRanges []KeyRange) *EventuallyFileOnlySnapshot {
    1588           1 :         if err := d.closed.Load(); err != nil {
    1589           0 :                 panic(err)
    1590             :         }
    1591             : 
    1592           1 :         internalKeyRanges := make([]internalKeyRange, len(keyRanges))
    1593           1 :         for i := range keyRanges {
    1594           1 :                 if i > 0 && d.cmp(keyRanges[i-1].End, keyRanges[i].Start) > 0 {
    1595           0 :                         panic("pebble: key ranges for eventually-file-only-snapshot not in order")
    1596             :                 }
    1597           1 :                 internalKeyRanges[i] = internalKeyRange{
    1598           1 :                         smallest: base.MakeInternalKey(keyRanges[i].Start, InternalKeySeqNumMax, InternalKeyKindMax),
    1599           1 :                         largest:  base.MakeExclusiveSentinelKey(InternalKeyKindRangeDelete, keyRanges[i].End),
    1600           1 :                 }
    1601             :         }
    1602             : 
    1603           1 :         return d.makeEventuallyFileOnlySnapshot(keyRanges, internalKeyRanges)
    1604             : }
    1605             : 
    1606             : // Close closes the DB.
    1607             : //
    1608             : // It is not safe to close a DB until all outstanding iterators are closed
    1609             : // or to call Close concurrently with any other DB method. It is not valid
    1610             : // to call any of a DB's methods after the DB has been closed.
    1611           1 : func (d *DB) Close() error {
    1612           1 :         // Lock the commit pipeline for the duration of Close. This prevents a race
    1613           1 :         // with makeRoomForWrite. Rotating the WAL in makeRoomForWrite requires
    1614           1 :         // dropping d.mu several times for I/O. If Close only holds d.mu, an
    1615           1 :         // in-progress WAL rotation may re-acquire d.mu only once the database is
    1616           1 :         // closed.
    1617           1 :         //
    1618           1 :         // Additionally, locking the commit pipeline makes it more likely that
    1619           1 :         // (illegal) concurrent writes will observe d.closed.Load() != nil, creating
    1620           1 :         // more understandable panics if the database is improperly used concurrently
    1621           1 :         // during Close.
    1622           1 :         d.commit.mu.Lock()
    1623           1 :         defer d.commit.mu.Unlock()
    1624           1 :         d.mu.Lock()
    1625           1 :         defer d.mu.Unlock()
    1626           1 :         if err := d.closed.Load(); err != nil {
    1627           0 :                 panic(err)
    1628             :         }
    1629             : 
    1630             :         // Clear the finalizer that is used to check that an unreferenced DB has been
    1631             :         // closed. We're closing the DB here, so the check performed by that
    1632             :         // finalizer isn't necessary.
    1633             :         //
    1634             :         // Note: this is a no-op if invariants are disabled or race is enabled.
    1635           1 :         invariants.SetFinalizer(d.closed, nil)
    1636           1 : 
    1637           1 :         d.closed.Store(errors.WithStack(ErrClosed))
    1638           1 :         close(d.closedCh)
    1639           1 : 
    1640           1 :         defer d.opts.Cache.Unref()
    1641           1 : 
    1642           1 :         for d.mu.compact.compactingCount > 0 || d.mu.compact.flushing {
    1643           1 :                 d.mu.compact.cond.Wait()
    1644           1 :         }
    1645           1 :         for d.mu.tableStats.loading {
    1646           1 :                 d.mu.tableStats.cond.Wait()
    1647           1 :         }
    1648           1 :         for d.mu.tableValidation.validating {
    1649           1 :                 d.mu.tableValidation.cond.Wait()
    1650           1 :         }
    1651             : 
    1652           1 :         var err error
    1653           1 :         if n := len(d.mu.compact.inProgress); n > 0 {
    1654           0 :                 err = errors.Errorf("pebble: %d unexpected in-progress compactions", errors.Safe(n))
    1655           0 :         }
    1656           1 :         err = firstError(err, d.mu.formatVers.marker.Close())
    1657           1 :         err = firstError(err, d.tableCache.close())
    1658           1 :         if !d.opts.ReadOnly {
    1659           1 :                 err = firstError(err, d.mu.log.Close())
    1660           1 :         } else if d.mu.log.LogWriter != nil {
    1661           0 :                 panic("pebble: log-writer should be nil in read-only mode")
    1662             :         }
    1663           1 :         err = firstError(err, d.fileLock.Close())
    1664           1 : 
    1665           1 :         // Note that versionSet.close() only closes the MANIFEST. The versions list
    1666           1 :         // is still valid for the checks below.
    1667           1 :         err = firstError(err, d.mu.versions.close())
    1668           1 : 
    1669           1 :         err = firstError(err, d.dataDir.Close())
    1670           1 :         if d.dataDir != d.walDir {
    1671           1 :                 err = firstError(err, d.walDir.Close())
    1672           1 :         }
    1673             : 
    1674           1 :         d.readState.val.unrefLocked()
    1675           1 : 
    1676           1 :         current := d.mu.versions.currentVersion()
    1677           1 :         for v := d.mu.versions.versions.Front(); true; v = v.Next() {
    1678           1 :                 refs := v.Refs()
    1679           1 :                 if v == current {
    1680           1 :                         if refs != 1 {
    1681           0 :                                 err = firstError(err, errors.Errorf("leaked iterators: current\n%s", v))
    1682           0 :                         }
    1683           1 :                         break
    1684             :                 }
    1685           0 :                 if refs != 0 {
    1686           0 :                         err = firstError(err, errors.Errorf("leaked iterators:\n%s", v))
    1687           0 :                 }
    1688             :         }
    1689             : 
    1690           1 :         for _, mem := range d.mu.mem.queue {
    1691           1 :                 // Usually, we'd want to delete the files returned by readerUnref. But
    1692           1 :                 // in this case, even if we're unreferencing the flushables, the
    1693           1 :                 // flushables aren't obsolete. They will be reconstructed during WAL
    1694           1 :                 // replay.
    1695           1 :                 mem.readerUnrefLocked(false)
    1696           1 :         }
    1697             :         // If there's an unused, recycled memtable, we need to release its memory.
    1698           1 :         if obsoleteMemTable := d.memTableRecycle.Swap(nil); obsoleteMemTable != nil {
    1699           1 :                 d.freeMemTable(obsoleteMemTable)
    1700           1 :         }
    1701           1 :         if reserved := d.memTableReserved.Load(); reserved != 0 {
    1702           0 :                 err = firstError(err, errors.Errorf("leaked memtable reservation: %d", errors.Safe(reserved)))
    1703           0 :         }
    1704             : 
    1705             :         // Since we called d.readState.val.unrefLocked() above, we are expected to
    1706             :         // manually schedule deletion of obsolete files.
    1707           1 :         if len(d.mu.versions.obsoleteTables) > 0 {
    1708           1 :                 d.deleteObsoleteFiles(d.mu.nextJobID)
    1709           1 :         }
    1710             : 
    1711           1 :         d.mu.Unlock()
    1712           1 :         d.compactionSchedulers.Wait()
    1713           1 : 
    1714           1 :         // Wait for all cleaning jobs to finish.
    1715           1 :         d.cleanupManager.Close()
    1716           1 : 
    1717           1 :         // Sanity check metrics.
    1718           1 :         if invariants.Enabled {
    1719           1 :                 m := d.Metrics()
    1720           1 :                 if m.Compact.NumInProgress > 0 || m.Compact.InProgressBytes > 0 {
    1721           0 :                         d.mu.Lock()
    1722           0 :                         panic(fmt.Sprintf("invalid metrics on close:\n%s", m))
    1723             :                 }
    1724             :         }
    1725             : 
    1726           1 :         d.mu.Lock()
    1727           1 : 
    1728           1 :         // As a sanity check, ensure that there are no zombie tables. A non-zero count
    1729           1 :         // hints at a reference count leak.
    1730           1 :         if ztbls := len(d.mu.versions.zombieTables); ztbls > 0 {
    1731           0 :                 err = firstError(err, errors.Errorf("non-zero zombie file count: %d", ztbls))
    1732           0 :         }
    1733             : 
    1734           1 :         err = firstError(err, d.objProvider.Close())
    1735           1 : 
    1736           1 :         // If the options include a closer to 'close' the filesystem, close it. NOTE(review): anything Close() returns here is discarded; confirm that is intended.
    1737           1 :         if d.opts.private.fsCloser != nil {
    1738           1 :                 d.opts.private.fsCloser.Close()
    1739           1 :         }
    1740             : 
    1741             :         // Return an error if the user failed to close all open snapshots.
    1742           1 :         if v := d.mu.snapshots.count(); v > 0 {
    1743           0 :                 err = firstError(err, errors.Errorf("leaked snapshots: %d open snapshots on DB %p", v, d))
    1744           0 :         }
    1745             : 
    1746           1 :         return err
    1747             : }
    1748             : 
    1749             : // Compact compacts the specified range of keys in the database. It panics if the DB is closed, returns ErrReadOnly for a read-only DB, and returns an error unless start sorts before end.
    1750           1 : func (d *DB) Compact(start, end []byte, parallelize bool) error {
    1751           1 :         if err := d.closed.Load(); err != nil {
    1752           0 :                 panic(err)
    1753             :         }
    1754           1 :         if d.opts.ReadOnly {
    1755           0 :                 return ErrReadOnly
    1756           0 :         }
    1757           1 :         if d.cmp(start, end) >= 0 {
    1758           1 :                 return errors.Errorf("Compact start %s is not less than end %s",
    1759           1 :                         d.opts.Comparer.FormatKey(start), d.opts.Comparer.FormatKey(end))
    1760           1 :         }
    1761           1 :         iStart := base.MakeInternalKey(start, InternalKeySeqNumMax, InternalKeyKindMax)
    1762           1 :         iEnd := base.MakeInternalKey(end, 0, 0)
    1763           1 :         m := (&fileMetadata{}).ExtendPointKeyBounds(d.cmp, iStart, iEnd)
    1764           1 :         meta := []*fileMetadata{m}
    1765           1 : 
    1766           1 :         d.mu.Lock()
    1767           1 :         maxLevelWithFiles := 1
    1768           1 :         cur := d.mu.versions.currentVersion()
    1769           1 :         for level := 0; level < numLevels; level++ {
    1770           1 :                 overlaps := cur.Overlaps(level, d.cmp, start, end, iEnd.IsExclusiveSentinel())
    1771           1 :                 if !overlaps.Empty() {
    1772           1 :                         maxLevelWithFiles = level + 1
    1773           1 :                 }
    1774             :         }
    1775             : 
    1776           1 :         keyRanges := make([]internalKeyRange, len(meta))
    1777           1 :         for i := range meta {
    1778           1 :                 keyRanges[i] = internalKeyRange{smallest: m.Smallest, largest: m.Largest}
    1779           1 :         }
    1780             :         // Determine if any memtable overlaps with the compaction range. We wait for
    1781             :         // any such overlap to flush (initiating a flush if necessary).
    1782           1 :         mem, err := func() (*flushableEntry, error) {
    1783           1 :                 // Check to see if any files overlap with any of the memtables. The queue
    1784           1 :                 // is ordered from oldest to newest with the mutable memtable being the
    1785           1 :                 // last element in the slice. We want to wait for the newest table that
    1786           1 :                 // overlaps.
    1787           1 :                 for i := len(d.mu.mem.queue) - 1; i >= 0; i-- {
    1788           1 :                         mem := d.mu.mem.queue[i]
    1789           1 :                         if ingestMemtableOverlaps(d.cmp, mem, keyRanges) {
    1790           1 :                                 var err error
    1791           1 :                                 if mem.flushable == d.mu.mem.mutable {
    1792           1 :                                         // We have to hold both commitPipeline.mu and DB.mu when calling
    1793           1 :                                         // makeRoomForWrite(). Lock order requirements elsewhere force us to
    1794           1 :                                         // unlock DB.mu in order to grab commitPipeline.mu first.
    1795           1 :                                         d.mu.Unlock()
    1796           1 :                                         d.commit.mu.Lock()
    1797           1 :                                         d.mu.Lock()
    1798           1 :                                         defer d.commit.mu.Unlock()
    1799           1 :                                         if mem.flushable == d.mu.mem.mutable {
    1800           1 :                                                 // Only flush if the active memtable is unchanged.
    1801           1 :                                                 err = d.makeRoomForWrite(nil)
    1802           1 :                                         }
    1803             :                                 }
    1804           1 :                                 mem.flushForced = true
    1805           1 :                                 d.maybeScheduleFlush()
    1806           1 :                                 return mem, err
    1807             :                         }
    1808             :                 }
    1809           1 :                 return nil, nil
    1810             :         }()
    1811             : 
    1812           1 :         d.mu.Unlock()
    1813           1 : 
    1814           1 :         if err != nil {
    1815           0 :                 return err
    1816           0 :         }
    1817           1 :         if mem != nil {
    1818           1 :                 <-mem.flushed
    1819           1 :         }
    1820             : 
    1821           1 :         for level := 0; level < maxLevelWithFiles; {
    1822           1 :                 for {
    1823           1 :                         if err := d.manualCompact(
    1824           1 :                                 iStart.UserKey, iEnd.UserKey, level, parallelize); err != nil {
    1825           0 :                                 if errors.Is(err, ErrCancelledCompaction) {
    1826           0 :                                         continue
    1827             :                                 }
    1828           0 :                                 return err
    1829             :                         }
    1830           1 :                         break
    1831             :                 }
    1832           1 :                 level++
    1833           1 :                 if level == numLevels-1 {
    1834           1 :                         // A manual compaction of the bottommost level occurred.
    1835           1 :                         // There is no next level to try and compact.
    1836           1 :                         break
    1837             :                 }
    1838             :         }
    1839           1 :         return nil
    1840             : }
    1841             : 
    1842           1 : func (d *DB) manualCompact(start, end []byte, level int, parallelize bool) error {
    1843           1 :         d.mu.Lock()
    1844           1 :         curr := d.mu.versions.currentVersion()
    1845           1 :         files := curr.Overlaps(level, d.cmp, start, end, false)
    1846           1 :         if files.Empty() {
    1847           1 :                 d.mu.Unlock()
    1848           1 :                 return nil
    1849           1 :         }
    1850             : 
    1851           1 :         var compactions []*manualCompaction
    1852           1 :         if parallelize {
    1853           1 :                 compactions = append(compactions, d.splitManualCompaction(start, end, level)...)
    1854           1 :         } else {
    1855           1 :                 compactions = append(compactions, &manualCompaction{
    1856           1 :                         level: level,
    1857           1 :                         done:  make(chan error, 1),
    1858           1 :                         start: start,
    1859           1 :                         end:   end,
    1860           1 :                 })
    1861           1 :         }
    1862           1 :         d.mu.compact.manual = append(d.mu.compact.manual, compactions...)
    1863           1 :         d.maybeScheduleCompaction()
    1864           1 :         d.mu.Unlock()
    1865           1 : 
    1866           1 :         // Wait on each queued compaction's done channel in order. Each of the channels is guaranteed to be eventually sent to once. After a
    1867           1 :         // compaction is possibly picked in d.maybeScheduleCompaction(), either the
    1868           1 :         // compaction is dropped, executed after being scheduled, or retried later.
    1869           1 :         // Assuming eventual progress when a compaction is retried, all outcomes send
    1870           1 :         // a value to the done channel. Since the channels are buffered, it is not
    1871           1 :         // necessary to read from each channel, and so we can exit early in the event
    1872           1 :         // of an error, returning the first non-nil error received.
    1873           1 :         for _, compaction := range compactions {
    1874           1 :                 if err := <-compaction.done; err != nil {
    1875           0 :                         return err
    1876           0 :                 }
    1877             :         }
    1878           1 :         return nil
    1879             : }
    1880             : 
    1881             : // splitManualCompaction splits a manual compaction over [start,end] on level
    1882             : // such that the resulting compactions have no key overlap. One compaction is created per key range returned by calculateInuseKeyRanges for level through endLevel (the base level when level is 0, otherwise level+1). NOTE(review): reads d.mu state; presumably d.mu must be held, as in manualCompact.
    1883             : func (d *DB) splitManualCompaction(
    1884             :         start, end []byte, level int,
    1885           1 : ) (splitCompactions []*manualCompaction) {
    1886           1 :         curr := d.mu.versions.currentVersion()
    1887           1 :         endLevel := level + 1
    1888           1 :         baseLevel := d.mu.versions.picker.getBaseLevel()
    1889           1 :         if level == 0 {
    1890           1 :                 endLevel = baseLevel
    1891           1 :         }
    1892           1 :         keyRanges := calculateInuseKeyRanges(curr, d.cmp, level, endLevel, start, end)
    1893           1 :         for _, keyRange := range keyRanges {
    1894           1 :                 splitCompactions = append(splitCompactions, &manualCompaction{
    1895           1 :                         level: level,
    1896           1 :                         done:  make(chan error, 1),
    1897           1 :                         start: keyRange.Start,
    1898           1 :                         end:   keyRange.End,
    1899           1 :                         split: true,
    1900           1 :                 })
    1901           1 :         }
    1902           1 :         return splitCompactions
    1903             : }
    1904             : 
    // DownloadSpan describes one key range handed to the Download method.
    type DownloadSpan struct {
    	// StartKey is the start key of the span.
    	StartKey []byte
    	// EndKey is the end key of the span; it is exclusive.
    	EndKey []byte
    }
    1911             : 
    1912           0 : func (d *DB) downloadSpan(ctx context.Context, span DownloadSpan) error {
    1913           0 :         dSpan := &downloadSpan{
    1914           0 :                 start: span.StartKey,
    1915           0 :                 end:   span.EndKey,
    1916           0 :                 // Protected by d.mu.
    1917           0 :                 doneChans: make([]chan error, 1),
    1918           0 :         }
    1919           0 :         dSpan.doneChans[0] = make(chan error, 1)
    1920           0 :         doneChan := dSpan.doneChans[0]
    1921           0 :         compactionIdx := 0
    1922           0 : 
    1923           0 :         func() {
    1924           0 :                 d.mu.Lock()
    1925           0 :                 defer d.mu.Unlock()
    1926           0 : 
    1927           0 :                 d.mu.compact.downloads = append(d.mu.compact.downloads, dSpan)
    1928           0 :                 d.maybeScheduleCompaction()
    1929           0 :         }()
    1930             : 
    1931             :         // Requires d.mu to be held.
    1932           0 :         noExternalFilesInSpan := func() (noExternalFiles bool) {
    1933           0 :                 vers := d.mu.versions.currentVersion()
    1934           0 : 
    1935           0 :                 for i := 0; i < len(vers.Levels); i++ {
    1936           0 :                         if vers.Levels[i].Empty() {
    1937           0 :                                 continue
    1938             :                         }
    1939           0 :                         overlap := vers.Overlaps(i, d.cmp, span.StartKey, span.EndKey, true /* exclusiveEnd */)
    1940           0 :                         foundExternalFile := false
    1941           0 :                         overlap.Each(func(metadata *manifest.FileMetadata) {
    1942           0 :                                 objMeta, err := d.objProvider.Lookup(fileTypeTable, metadata.FileBacking.DiskFileNum)
    1943           0 :                                 if err != nil {
    1944           0 :                                         return
    1945           0 :                                 }
    1946           0 :                                 if objMeta.IsExternal() {
    1947           0 :                                         foundExternalFile = true
    1948           0 :                                 }
    1949             :                         })
    1950           0 :                         if foundExternalFile {
    1951           0 :                                 return false
    1952           0 :                         }
    1953             :                 }
    1954           0 :                 return true
    1955             :         }
    1956             : 
    1957             :         // Requires d.mu to be held.
    1958           0 :         removeUsFromList := func() {
    1959           0 :                 // Check where we are in d.mu.compact.downloads. Remove us from the
    1960           0 :                 // list.
    1961           0 :                 for i := range d.mu.compact.downloads {
    1962           0 :                         if d.mu.compact.downloads[i] != dSpan {
    1963           0 :                                 continue
    1964             :                         }
    1965           0 :                         copy(d.mu.compact.downloads[i:], d.mu.compact.downloads[i+1:])
    1966           0 :                         d.mu.compact.downloads = d.mu.compact.downloads[:len(d.mu.compact.downloads)-1]
    1967           0 :                         break
    1968             :                 }
    1969             :         }
    1970             : 
    1971           0 :         for {
    1972           0 :                 select {
    1973           0 :                 case <-ctx.Done():
    1974           0 :                         d.mu.Lock()
    1975           0 :                         defer d.mu.Unlock()
    1976           0 :                         removeUsFromList()
    1977           0 :                         return ctx.Err()
    1978           0 :                 case err := <-doneChan:
    1979           0 :                         if err != nil {
    1980           0 :                                 d.mu.Lock()
    1981           0 :                                 defer d.mu.Unlock()
    1982           0 :                                 removeUsFromList()
    1983           0 :                                 return err
    1984           0 :                         }
    1985           0 :                         compactionIdx++
    1986           0 :                         // Grab the next doneCh to wait on.
    1987           0 :                         func() {
    1988           0 :                                 d.mu.Lock()
    1989           0 :                                 defer d.mu.Unlock()
    1990           0 :                                 doneChan = dSpan.doneChans[compactionIdx]
    1991           0 :                         }()
    1992           0 :                 default:
    1993           0 :                         doneSpan := func() bool {
    1994           0 :                                 d.mu.Lock()
    1995           0 :                                 defer d.mu.Unlock()
    1996           0 :                                 // It's possible to have downloaded all files without writing to any
    1997           0 :                                 // doneChans. This is expected if there are a significant amount
    1998           0 :                                 // of overlapping writes that schedule regular, non-download compactions.
    1999           0 :                                 if noExternalFilesInSpan() {
    2000           0 :                                         removeUsFromList()
    2001           0 :                                         return true
    2002           0 :                                 }
    2003           0 :                                 d.maybeScheduleCompaction()
    2004           0 :                                 d.mu.compact.cond.Wait()
    2005           0 :                                 return false
    2006             :                         }()
    2007           0 :                         if doneSpan {
    2008           0 :                                 return nil
    2009           0 :                         }
    2010             :                 }
    2011             :         }
    2012             : }
    2013             : 
    2014             : // Download ensures that the LSM does not use any external sstables for the
    2015             : // given key ranges. It does so by performing appropriate compactions so that
    2016             : // all external data becomes available locally.
    2017             : //
    2018             : // Note that calling this method does not imply that all other compactions stop;
    2019             : // it simply informs Pebble of a list of spans for which external data should be
    2020             : // downloaded with high priority.
    2021             : //
    2022             : // The method returns once no external sstasbles overlap the given spans, the
    2023             : // context is canceled, or an error is hit.
    2024             : //
    2025             : // TODO(radu): consider passing a priority/impact knob to express how important
    2026             : // the download is (versus live traffic performance, LSM health).
    2027           0 : func (d *DB) Download(ctx context.Context, spans []DownloadSpan) error {
    2028           0 :         ctx, cancel := context.WithCancel(ctx)
    2029           0 :         defer cancel()
    2030           0 :         if err := d.closed.Load(); err != nil {
    2031           0 :                 panic(err)
    2032             :         }
    2033           0 :         if d.opts.ReadOnly {
    2034           0 :                 return ErrReadOnly
    2035           0 :         }
    2036           0 :         for i := range spans {
    2037           0 :                 if err := ctx.Err(); err != nil {
    2038           0 :                         return err
    2039           0 :                 }
    2040           0 :                 if err := d.downloadSpan(ctx, spans[i]); err != nil {
    2041           0 :                         return err
    2042           0 :                 }
    2043             :         }
    2044           0 :         return nil
    2045             : }
    2046             : 
    2047             : // Flush the memtable to stable storage.
    2048           1 : func (d *DB) Flush() error {
    2049           1 :         flushDone, err := d.AsyncFlush()
    2050           1 :         if err != nil {
    2051           0 :                 return err
    2052           0 :         }
    2053           1 :         <-flushDone
    2054           1 :         return nil
    2055             : }
    2056             : 
    2057             : // AsyncFlush asynchronously flushes the memtable to stable storage.
    2058             : //
    2059             : // If no error is returned, the caller can receive from the returned channel in
    2060             : // order to wait for the flush to complete.
    2061           1 : func (d *DB) AsyncFlush() (<-chan struct{}, error) {
    2062           1 :         if err := d.closed.Load(); err != nil {
    2063           0 :                 panic(err)
    2064             :         }
    2065           1 :         if d.opts.ReadOnly {
    2066           0 :                 return nil, ErrReadOnly
    2067           0 :         }
    2068             : 
    2069           1 :         d.commit.mu.Lock()
    2070           1 :         defer d.commit.mu.Unlock()
    2071           1 :         d.mu.Lock()
    2072           1 :         defer d.mu.Unlock()
    2073           1 :         flushed := d.mu.mem.queue[len(d.mu.mem.queue)-1].flushed
    2074           1 :         err := d.makeRoomForWrite(nil)
    2075           1 :         if err != nil {
    2076           0 :                 return nil, err
    2077           0 :         }
    2078           1 :         return flushed, nil
    2079             : }
    2080             : 
    2081             : // Metrics returns metrics about the database.
    2082           1 : func (d *DB) Metrics() *Metrics {
    2083           1 :         metrics := &Metrics{}
    2084           1 :         recycledLogsCount, recycledLogSize := d.logRecycler.stats()
    2085           1 : 
    2086           1 :         d.mu.Lock()
    2087           1 :         vers := d.mu.versions.currentVersion()
    2088           1 :         *metrics = d.mu.versions.metrics
    2089           1 :         metrics.Compact.EstimatedDebt = d.mu.versions.picker.estimatedCompactionDebt(0)
    2090           1 :         metrics.Compact.InProgressBytes = d.mu.versions.atomicInProgressBytes.Load()
    2091           1 :         metrics.Compact.NumInProgress = int64(d.mu.compact.compactingCount)
    2092           1 :         metrics.Compact.MarkedFiles = vers.Stats.MarkedForCompaction
    2093           1 :         metrics.Compact.Duration = d.mu.compact.duration
    2094           1 :         for c := range d.mu.compact.inProgress {
    2095           0 :                 if c.kind != compactionKindFlush {
    2096           0 :                         metrics.Compact.Duration += d.timeNow().Sub(c.beganAt)
    2097           0 :                 }
    2098             :         }
    2099             : 
    2100           1 :         for _, m := range d.mu.mem.queue {
    2101           1 :                 metrics.MemTable.Size += m.totalBytes()
    2102           1 :         }
    2103           1 :         metrics.Snapshots.Count = d.mu.snapshots.count()
    2104           1 :         if metrics.Snapshots.Count > 0 {
    2105           0 :                 metrics.Snapshots.EarliestSeqNum = d.mu.snapshots.earliest()
    2106           0 :         }
    2107           1 :         metrics.Snapshots.PinnedKeys = d.mu.snapshots.cumulativePinnedCount
    2108           1 :         metrics.Snapshots.PinnedSize = d.mu.snapshots.cumulativePinnedSize
    2109           1 :         metrics.MemTable.Count = int64(len(d.mu.mem.queue))
    2110           1 :         metrics.MemTable.ZombieCount = d.memTableCount.Load() - metrics.MemTable.Count
    2111           1 :         metrics.MemTable.ZombieSize = uint64(d.memTableReserved.Load()) - metrics.MemTable.Size
    2112           1 :         metrics.WAL.ObsoleteFiles = int64(recycledLogsCount)
    2113           1 :         metrics.WAL.ObsoletePhysicalSize = recycledLogSize
    2114           1 :         metrics.WAL.Size = d.logSize.Load()
    2115           1 :         // The current WAL size (d.atomic.logSize) is the current logical size,
    2116           1 :         // which may be less than the WAL's physical size if it was recycled.
    2117           1 :         // The file sizes in d.mu.log.queue are updated to the physical size
    2118           1 :         // during WAL rotation. Use the larger of the two for the current WAL. All
    2119           1 :         // the previous WALs's fileSizes in d.mu.log.queue are already updated.
    2120           1 :         metrics.WAL.PhysicalSize = metrics.WAL.Size
    2121           1 :         if len(d.mu.log.queue) > 0 && metrics.WAL.PhysicalSize < d.mu.log.queue[len(d.mu.log.queue)-1].fileSize {
    2122           0 :                 metrics.WAL.PhysicalSize = d.mu.log.queue[len(d.mu.log.queue)-1].fileSize
    2123           0 :         }
    2124           1 :         for i, n := 0, len(d.mu.log.queue)-1; i < n; i++ {
    2125           1 :                 metrics.WAL.PhysicalSize += d.mu.log.queue[i].fileSize
    2126           1 :         }
    2127             : 
    2128           1 :         metrics.WAL.BytesIn = d.mu.log.bytesIn // protected by d.mu
    2129           1 :         for i, n := 0, len(d.mu.mem.queue)-1; i < n; i++ {
    2130           1 :                 metrics.WAL.Size += d.mu.mem.queue[i].logSize
    2131           1 :         }
    2132           1 :         metrics.WAL.BytesWritten = metrics.Levels[0].BytesIn + metrics.WAL.Size
    2133           1 :         if p := d.mu.versions.picker; p != nil {
    2134           1 :                 compactions := d.getInProgressCompactionInfoLocked(nil)
    2135           1 :                 for level, score := range p.getScores(compactions) {
    2136           1 :                         metrics.Levels[level].Score = score
    2137           1 :                 }
    2138             :         }
    2139           1 :         metrics.Table.ZombieCount = int64(len(d.mu.versions.zombieTables))
    2140           1 :         for _, size := range d.mu.versions.zombieTables {
    2141           0 :                 metrics.Table.ZombieSize += size
    2142           0 :         }
    2143           1 :         metrics.private.optionsFileSize = d.optionsFileSize
    2144           1 : 
    2145           1 :         // TODO(jackson): Consider making these metrics optional.
    2146           1 :         metrics.Keys.RangeKeySetsCount = countRangeKeySetFragments(vers)
    2147           1 :         metrics.Keys.TombstoneCount = countTombstones(vers)
    2148           1 : 
    2149           1 :         d.mu.versions.logLock()
    2150           1 :         metrics.private.manifestFileSize = uint64(d.mu.versions.manifest.Size())
    2151           1 :         metrics.Table.BackingTableCount = uint64(len(d.mu.versions.backingState.fileBackingMap))
    2152           1 :         metrics.Table.BackingTableSize = d.mu.versions.backingState.fileBackingSize
    2153           1 :         if invariants.Enabled {
    2154           1 :                 var totalSize uint64
    2155           1 :                 for _, backing := range d.mu.versions.backingState.fileBackingMap {
    2156           1 :                         totalSize += backing.Size
    2157           1 :                 }
    2158           1 :                 if totalSize != metrics.Table.BackingTableSize {
    2159           0 :                         panic("pebble: invalid backing table size accounting")
    2160             :                 }
    2161             :         }
    2162           1 :         d.mu.versions.logUnlock()
    2163           1 : 
    2164           1 :         metrics.LogWriter.FsyncLatency = d.mu.log.metrics.fsyncLatency
    2165           1 :         if err := metrics.LogWriter.Merge(&d.mu.log.metrics.LogWriterMetrics); err != nil {
    2166           0 :                 d.opts.Logger.Errorf("metrics error: %s", err)
    2167           0 :         }
    2168           1 :         metrics.Flush.WriteThroughput = d.mu.compact.flushWriteThroughput
    2169           1 :         if d.mu.compact.flushing {
    2170           0 :                 metrics.Flush.NumInProgress = 1
    2171           0 :         }
    2172           1 :         for i := 0; i < numLevels; i++ {
    2173           1 :                 metrics.Levels[i].Additional.ValueBlocksSize = valueBlocksSizeForLevel(vers, i)
    2174           1 :         }
    2175             : 
    2176           1 :         d.mu.Unlock()
    2177           1 : 
    2178           1 :         metrics.BlockCache = d.opts.Cache.Metrics()
    2179           1 :         metrics.TableCache, metrics.Filter = d.tableCache.metrics()
    2180           1 :         metrics.TableIters = int64(d.tableCache.iterCount())
    2181           1 :         metrics.CategoryStats = d.tableCache.dbOpts.sstStatsCollector.GetStats()
    2182           1 : 
    2183           1 :         metrics.SecondaryCacheMetrics = d.objProvider.Metrics()
    2184           1 : 
    2185           1 :         metrics.Uptime = d.timeNow().Sub(d.openedAt)
    2186           1 : 
    2187           1 :         return metrics
    2188             : }
    2189             : 
    // sstablesOptions collects the optional parameters that control how
    // DB.SSTables retrieves information about sstables.
    type sstablesOptions struct {
    	// withProperties, when true, makes each returned entry carry the sstable
    	// properties.
    	withProperties bool

    	// start and end, if set, restrict the result to sstables that overlap the
    	// key range; end is exclusive.
    	start []byte
    	end   []byte

    	// withApproximateSpanBytes, when true, requests the approximate number of
    	// bytes overlapping the key range for each sstable.
    	withApproximateSpanBytes bool
    }

    // SSTablesOption sets an optional parameter used by `DB.SSTables`.
    type SSTablesOption func(*sstablesOptions)

    // WithProperties enables returning the sstable properties with each entry.
    //
    // NOTE: if most of the sstable properties need to be read from disk,
    // this option may make method `SSTables` quite slow.
    func WithProperties() SSTablesOption {
    	return func(o *sstablesOptions) {
    		o.withProperties = true
    	}
    }

    // WithKeyRangeFilter restricts the returned sstables to those overlapping
    // the key range from start to end (end-exclusive). The filter is only
    // applied when both start and end are non-nil.
    func WithKeyRangeFilter(start, end []byte) SSTablesOption {
    	return func(o *sstablesOptions) {
    		o.start, o.end = start, end
    	}
    }

    // WithApproximateSpanBytes enables capturing the approximate number of bytes that
    // overlap the provided key span for each sstable.
    // NOTE: this option can only be used with WithKeyRangeFilter and WithProperties
    // provided.
    func WithApproximateSpanBytes() SSTablesOption {
    	return func(o *sstablesOptions) {
    		o.withApproximateSpanBytes = true
    	}
    }
    2233             : 
    // BackingType identifies the kind of storage that backs a given sstable.
    type BackingType int

    const (
    	// BackingTypeLocal is an sstable stored on local disk according to the
    	// objprovider. This file is completely owned by us.
    	BackingTypeLocal BackingType = iota
    	// BackingTypeShared is an sstable stored on shared storage that was
    	// created by this Pebble instance and is possibly shared by other Pebble
    	// instances. These types of files have lifecycle managed by Pebble.
    	BackingTypeShared
    	// BackingTypeSharedForeign is an sstable stored on shared storage that
    	// was created by a Pebble instance other than this one. These types of
    	// files have lifecycle managed by Pebble.
    	BackingTypeSharedForeign
    	// BackingTypeExternal is an sstable stored on external storage, not owned
    	// by any Pebble instance and with no refcounting/cleanup methods or
    	// lifecycle management. An example of an external file is a file
    	// restored from a backup.
    	BackingTypeExternal
    )
    2255             : 
    2256             : // SSTableInfo export manifest.TableInfo with sstable.Properties alongside
    2257             : // other file backing info.
    2258             : type SSTableInfo struct {
    2259             :         manifest.TableInfo
    2260             :         // Virtual indicates whether the sstable is virtual.
    2261             :         Virtual bool
    2262             :         // BackingSSTNum is the file number associated with backing sstable which
    2263             :         // backs the sstable associated with this SSTableInfo. If Virtual is false,
    2264             :         // then BackingSSTNum == FileNum.
    2265             :         BackingSSTNum base.FileNum
    2266             :         // BackingType is the type of storage backing this sstable.
    2267             :         BackingType BackingType
    2268             :         // Locator is the remote.Locator backing this sstable, if the backing type is
    2269             :         // not BackingTypeLocal.
    2270             :         Locator remote.Locator
    2271             : 
    2272             :         // Properties is the sstable properties of this table. If Virtual is true,
    2273             :         // then the Properties are associated with the backing sst.
    2274             :         Properties *sstable.Properties
    2275             : }
    2276             : 
    2277             : // SSTables retrieves the current sstables. The returned slice is indexed by
    2278             : // level and each level is indexed by the position of the sstable within the
    2279             : // level. Note that this information may be out of date due to concurrent
    2280             : // flushes and compactions.
    2281           0 : func (d *DB) SSTables(opts ...SSTablesOption) ([][]SSTableInfo, error) {
    2282           0 :         opt := &sstablesOptions{}
    2283           0 :         for _, fn := range opts {
    2284           0 :                 fn(opt)
    2285           0 :         }
    2286             : 
    2287           0 :         if opt.withApproximateSpanBytes && !opt.withProperties {
    2288           0 :                 return nil, errors.Errorf("Cannot use WithApproximateSpanBytes without WithProperties option.")
    2289           0 :         }
    2290           0 :         if opt.withApproximateSpanBytes && (opt.start == nil || opt.end == nil) {
    2291           0 :                 return nil, errors.Errorf("Cannot use WithApproximateSpanBytes without WithKeyRangeFilter option.")
    2292           0 :         }
    2293             : 
    2294             :         // Grab and reference the current readState.
    2295           0 :         readState := d.loadReadState()
    2296           0 :         defer readState.unref()
    2297           0 : 
    2298           0 :         // TODO(peter): This is somewhat expensive, especially on a large
    2299           0 :         // database. It might be worthwhile to unify TableInfo and FileMetadata and
    2300           0 :         // then we could simply return current.Files. Note that RocksDB is doing
    2301           0 :         // something similar to the current code, so perhaps it isn't too bad.
    2302           0 :         srcLevels := readState.current.Levels
    2303           0 :         var totalTables int
    2304           0 :         for i := range srcLevels {
    2305           0 :                 totalTables += srcLevels[i].Len()
    2306           0 :         }
    2307             : 
    2308           0 :         destTables := make([]SSTableInfo, totalTables)
    2309           0 :         destLevels := make([][]SSTableInfo, len(srcLevels))
    2310           0 :         for i := range destLevels {
    2311           0 :                 iter := srcLevels[i].Iter()
    2312           0 :                 j := 0
    2313           0 :                 for m := iter.First(); m != nil; m = iter.Next() {
    2314           0 :                         if opt.start != nil && opt.end != nil && !m.Overlaps(d.opts.Comparer.Compare, opt.start, opt.end, true /* exclusive end */) {
    2315           0 :                                 continue
    2316             :                         }
    2317           0 :                         destTables[j] = SSTableInfo{TableInfo: m.TableInfo()}
    2318           0 :                         if opt.withProperties {
    2319           0 :                                 p, err := d.tableCache.getTableProperties(
    2320           0 :                                         m,
    2321           0 :                                 )
    2322           0 :                                 if err != nil {
    2323           0 :                                         return nil, err
    2324           0 :                                 }
    2325           0 :                                 destTables[j].Properties = p
    2326             :                         }
    2327           0 :                         destTables[j].Virtual = m.Virtual
    2328           0 :                         destTables[j].BackingSSTNum = m.FileBacking.DiskFileNum.FileNum()
    2329           0 :                         objMeta, err := d.objProvider.Lookup(fileTypeTable, m.FileBacking.DiskFileNum)
    2330           0 :                         if err != nil {
    2331           0 :                                 return nil, err
    2332           0 :                         }
    2333           0 :                         if objMeta.IsRemote() {
    2334           0 :                                 if objMeta.IsShared() {
    2335           0 :                                         if d.objProvider.IsSharedForeign(objMeta) {
    2336           0 :                                                 destTables[j].BackingType = BackingTypeSharedForeign
    2337           0 :                                         } else {
    2338           0 :                                                 destTables[j].BackingType = BackingTypeShared
    2339           0 :                                         }
    2340           0 :                                 } else {
    2341           0 :                                         destTables[j].BackingType = BackingTypeExternal
    2342           0 :                                 }
    2343           0 :                                 destTables[j].Locator = objMeta.Remote.Locator
    2344           0 :                         } else {
    2345           0 :                                 destTables[j].BackingType = BackingTypeLocal
    2346           0 :                         }
    2347             : 
    2348           0 :                         if opt.withApproximateSpanBytes {
    2349           0 :                                 var spanBytes uint64
    2350           0 :                                 if m.ContainedWithinSpan(d.opts.Comparer.Compare, opt.start, opt.end) {
    2351           0 :                                         spanBytes = m.Size
    2352           0 :                                 } else {
    2353           0 :                                         size, err := d.tableCache.estimateSize(m, opt.start, opt.end)
    2354           0 :                                         if err != nil {
    2355           0 :                                                 return nil, err
    2356           0 :                                         }
    2357           0 :                                         spanBytes = size
    2358             :                                 }
    2359           0 :                                 propertiesCopy := *destTables[j].Properties
    2360           0 : 
    2361           0 :                                 // Deep copy user properties so approximate span bytes can be added.
    2362           0 :                                 propertiesCopy.UserProperties = make(map[string]string, len(destTables[j].Properties.UserProperties)+1)
    2363           0 :                                 for k, v := range destTables[j].Properties.UserProperties {
    2364           0 :                                         propertiesCopy.UserProperties[k] = v
    2365           0 :                                 }
    2366           0 :                                 propertiesCopy.UserProperties["approximate-span-bytes"] = strconv.FormatUint(spanBytes, 10)
    2367           0 :                                 destTables[j].Properties = &propertiesCopy
    2368             :                         }
    2369           0 :                         j++
    2370             :                 }
    2371           0 :                 destLevels[i] = destTables[:j]
    2372           0 :                 destTables = destTables[j:]
    2373             :         }
    2374             : 
    2375           0 :         return destLevels, nil
    2376             : }
    2377             : 
    2378             : // EstimateDiskUsage returns the estimated filesystem space used in bytes for
    2379             : // storing the range `[start, end]`. The estimation is computed as follows:
    2380             : //
    2381             : //   - For sstables fully contained in the range the whole file size is included.
    2382             : //   - For sstables partially contained in the range the overlapping data block sizes
    2383             : //     are included. Even if a data block partially overlaps, or we cannot determine
    2384             : //     overlap due to abbreviated index keys, the full data block size is included in
    2385             : //     the estimation. Note that unlike fully contained sstables, none of the
    2386             : //     meta-block space is counted for partially overlapped files.
    2387             : //   - For virtual sstables, we use the overlap between start, end and the virtual
    2388             : //     sstable bounds to determine disk usage.
    2389             : //   - There may also exist WAL entries for unflushed keys in this range. This
    2390             : //     estimation currently excludes space used for the range in the WAL.
                       //
                       // This is a thin wrapper around EstimateDiskUsageByBackingType that returns
                       // only the total size and discards the remote/external breakdown. Like that
                       // method, it panics if the DB has been closed and returns an error if start
                       // sorts after end.
    2391           0 : func (d *DB) EstimateDiskUsage(start, end []byte) (uint64, error) {
    2392           0 :         bytes, _, _, err := d.EstimateDiskUsageByBackingType(start, end)
    2393           0 :         return bytes, err
    2394           0 : }
    2395             : 
    2396             : // EstimateDiskUsageByBackingType is like EstimateDiskUsage but additionally
    2397             : // returns the subsets of that size in remote and external files.
                       //
                       // totalSize is the estimate across all overlapping sstables. remoteSize is the
                       // portion contributed by sstables whose backing object reports IsRemote(), and
                       // externalSize is the portion of remoteSize whose backing object has cleanup
                       // method objstorage.SharedNoCleanup.
                       //
                       // It panics if the DB has been closed and returns an error if start sorts
                       // after end under the configured Comparer. On any error all three sizes are
                       // returned as zero.
    2398             : func (d *DB) EstimateDiskUsageByBackingType(
    2399             :         start, end []byte,
    2400           0 : ) (totalSize, remoteSize, externalSize uint64, _ error) {
    2401           0 :         if err := d.closed.Load(); err != nil {
    2402           0 :                 panic(err)
    2403             :         }
    2404           0 :         if d.opts.Comparer.Compare(start, end) > 0 {
    2405           0 :                 return 0, 0, 0, errors.New("invalid key-range specified (start > end)")
    2406           0 :         }
    2407             : 
    2408             :         // Grab and reference the current readState. This prevents the underlying
    2409             :         // files in the associated version from being deleted if there is a concurrent
    2410             :         // compaction.
    2411           0 :         readState := d.loadReadState()
    2412           0 :         defer readState.unref()
    2413           0 : 
    2414           0 :         for level, files := range readState.current.Levels {
                               // Default to every file in the level; narrowed below for L1+.
    2415           0 :                 iter := files.Iter()
    2416           0 :                 if level > 0 {
    2417           0 :                         // We can only use `Overlaps` to restrict `files` at L1+ since at L0 it
    2418           0 :                         // expands the range iteratively until it has found a set of files that
    2419           0 :                         // do not overlap any other L0 files outside that set.
    2420           0 :                         overlaps := readState.current.Overlaps(level, d.opts.Comparer.Compare, start, end, false /* exclusiveEnd */)
    2421           0 :                         iter = overlaps.Iter()
    2422           0 :                 }
    2423           0 :                 for file := iter.First(); file != nil; file = iter.Next() {
    2424           0 :                         if d.opts.Comparer.Compare(start, file.Smallest.UserKey) <= 0 &&
    2425           0 :                                 d.opts.Comparer.Compare(file.Largest.UserKey, end) <= 0 {
    2426           0 :                                 // The range fully contains the file, so skip looking it up in
    2427           0 :                                 // table cache/looking at its indexes, and add the full file size.
    2428           0 :                                 meta, err := d.objProvider.Lookup(fileTypeTable, file.FileBacking.DiskFileNum)
    2429           0 :                                 if err != nil {
    2430           0 :                                         return 0, 0, 0, err
    2431           0 :                                 }
                                               // Remote-backed files also count toward remoteSize; those
                                               // with the SharedNoCleanup cleanup method additionally count
                                               // toward externalSize.
    2432           0 :                                 if meta.IsRemote() {
    2433           0 :                                         remoteSize += file.Size
    2434           0 :                                         if meta.Remote.CleanupMethod == objstorage.SharedNoCleanup {
    2435           0 :                                                 externalSize += file.Size
    2436           0 :                                         }
    2437             :                                 }
    2438           0 :                                 totalSize += file.Size
    2439           0 :                         } else if d.opts.Comparer.Compare(file.Smallest.UserKey, end) <= 0 &&
    2440           0 :                                 d.opts.Comparer.Compare(start, file.Largest.UserKey) <= 0 {
                                               // The file overlaps [start, end] only partially. This check
                                               // also filters out non-overlapping L0 files, since L0 is
                                               // iterated in full above. Ask the sstable reader (virtual or
                                               // physical, depending on the file) to estimate the
                                               // overlapping portion.
    2441           0 :                                 var size uint64
    2442           0 :                                 var err error
    2443           0 :                                 if file.Virtual {
    2444           0 :                                         err = d.tableCache.withVirtualReader(
    2445           0 :                                                 file.VirtualMeta(),
    2446           0 :                                                 func(r sstable.VirtualReader) (err error) {
    2447           0 :                                                         size, err = r.EstimateDiskUsage(start, end)
    2448           0 :                                                         return err
    2449           0 :                                                 },
    2450             :                                         )
    2451           0 :                                 } else {
    2452           0 :                                         err = d.tableCache.withReader(
    2453           0 :                                                 file.PhysicalMeta(),
    2454           0 :                                                 func(r *sstable.Reader) (err error) {
    2455           0 :                                                         size, err = r.EstimateDiskUsage(start, end)
    2456           0 :                                                         return err
    2457           0 :                                                 },
    2458             :                                         )
    2459             :                                 }
    2460           0 :                                 if err != nil {
    2461           0 :                                         return 0, 0, 0, err
    2462           0 :                                 }
                                               // Attribute the estimate to the remote/external buckets using
                                               // the same rules as the fully contained case.
    2463           0 :                                 meta, err := d.objProvider.Lookup(fileTypeTable, file.FileBacking.DiskFileNum)
    2464           0 :                                 if err != nil {
    2465           0 :                                         return 0, 0, 0, err
    2466           0 :                                 }
    2467           0 :                                 if meta.IsRemote() {
    2468           0 :                                         remoteSize += size
    2469           0 :                                         if meta.Remote.CleanupMethod == objstorage.SharedNoCleanup {
    2470           0 :                                                 externalSize += size
    2471           0 :                                         }
    2472             :                                 }
    2473           0 :                                 totalSize += size
    2474             :                         }
    2475             :                 }
    2476             :         }
    2477           0 :         return totalSize, remoteSize, externalSize, nil
    2478             : }
    2479             : 
                       // walPreallocateSize returns the number of bytes to preallocate for a WAL
                       // file: opts.MemTableSize plus one tenth of it (integer division), converted
                       // to int.
    2480           1 : func (d *DB) walPreallocateSize() int {
    2481           1 :         // Set the WAL preallocate size to 110% of the memtable size. Note that there
    2482           1 :         // is a bit of apples and oranges in units here as the memtable size
    2483           1 :         // corresponds to the memory usage of the memtable while the WAL size is the
    2484           1 :         // size of the batches (plus overhead) stored in the WAL.
    2485           1 :         //
    2486           1 :         // TODO(peter): 110% of the memtable size is quite hefty for a block
    2487           1 :         // size. This logic is taken from GetWalPreallocateBlockSize in
    2488           1 :         // RocksDB. Could a smaller preallocation block size be used?
    2489           1 :         size := d.opts.MemTableSize
    2490           1 :         size = (size / 10) + size
    2491           1 :         return int(size)
    2492           1 : }
    2493             : 
                       // newMemTable returns an initialised memtable whose arena is
                       // d.mu.mem.nextSize bytes, together with the flushableEntry that wraps it for
                       // the flushable queue. logNum is the WAL the entry is associated with and
                       // logSeqNum is the starting sequence number passed to both the memtable and
                       // the entry.
                       //
                       // As a side effect, d.mu.mem.nextSize is doubled (capped at opts.MemTableSize)
                       // for the next call. Because this reads and writes d.mu.mem, DB.mu is
                       // presumably required; rotateMemtable, which calls this, requires it.
    2494           1 : func (d *DB) newMemTable(logNum base.DiskFileNum, logSeqNum uint64) (*memTable, *flushableEntry) {
                               // Use the currently scheduled size for this memtable, then grow the size
                               // scheduled for the following one.
    2495           1 :         size := d.mu.mem.nextSize
    2496           1 :         if d.mu.mem.nextSize < d.opts.MemTableSize {
    2497           1 :                 d.mu.mem.nextSize *= 2
    2498           1 :                 if d.mu.mem.nextSize > d.opts.MemTableSize {
    2499           0 :                         d.mu.mem.nextSize = d.opts.MemTableSize
    2500           0 :                 }
    2501             :         }
    2502             : 
    2503           1 :         memtblOpts := memTableOptions{
    2504           1 :                 Options:   d.opts,
    2505           1 :                 logSeqNum: logSeqNum,
    2506           1 :         }
    2507           1 : 
    2508           1 :         // Before attempting to allocate a new memtable, check if there's one
    2509           1 :         // available for recycling in memTableRecycle. Large contiguous allocations
    2510           1 :         // can be costly as fragmentation makes it more difficult to find a large
    2511           1 :         // contiguous free space. We've observed 64MB allocations taking 10ms+.
    2512           1 :         //
    2513           1 :         // To reduce these costly allocations, up to 1 obsolete memtable is stashed
    2514           1 :         // in `d.memTableRecycle` to allow a future memtable rotation to reuse
    2515           1 :         // existing memory.
    2516           1 :         var mem *memTable
    2517           1 :         mem = d.memTableRecycle.Swap(nil)
                               // A recycled memtable is only reusable if its arena is exactly the
                               // requested size; otherwise free it and fall through to a new allocation.
    2518           1 :         if mem != nil && uint64(len(mem.arenaBuf)) != size {
    2519           1 :                 d.freeMemTable(mem)
    2520           1 :                 mem = nil
    2521           1 :         }
    2522           1 :         if mem != nil {
    2523           1 :                 // Carry through the existing buffer and memory reservation.
    2524           1 :                 memtblOpts.arenaBuf = mem.arenaBuf
    2525           1 :                 memtblOpts.releaseAccountingReservation = mem.releaseAccountingReservation
    2526           1 :         } else {
                                       // Nothing to reuse: allocate a new arena, reserve the same number of
                                       // bytes from opts.Cache, and account for the new memtable in the
                                       // DB-wide counters that freeMemTable later decrements.
    2527           1 :                 mem = new(memTable)
    2528           1 :                 memtblOpts.arenaBuf = manual.New(int(size))
    2529           1 :                 memtblOpts.releaseAccountingReservation = d.opts.Cache.Reserve(int(size))
    2530           1 :                 d.memTableCount.Add(1)
    2531           1 :                 d.memTableReserved.Add(int64(size))
    2532           1 : 
    2533           1 :                 // Note: this is a no-op if invariants are disabled or race is enabled.
    2534           1 :                 invariants.SetFinalizer(mem, checkMemTable)
    2535           1 :         }
    2536           1 :         mem.init(memtblOpts)
    2537           1 : 
    2538           1 :         entry := d.newFlushableEntry(mem, logNum, logSeqNum)
                               // The closure captures mem, so the memtable initialised above is the one
                               // that ends up freed or stashed for recycling.
    2539           1 :         entry.releaseMemAccounting = func() {
    2540           1 :                 // If the user leaks iterators, we may be releasing the memtable after
    2541           1 :                 // the DB is already closed. In this case, we want to just release the
    2542           1 :                 // memory because DB.Close won't come along to free it for us.
    2543           1 :                 if err := d.closed.Load(); err != nil {
    2544           1 :                         d.freeMemTable(mem)
    2545           1 :                         return
    2546           1 :                 }
    2547             : 
    2548             :                 // The next memtable allocation might be able to reuse this memtable.
    2549             :                 // Stash it on d.memTableRecycle.
    2550           1 :                 if unusedMem := d.memTableRecycle.Swap(mem); unusedMem != nil {
    2551           1 :                         // There was already a memtable waiting to be recycled. We're now
    2552           1 :                         // responsible for freeing it.
    2553           1 :                         d.freeMemTable(unusedMem)
    2554           1 :                 }
    2555             :         }
    2556           1 :         return mem, entry
    2557             : }
    2558             : 
                       // freeMemTable undoes the accounting performed when a memtable is allocated in
                       // newMemTable: it decrements the memtable count, subtracts the length of m's
                       // arena buffer from the reserved-bytes counter, and then calls m.free().
    2559           1 : func (d *DB) freeMemTable(m *memTable) {
    2560           1 :         d.memTableCount.Add(-1)
                               // The arena length is read before m.free() is called.
    2561           1 :         d.memTableReserved.Add(-int64(len(m.arenaBuf)))
    2562           1 :         m.free()
    2563           1 : }
    2564             : 
                       // newFlushableEntry wraps the flushable f in a flushableEntry associated with
                       // WAL logNum and starting sequence number logSeqNum. The entry is created with
                       // an open `flushed` channel, a reader reference count of 1, and its delete
                       // hooks wired to the version set's addObsolete/addObsoleteLocked methods.
                       // Callers set any further fields (e.g. releaseMemAccounting) themselves.
    2565             : func (d *DB) newFlushableEntry(
    2566             :         f flushable, logNum base.DiskFileNum, logSeqNum uint64,
    2567           1 : ) *flushableEntry {
    2568           1 :         fe := &flushableEntry{
    2569           1 :                 flushable:      f,
    2570           1 :                 flushed:        make(chan struct{}),
    2571           1 :                 logNum:         logNum,
    2572           1 :                 logSeqNum:      logSeqNum,
    2573           1 :                 deleteFn:       d.mu.versions.addObsolete,
    2574           1 :                 deleteFnLocked: d.mu.versions.addObsoleteLocked,
    2575           1 :         }
    2576           1 :         fe.readerRefs.Store(1)
    2577           1 :         return fe
    2578           1 : }
    2579             : 
    2580             : // makeRoomForWrite ensures that the memtable has room to hold the contents of
    2581             : // Batch. It reserves the space in the memtable and adds a reference to the
    2582             : // memtable. The caller must later ensure that the memtable is unreferenced. If
    2583             : // the memtable is full, or a nil Batch is provided, the current memtable is
    2584             : // rotated (marked as immutable) and a new mutable memtable is allocated. This
    2585             : // memtable rotation also causes a log rotation.
    2586             : //
    2587             : // Both DB.mu and commitPipeline.mu must be held by the caller. Note that DB.mu
    2588             : // may be released and reacquired.
                       //
                       // It panics if b is an ingested-sstable batch. While waiting on the memtable
                       // or L0 stop-writes thresholds it reports a write stall to the EventListener
                       // and, when b is non-nil, charges the wait time to b.commitStats.
    2589           1 : func (d *DB) makeRoomForWrite(b *Batch) error {
    2590           1 :         if b != nil && b.ingestedSSTBatch {
    2591           0 :                 panic("pebble: invalid function call")
    2592             :         }
    2593             : 
                               // A nil batch (an explicit rotation request) or a batch carried as a
                               // flushable is never applied to the mutable memtable, so both force one
                               // rotation on the first pass through the loop.
    2594           1 :         force := b == nil || b.flushable != nil
    2595           1 :         stalled := false
    2596           1 :         for {
    2597           1 :                 if b != nil && b.flushable == nil {
                                               // Ordinary batch: try to reserve room in the mutable memtable.
                                               // Any result other than ErrArenaFull (success included) ends
                                               // the loop.
    2598           1 :                         err := d.mu.mem.mutable.prepare(b)
    2599           1 :                         if err != arenaskl.ErrArenaFull {
    2600           1 :                                 if stalled {
    2601           1 :                                         d.opts.EventListener.WriteStallEnd()
    2602           1 :                                 }
    2603           1 :                                 return err
    2604             :                         }
    2605           1 :                 } else if !force {
                                               // Reached only once the forced rotation has completed (force is
                                               // reset at the bottom of the loop).
    2606           1 :                         if stalled {
    2607           1 :                                 d.opts.EventListener.WriteStallEnd()
    2608           1 :                         }
    2609           1 :                         return nil
    2610             :                 }
    2611             :                 // force || err == ErrArenaFull, so we need to rotate the current memtable.
    2612           1 :                 {
                                               // Stall while the flushable queue holds at least
                                               // MemTableStopWritesThreshold memtables' worth of bytes.
    2613           1 :                         var size uint64
    2614           1 :                         for i := range d.mu.mem.queue {
    2615           1 :                                 size += d.mu.mem.queue[i].totalBytes()
    2616           1 :                         }
    2617           1 :                         if size >= uint64(d.opts.MemTableStopWritesThreshold)*d.opts.MemTableSize {
    2618           1 :                                 // We have filled up the current memtable, but already queued memtables
    2619           1 :                                 // are still flushing, so we wait.
    2620           1 :                                 if !stalled {
    2621           1 :                                         stalled = true
    2622           1 :                                         d.opts.EventListener.WriteStallBegin(WriteStallBeginInfo{
    2623           1 :                                                 Reason: "memtable count limit reached",
    2624           1 :                                         })
    2625           1 :                                 }
                                                       // Block on d.mu.compact.cond, then re-evaluate everything
                                                       // from the top of the loop.
    2626           1 :                                 now := time.Now()
    2627           1 :                                 d.mu.compact.cond.Wait()
    2628           1 :                                 if b != nil {
    2629           1 :                                         b.commitStats.MemTableWriteStallDuration += time.Since(now)
    2630           1 :                                 }
    2631           1 :                                 continue
    2632             :                         }
    2633             :                 }
    2634           1 :                 l0ReadAmp := d.mu.versions.currentVersion().L0Sublevels.ReadAmplification()
    2635           1 :                 if l0ReadAmp >= d.opts.L0StopWritesThreshold {
    2636           1 :                         // There are too many level-0 files, so we wait.
    2637           1 :                         if !stalled {
    2638           1 :                                 stalled = true
    2639           1 :                                 d.opts.EventListener.WriteStallBegin(WriteStallBeginInfo{
    2640           1 :                                         Reason: "L0 file count limit exceeded",
    2641           1 :                                 })
    2642           1 :                         }
    2643           1 :                         now := time.Now()
    2644           1 :                         d.mu.compact.cond.Wait()
    2645           1 :                         if b != nil {
    2646           1 :                                 b.commitStats.L0ReadAmpWriteStallDuration += time.Since(now)
    2647           1 :                         }
    2648           1 :                         continue
    2649             :                 }
    2650             : 
                                       // Rotate the WAL first (unless WALs are disabled), recording how long
                                       // it took against the batch. With the WAL disabled both values stay
                                       // zero.
    2651           1 :                 var newLogNum base.DiskFileNum
    2652           1 :                 var prevLogSize uint64
    2653           1 :                 if !d.opts.DisableWAL {
    2654           1 :                         now := time.Now()
    2655           1 :                         newLogNum, prevLogSize = d.recycleWAL()
    2656           1 :                         if b != nil {
    2657           1 :                                 b.commitStats.WALRotationDuration += time.Since(now)
    2658           1 :                         }
    2659             :                 }
    2660             : 
                                       // imm is the last entry in the flushable queue, presumably the entry
                                       // wrapping the mutable memtable being retired (rotateMemtable appends
                                       // the mutable memtable's entry last). Record the size of the WAL just
                                       // rotated away from, and flag the entry as force-flushed when the
                                       // rotation was requested without a batch.
    2661           1 :                 immMem := d.mu.mem.mutable
    2662           1 :                 imm := d.mu.mem.queue[len(d.mu.mem.queue)-1]
    2663           1 :                 imm.logSize = prevLogSize
    2664           1 :                 imm.flushForced = imm.flushForced || (b == nil)
    2665           1 : 
    2666           1 :                 // If we are manually flushing and we used less than half of the bytes in
    2667           1 :                 // the memtable, don't increase the size for the next memtable. This
    2668           1 :                 // reduces memtable memory pressure when an application is frequently
    2669           1 :                 // manually flushing.
    2670           1 :                 if (b == nil) && uint64(immMem.availBytes()) > immMem.totalBytes()/2 {
    2671           1 :                         d.mu.mem.nextSize = immMem.totalBytes()
    2672           1 :                 }
    2673             : 
    2674           1 :                 if b != nil && b.flushable != nil {
    2675           1 :                         // The batch is too large to fit in the memtable so add it directly to
    2676           1 :                         // the immutable queue. The flushable batch is associated with the same
    2677           1 :                         // log as the immutable memtable, but logically occurs after it in
    2678           1 :                         // seqnum space. We ensure while flushing that the flushable batch
    2679           1 :                         // is flushed along with the previous memtable in the flushable
    2680           1 :                         // queue. See the top level comment in DB.flush1 to learn how this
    2681           1 :                         // is ensured.
    2682           1 :                         //
    2683           1 :                         // See DB.commitWrite for the special handling of log writes for large
    2684           1 :                         // batches. In particular, the large batch has already written to
    2685           1 :                         // imm.logNum.
    2686           1 :                         entry := d.newFlushableEntry(b.flushable, imm.logNum, b.SeqNum())
    2687           1 :                         // The large batch is by definition large. Reserve space from the cache
    2688           1 :                         // for it until it is flushed.
    2689           1 :                         entry.releaseMemAccounting = d.opts.Cache.Reserve(int(b.flushable.totalBytes()))
    2690           1 :                         d.mu.mem.queue = append(d.mu.mem.queue, entry)
    2691           1 :                 }
    2692             : 
                                       // Pick the starting seqnum for the new memtable: the batch's seqnum,
                                       // advanced by b.Count() when the batch itself was queued as a
                                       // flushable above; with no batch, the version set's current logSeqNum.
    2693           1 :                 var logSeqNum uint64
    2694           1 :                 if b != nil {
    2695           1 :                         logSeqNum = b.SeqNum()
    2696           1 :                         if b.flushable != nil {
    2697           1 :                                 logSeqNum += uint64(b.Count())
    2698           1 :                         }
    2699           1 :                 } else {
    2700           1 :                         logSeqNum = d.mu.versions.logSeqNum.Load()
    2701           1 :                 }
    2702           1 :                 d.rotateMemtable(newLogNum, logSeqNum, immMem)
                                       // One rotation satisfies a forced request; the next iteration either
                                       // returns (nil or flushable batch) or retries prepare on the new
                                       // memtable.
    2703           1 :                 force = false
    2704             :         }
    2705             : }
    2706             : 
    2707             : // Both DB.mu and commitPipeline.mu must be held by the caller.
                       //
                       // rotateMemtable installs a freshly created memtable as d.mu.mem.mutable,
                       // appends its flushableEntry to the flushable queue, refreshes the read state,
                       // and drops the writer reference on prev, scheduling a flush if writerUnref
                       // reports true.
    2708           1 : func (d *DB) rotateMemtable(newLogNum base.DiskFileNum, logSeqNum uint64, prev *memTable) {
    2709           1 :         // Create a new memtable, scheduling the previous one for flushing. We do
    2710           1 :         // this even if the previous memtable was empty because the DB.Flush
    2711           1 :         // mechanism is dependent on being able to wait for the empty memtable to
    2712           1 :         // flush. We can't just mark the empty memtable as flushed here because we
    2713           1 :         // also have to wait for all previous immutable tables to
    2714           1 :         // flush. Additionally, the memtable is tied to particular WAL file and we
    2715           1 :         // want to go through the flush path in order to recycle that WAL file.
    2716           1 :         //
    2717           1 :         // NB: newLogNum corresponds to the WAL that contains mutations that are
    2718           1 :         // present in the new memtable. When immutable memtables are flushed to
    2719           1 :         // disk, a VersionEdit will be created telling the manifest the minimum
    2720           1 :         // unflushed log number (which will be the next one in d.mu.mem.mutable
    2721           1 :         // that was not flushed).
    2722           1 :         //
    2723           1 :         // NB: prev should be the current mutable memtable.
    2724           1 :         var entry *flushableEntry
    2725           1 :         d.mu.mem.mutable, entry = d.newMemTable(newLogNum, logSeqNum)
    2726           1 :         d.mu.mem.queue = append(d.mu.mem.queue, entry)
    2727           1 :         d.updateReadStateLocked(nil)
                               // NOTE(review): writerUnref presumably returns true when this was the last
                               // outstanding writer reference, i.e. prev is ready to flush; confirm
                               // against memTable.writerUnref.
    2728           1 :         if prev.writerUnref() {
    2729           1 :                 d.maybeScheduleFlush()
    2730           1 :         }
    2731             : }
    2732             : 
    2733             : // Both DB.mu and commitPipeline.mu must be held by the caller. Note that DB.mu
    2734             : // may be released and reacquired.
    2735           1 : func (d *DB) recycleWAL() (newLogNum base.DiskFileNum, prevLogSize uint64) {
    2736           1 :         if d.opts.DisableWAL {
    2737           0 :                 panic("pebble: invalid function call")
    2738             :         }
    2739             : 
    2740           1 :         jobID := d.mu.nextJobID
    2741           1 :         d.mu.nextJobID++
    2742           1 :         newLogNum = d.mu.versions.getNextDiskFileNum()
    2743           1 : 
    2744           1 :         prevLogSize = uint64(d.mu.log.Size())
    2745           1 : 
    2746           1 :         // The previous log may have grown past its original physical
    2747           1 :         // size. Update its file size in the queue so we have a proper
    2748           1 :         // accounting of its file size.
    2749           1 :         if d.mu.log.queue[len(d.mu.log.queue)-1].fileSize < prevLogSize {
    2750           1 :                 d.mu.log.queue[len(d.mu.log.queue)-1].fileSize = prevLogSize
    2751           1 :         }
    2752           1 :         d.mu.Unlock()
    2753           1 : 
    2754           1 :         var err error
    2755           1 :         // Close the previous log first. This writes an EOF trailer
    2756           1 :         // signifying the end of the file and syncs it to disk. We must
    2757           1 :         // close the previous log before linking the new log file,
    2758           1 :         // otherwise a crash could leave both logs with unclean tails, and
    2759           1 :         // Open will treat the previous log as corrupt.
    2760           1 :         err = d.mu.log.LogWriter.Close()
    2761           1 :         metrics := d.mu.log.LogWriter.Metrics()
    2762           1 :         d.mu.Lock()
    2763           1 :         if err := d.mu.log.metrics.Merge(metrics); err != nil {
    2764           0 :                 d.opts.Logger.Errorf("metrics error: %s", err)
    2765           0 :         }
    2766           1 :         d.mu.Unlock()
    2767           1 : 
    2768           1 :         newLogName := base.MakeFilepath(d.opts.FS, d.walDirname, fileTypeLog, newLogNum)
    2769           1 : 
    2770           1 :         // Try to use a recycled log file. Recycling log files is an important
    2771           1 :         // performance optimization as it is faster to sync a file that has
    2772           1 :         // already been written, than one which is being written for the first
    2773           1 :         // time. This is due to the need to sync file metadata when a file is
    2774           1 :         // being written for the first time. Note this is true even if file
    2775           1 :         // preallocation is performed (e.g. fallocate).
    2776           1 :         var recycleLog fileInfo
    2777           1 :         var recycleOK bool
    2778           1 :         var newLogFile vfs.File
    2779           1 :         if err == nil {
    2780           1 :                 recycleLog, recycleOK = d.logRecycler.peek()
    2781           1 :                 if recycleOK {
    2782           0 :                         recycleLogName := base.MakeFilepath(d.opts.FS, d.walDirname, fileTypeLog, recycleLog.fileNum)
    2783           0 :                         newLogFile, err = d.opts.FS.ReuseForWrite(recycleLogName, newLogName)
    2784           0 :                         base.MustExist(d.opts.FS, newLogName, d.opts.Logger, err)
    2785           1 :                 } else {
    2786           1 :                         newLogFile, err = d.opts.FS.Create(newLogName)
    2787           1 :                         base.MustExist(d.opts.FS, newLogName, d.opts.Logger, err)
    2788           1 :                 }
    2789             :         }
    2790             : 
    2791           1 :         var newLogSize uint64
    2792           1 :         if err == nil && recycleOK {
    2793           0 :                 // Figure out the recycled WAL size. This Stat is necessary
    2794           0 :                 // because ReuseForWrite's contract allows for removing the
    2795           0 :                 // old file and creating a new one. We don't know whether the
    2796           0 :                 // WAL was actually recycled.
    2797           0 :                 // TODO(jackson): Adding a boolean to the ReuseForWrite return
    2798           0 :                 // value indicating whether or not the file was actually
    2799           0 :                 // reused would allow us to skip the stat and use
    2800           0 :                 // recycleLog.fileSize.
    2801           0 :                 var finfo os.FileInfo
    2802           0 :                 finfo, err = newLogFile.Stat()
    2803           0 :                 if err == nil {
    2804           0 :                         newLogSize = uint64(finfo.Size())
    2805           0 :                 }
    2806             :         }
    2807             : 
    2808           1 :         if err == nil {
    2809           1 :                 // TODO(peter): RocksDB delays sync of the parent directory until the
    2810           1 :                 // first time the log is synced. Is that worthwhile?
    2811           1 :                 err = d.walDir.Sync()
    2812           1 :         }
    2813             : 
    2814           1 :         if err != nil && newLogFile != nil {
    2815           0 :                 newLogFile.Close()
    2816           1 :         } else if err == nil {
    2817           1 :                 newLogFile = vfs.NewSyncingFile(newLogFile, vfs.SyncingFileOptions{
    2818           1 :                         NoSyncOnClose:   d.opts.NoSyncOnClose,
    2819           1 :                         BytesPerSync:    d.opts.WALBytesPerSync,
    2820           1 :                         PreallocateSize: d.walPreallocateSize(),
    2821           1 :                 })
    2822           1 :         }
    2823             : 
    2824           1 :         if recycleOK {
    2825           0 :                 err = firstError(err, d.logRecycler.pop(recycleLog.fileNum.FileNum()))
    2826           0 :         }
    2827             : 
    2828           1 :         d.opts.EventListener.WALCreated(WALCreateInfo{
    2829           1 :                 JobID:           jobID,
    2830           1 :                 Path:            newLogName,
    2831           1 :                 FileNum:         newLogNum,
    2832           1 :                 RecycledFileNum: recycleLog.fileNum.FileNum(),
    2833           1 :                 Err:             err,
    2834           1 :         })
    2835           1 : 
    2836           1 :         d.mu.Lock()
    2837           1 : 
    2838           1 :         d.mu.versions.metrics.WAL.Files++
    2839           1 : 
    2840           1 :         if err != nil {
    2841           0 :                 // TODO(peter): avoid chewing through file numbers in a tight loop if there
    2842           0 :                 // is an error here.
    2843           0 :                 //
    2844           0 :                 // What to do here? Stumbling on doesn't seem worthwhile. If we failed to
    2845           0 :                 // close the previous log it is possible we lost a write.
    2846           0 :                 panic(err)
    2847             :         }
    2848             : 
    2849           1 :         d.mu.log.queue = append(d.mu.log.queue, fileInfo{fileNum: newLogNum, fileSize: newLogSize})
    2850           1 :         d.mu.log.LogWriter = record.NewLogWriter(newLogFile, newLogNum, record.LogWriterConfig{
    2851           1 :                 WALFsyncLatency:    d.mu.log.metrics.fsyncLatency,
    2852           1 :                 WALMinSyncInterval: d.opts.WALMinSyncInterval,
    2853           1 :                 QueueSemChan:       d.commit.logSyncQSem,
    2854           1 :         })
    2855           1 :         if d.mu.log.registerLogWriterForTesting != nil {
    2856           0 :                 d.mu.log.registerLogWriterForTesting(d.mu.log.LogWriter)
    2857           0 :         }
    2858             : 
    2859           1 :         return
    2860             : }
    2861             : 
    2862           1 : func (d *DB) getEarliestUnflushedSeqNumLocked() uint64 {
    2863           1 :         seqNum := InternalKeySeqNumMax
    2864           1 :         for i := range d.mu.mem.queue {
    2865           1 :                 logSeqNum := d.mu.mem.queue[i].logSeqNum
    2866           1 :                 if seqNum > logSeqNum {
    2867           1 :                         seqNum = logSeqNum
    2868           1 :                 }
    2869             :         }
    2870           1 :         return seqNum
    2871             : }
    2872             : 
    2873           1 : func (d *DB) getInProgressCompactionInfoLocked(finishing *compaction) (rv []compactionInfo) {
    2874           1 :         for c := range d.mu.compact.inProgress {
    2875           1 :                 if len(c.flushing) == 0 && (finishing == nil || c != finishing) {
    2876           1 :                         info := compactionInfo{
    2877           1 :                                 versionEditApplied: c.versionEditApplied,
    2878           1 :                                 inputs:             c.inputs,
    2879           1 :                                 smallest:           c.smallest,
    2880           1 :                                 largest:            c.largest,
    2881           1 :                                 outputLevel:        -1,
    2882           1 :                         }
    2883           1 :                         if c.outputLevel != nil {
    2884           1 :                                 info.outputLevel = c.outputLevel.level
    2885           1 :                         }
    2886           1 :                         rv = append(rv, info)
    2887             :                 }
    2888             :         }
    2889           1 :         return
    2890             : }
    2891             : 
    2892           1 : func inProgressL0Compactions(inProgress []compactionInfo) []manifest.L0Compaction {
    2893           1 :         var compactions []manifest.L0Compaction
    2894           1 :         for _, info := range inProgress {
    2895           1 :                 // Skip in-progress compactions that have already committed; the L0
    2896           1 :                 // sublevels initialization code requires the set of in-progress
    2897           1 :                 // compactions to be consistent with the current version. Compactions
    2898           1 :                 // with versionEditApplied=true are already applied to the current
    2899           1 :                 // version and but are performing cleanup without the database mutex.
    2900           1 :                 if info.versionEditApplied {
    2901           1 :                         continue
    2902             :                 }
    2903           1 :                 l0 := false
    2904           1 :                 for _, cl := range info.inputs {
    2905           1 :                         l0 = l0 || cl.level == 0
    2906           1 :                 }
    2907           1 :                 if !l0 {
    2908           1 :                         continue
    2909             :                 }
    2910           1 :                 compactions = append(compactions, manifest.L0Compaction{
    2911           1 :                         Smallest:  info.smallest,
    2912           1 :                         Largest:   info.largest,
    2913           1 :                         IsIntraL0: info.outputLevel == 0,
    2914           1 :                 })
    2915             :         }
    2916           1 :         return compactions
    2917             : }
    2918             : 
    2919             : // firstError returns the first non-nil error of err0 and err1, or nil if both
    2920             : // are nil.
    2921           1 : func firstError(err0, err1 error) error {
    2922           1 :         if err0 != nil {
    2923           1 :                 return err0
    2924           1 :         }
    2925           1 :         return err1
    2926             : }
    2927             : 
    2928             : // SetCreatorID sets the CreatorID which is needed in order to use shared objects.
    2929             : // Remote object usage is disabled until this method is called the first time.
    2930             : // Once set, the Creator ID is persisted and cannot change.
    2931             : //
    2932             : // Does nothing if SharedStorage was not set in the options when the DB was
    2933             : // opened or if the DB is in read-only mode.
    2934           1 : func (d *DB) SetCreatorID(creatorID uint64) error {
    2935           1 :         if d.opts.Experimental.RemoteStorage == nil || d.opts.ReadOnly {
    2936           0 :                 return nil
    2937           0 :         }
    2938           1 :         return d.objProvider.SetCreatorID(objstorage.CreatorID(creatorID))
    2939             : }
    2940             : 
    2941             : // KeyStatistics keeps track of the number of keys that have been pinned by a
    2942             : // snapshot as well as counts of the different key kinds in the LSM.
    2943             : //
    2944             : // As an example of using the accumulated stats, suppose there are only sets
    2945             : // and dels, and the counts are named del_count, set_count,
    2946             : // del_latest_count, set_latest_count and snapshot_pinned_count. Then:
    2947             : //
    2948             : //   - del_latest_count + set_latest_count is the number of unique user keys
    2949             : //     (unique).
    2950             : //
    2951             : //   - set_latest_count is the number of live unique user keys (live_unique).
    2952             : //
    2953             : //   - Garbage is del_count + set_count - live_unique.
    2954             : //
    2955             : //   - If everything were in the LSM, del_count+set_count-snapshot_pinned_count
    2956             : //     would also be the number of unique user keys (note that
    2957             : //     snapshot_pinned_count is counting something different -- see the TODO below).
    2958             : //     But snapshot_pinned_count only counts keys in the LSM, so the excess here
    2959             : //     must be keys in memtables.
    2960             : type KeyStatistics struct {
    2961             :         // TODO(sumeer): the SnapshotPinned* fields are incorrect in that these older
    2962             :         // versions can be in a different level. Either fix the accounting or
    2963             :         // rename these fields.
    2964             : 
    2965             :         // SnapshotPinnedKeys is the number of obsolete keys that cannot be elided
    2966             :         // during a compaction, because they are required by an open snapshot.
    2967             :         SnapshotPinnedKeys int
    2968             :         // SnapshotPinnedKeysBytes is the total number of bytes of all snapshot
    2969             :         // pinned keys.
    2970             :         SnapshotPinnedKeysBytes uint64
    2971             :         // KindsCount is the count for each kind of key, indexed by key kind. It
    2972             :         // includes point keys, range deletes and range keys.
    2973             :         KindsCount [InternalKeyKindMax + 1]int
    2974             :         // LatestKindsCount is the count for each kind of key when it is the latest
    2975             :         // kind for a user key. It is only populated for point keys.
    2976             :         LatestKindsCount [InternalKeyKindMax + 1]int
    2977             : }
    2978             : 
    2979             : // LSMKeyStatistics is the result type returned by DB.ScanStatistics.
    2980             : type LSMKeyStatistics struct {
    2981             :         Accumulated KeyStatistics // statistics over all visited keys, not split by level
    2982             :         // Levels contains statistics only for point keys. Range deletions and range
    2983             :         // keys will appear in Accumulated but not in Levels.
    2984             :         Levels [numLevels]KeyStatistics
    2985             :         // BytesRead is the logical, pre-compression size of the keys and values read.
    2986             :         BytesRead uint64
    2987             : }
    2988             : 
    2989             : // ScanStatisticsOptions configures a call to DB.ScanStatistics.
    2990             : type ScanStatisticsOptions struct {
    2991             :         // LimitBytesPerSecond is the maximum number of bytes that may be read
    2992             :         // per second by the scan.
    2993             :         // A value of 0 means that no rate limit is applied.
    2994             :         LimitBytesPerSecond int64
    2995             : }
    2996             : 
    2997             : // ScanStatistics returns the count of different key kinds within the lsm for a
    2998             : // key span [lower, upper) as well as the number of snapshot keys.
    2999             : func (d *DB) ScanStatistics(
    3000             :         ctx context.Context, lower, upper []byte, opts ScanStatisticsOptions,
    3001           0 : ) (LSMKeyStatistics, error) {
    3002           0 :         stats := LSMKeyStatistics{}
    3003           0 :         var prevKey InternalKey
    3004           0 :         var rateLimitFunc func(key *InternalKey, val LazyValue) error
    3005           0 :         tb := tokenbucket.TokenBucket{}
    3006           0 : 
    3007           0 :         if opts.LimitBytesPerSecond != 0 {
    3008           0 :                 // Each "token" roughly corresponds to a byte that was read.
    3009           0 :                 tb.Init(tokenbucket.TokensPerSecond(opts.LimitBytesPerSecond), tokenbucket.Tokens(1024))
    3010           0 :                 rateLimitFunc = func(key *InternalKey, val LazyValue) error {
    3011           0 :                         return tb.WaitCtx(ctx, tokenbucket.Tokens(key.Size()+val.Len()))
    3012           0 :                 }
    3013             :         }
    3014             : 
    3015           0 :         scanInternalOpts := &scanInternalOptions{
    3016           0 :                 visitPointKey: func(key *InternalKey, value LazyValue, iterInfo IteratorLevel) error {
    3017           0 :                         // If the previous key is equal to the current point key, the current key was
    3018           0 :                         // pinned by a snapshot.
    3019           0 :                         size := uint64(key.Size())
    3020           0 :                         kind := key.Kind()
    3021           0 :                         sameKey := d.equal(prevKey.UserKey, key.UserKey)
    3022           0 :                         if iterInfo.Kind == IteratorLevelLSM && sameKey {
    3023           0 :                                 stats.Levels[iterInfo.Level].SnapshotPinnedKeys++
    3024           0 :                                 stats.Levels[iterInfo.Level].SnapshotPinnedKeysBytes += size
    3025           0 :                                 stats.Accumulated.SnapshotPinnedKeys++
    3026           0 :                                 stats.Accumulated.SnapshotPinnedKeysBytes += size
    3027           0 :                         }
    3028           0 :                         if iterInfo.Kind == IteratorLevelLSM {
    3029           0 :                                 stats.Levels[iterInfo.Level].KindsCount[kind]++
    3030           0 :                         }
    3031           0 :                         if !sameKey {
    3032           0 :                                 if iterInfo.Kind == IteratorLevelLSM {
    3033           0 :                                         stats.Levels[iterInfo.Level].LatestKindsCount[kind]++
    3034           0 :                                 }
    3035           0 :                                 stats.Accumulated.LatestKindsCount[kind]++
    3036             :                         }
    3037             : 
    3038           0 :                         stats.Accumulated.KindsCount[kind]++
    3039           0 :                         prevKey.CopyFrom(*key)
    3040           0 :                         stats.BytesRead += uint64(key.Size() + value.Len())
    3041           0 :                         return nil
    3042             :                 },
    3043           0 :                 visitRangeDel: func(start, end []byte, seqNum uint64) error {
    3044           0 :                         stats.Accumulated.KindsCount[InternalKeyKindRangeDelete]++
    3045           0 :                         stats.BytesRead += uint64(len(start) + len(end))
    3046           0 :                         return nil
    3047           0 :                 },
    3048           0 :                 visitRangeKey: func(start, end []byte, keys []rangekey.Key) error {
    3049           0 :                         stats.BytesRead += uint64(len(start) + len(end))
    3050           0 :                         for _, key := range keys {
    3051           0 :                                 stats.Accumulated.KindsCount[key.Kind()]++
    3052           0 :                                 stats.BytesRead += uint64(len(key.Value) + len(key.Suffix))
    3053           0 :                         }
    3054           0 :                         return nil
    3055             :                 },
    3056             :                 includeObsoleteKeys: true,
    3057             :                 IterOptions: IterOptions{
    3058             :                         KeyTypes:   IterKeyTypePointsAndRanges,
    3059             :                         LowerBound: lower,
    3060             :                         UpperBound: upper,
    3061             :                 },
    3062             :                 rateLimitFunc: rateLimitFunc,
    3063             :         }
    3064           0 :         iter, err := d.newInternalIter(ctx, snapshotIterOpts{}, scanInternalOpts)
    3065           0 :         if err != nil {
    3066           0 :                 return LSMKeyStatistics{}, err
    3067           0 :         }
    3068           0 :         defer iter.close()
    3069           0 : 
    3070           0 :         err = scanInternalImpl(ctx, lower, upper, iter, scanInternalOpts)
    3071           0 : 
    3072           0 :         if err != nil {
    3073           0 :                 return LSMKeyStatistics{}, err
    3074           0 :         }
    3075             : 
    3076           0 :         return stats, nil
    3077             : }
    3078             : 
    3079             : // ObjProvider returns the objstorage.Provider used by this database. It is
    3080             : // intended for internal use only.
    3081           1 : func (d *DB) ObjProvider() objstorage.Provider {
    3082           1 :         return d.objProvider
    3083           1 : }
    3084             : 
    3085           0 : func (d *DB) checkVirtualBounds(m *fileMetadata) {
    3086           0 :         if !invariants.Enabled {
    3087           0 :                 return
    3088           0 :         }
    3089             : 
    3090           0 :         objMeta, err := d.objProvider.Lookup(fileTypeTable, m.FileBacking.DiskFileNum)
    3091           0 :         if err != nil {
    3092           0 :                 panic(err)
    3093             :         }
    3094           0 :         if objMeta.IsExternal() {
    3095           0 :                 // Nothing to do; bounds are expected to be loose.
    3096           0 :                 return
    3097           0 :         }
    3098             : 
    3099           0 :         if m.HasPointKeys {
    3100           0 :                 pointIter, rangeDelIter, err := d.newIters(context.TODO(), m, nil, internalIterOpts{})
    3101           0 :                 if err != nil {
    3102           0 :                         panic(errors.Wrap(err, "pebble: error creating point iterator"))
    3103             :                 }
    3104             : 
    3105           0 :                 defer pointIter.Close()
    3106           0 :                 if rangeDelIter != nil {
    3107           0 :                         defer rangeDelIter.Close()
    3108           0 :                 }
    3109             : 
    3110           0 :                 pointKey, _ := pointIter.First()
    3111           0 :                 var rangeDel *keyspan.Span
    3112           0 :                 if rangeDelIter != nil {
    3113           0 :                         rangeDel = rangeDelIter.First()
    3114           0 :                 }
    3115             : 
    3116             :                 // Check that the lower bound is tight.
    3117           0 :                 if (rangeDel == nil || d.cmp(rangeDel.SmallestKey().UserKey, m.SmallestPointKey.UserKey) != 0) &&
    3118           0 :                         (pointKey == nil || d.cmp(pointKey.UserKey, m.SmallestPointKey.UserKey) != 0) {
    3119           0 :                         panic(errors.Newf("pebble: virtual sstable %s lower point key bound is not tight", m.FileNum))
    3120             :                 }
    3121             : 
    3122           0 :                 pointKey, _ = pointIter.Last()
    3123           0 :                 rangeDel = nil
    3124           0 :                 if rangeDelIter != nil {
    3125           0 :                         rangeDel = rangeDelIter.Last()
    3126           0 :                 }
    3127             : 
    3128             :                 // Check that the upper bound is tight.
    3129           0 :                 if (rangeDel == nil || d.cmp(rangeDel.LargestKey().UserKey, m.LargestPointKey.UserKey) != 0) &&
    3130           0 :                         (pointKey == nil || d.cmp(pointKey.UserKey, m.LargestPointKey.UserKey) != 0) {
    3131           0 :                         panic(errors.Newf("pebble: virtual sstable %s upper point key bound is not tight", m.FileNum))
    3132             :                 }
    3133             : 
    3134             :                 // Check that iterator keys are within bounds.
    3135           0 :                 for key, _ := pointIter.First(); key != nil; key, _ = pointIter.Next() {
    3136           0 :                         if d.cmp(key.UserKey, m.SmallestPointKey.UserKey) < 0 || d.cmp(key.UserKey, m.LargestPointKey.UserKey) > 0 {
    3137           0 :                                 panic(errors.Newf("pebble: virtual sstable %s point key %s is not within bounds", m.FileNum, key.UserKey))
    3138             :                         }
    3139             :                 }
    3140             : 
    3141           0 :                 if rangeDelIter != nil {
    3142           0 :                         for key := rangeDelIter.First(); key != nil; key = rangeDelIter.Next() {
    3143           0 :                                 if d.cmp(key.SmallestKey().UserKey, m.SmallestPointKey.UserKey) < 0 {
    3144           0 :                                         panic(errors.Newf("pebble: virtual sstable %s point key %s is not within bounds", m.FileNum, key.SmallestKey().UserKey))
    3145             :                                 }
    3146             : 
    3147           0 :                                 if d.cmp(key.LargestKey().UserKey, m.LargestPointKey.UserKey) > 0 {
    3148           0 :                                         panic(errors.Newf("pebble: virtual sstable %s point key %s is not within bounds", m.FileNum, key.LargestKey().UserKey))
    3149             :                                 }
    3150             :                         }
    3151             :                 }
    3152             :         }
    3153             : 
    3154           0 :         if !m.HasRangeKeys {
    3155           0 :                 return
    3156           0 :         }
    3157             : 
    3158           0 :         rangeKeyIter, err := d.tableNewRangeKeyIter(m, keyspan.SpanIterOptions{})
    3159           0 :         defer rangeKeyIter.Close()
    3160           0 : 
    3161           0 :         if err != nil {
    3162           0 :                 panic(errors.Wrap(err, "pebble: error creating range key iterator"))
    3163             :         }
    3164             : 
    3165             :         // Check that the lower bound is tight.
    3166           0 :         if d.cmp(rangeKeyIter.First().SmallestKey().UserKey, m.SmallestRangeKey.UserKey) != 0 {
    3167           0 :                 panic(errors.Newf("pebble: virtual sstable %s lower range key bound is not tight", m.FileNum))
    3168             :         }
    3169             : 
    3170             :         // Check that upper bound is tight.
    3171           0 :         if d.cmp(rangeKeyIter.Last().LargestKey().UserKey, m.LargestRangeKey.UserKey) != 0 {
    3172           0 :                 panic(errors.Newf("pebble: virtual sstable %s upper range key bound is not tight", m.FileNum))
    3173             :         }
    3174             : 
    3175           0 :         for key := rangeKeyIter.First(); key != nil; key = rangeKeyIter.Next() {
    3176           0 :                 if d.cmp(key.SmallestKey().UserKey, m.SmallestRangeKey.UserKey) < 0 {
    3177           0 :                         panic(errors.Newf("pebble: virtual sstable %s point key %s is not within bounds", m.FileNum, key.SmallestKey().UserKey))
    3178             :                 }
    3179           0 :                 if d.cmp(key.LargestKey().UserKey, m.LargestRangeKey.UserKey) > 0 {
    3180           0 :                         panic(errors.Newf("pebble: virtual sstable %s point key %s is not within bounds", m.FileNum, key.LargestKey().UserKey))
    3181             :                 }
    3182             :         }
    3183             : }

Generated by: LCOV version 1.14