LCOV - code coverage report
Current view: top level - pebble - db.go (source / functions)
Test:         2025-07-05 08:18Z 6f57a213 - meta test only.lcov
Test Date:    2025-07-05 08:20:30
Lines:        74.0 % (1292 of 1746)
Functions:    - (0 of 0)

            Line data    Source code
       1              : // Copyright 2012 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2              : // of this source code is governed by a BSD-style license that can be found in
       3              : // the LICENSE file.
       4              : 
       5              : // Package pebble provides an ordered key/value store.
       6              : package pebble // import "github.com/cockroachdb/pebble"
       7              : 
       8              : import (
       9              :         "context"
      10              :         "fmt"
      11              :         "io"
      12              :         "slices"
      13              :         "sync"
      14              :         "sync/atomic"
      15              :         "time"
      16              :         "unsafe"
      17              : 
      18              :         "github.com/cockroachdb/crlib/crtime"
      19              :         "github.com/cockroachdb/errors"
      20              :         "github.com/cockroachdb/pebble/internal/arenaskl"
      21              :         "github.com/cockroachdb/pebble/internal/base"
      22              :         "github.com/cockroachdb/pebble/internal/cache"
      23              :         "github.com/cockroachdb/pebble/internal/invalidating"
      24              :         "github.com/cockroachdb/pebble/internal/invariants"
      25              :         "github.com/cockroachdb/pebble/internal/keyspan"
      26              :         "github.com/cockroachdb/pebble/internal/keyspan/keyspanimpl"
      27              :         "github.com/cockroachdb/pebble/internal/manifest"
      28              :         "github.com/cockroachdb/pebble/internal/manual"
      29              :         "github.com/cockroachdb/pebble/internal/problemspans"
      30              :         "github.com/cockroachdb/pebble/objstorage"
      31              :         "github.com/cockroachdb/pebble/objstorage/remote"
      32              :         "github.com/cockroachdb/pebble/rangekey"
      33              :         "github.com/cockroachdb/pebble/record"
      34              :         "github.com/cockroachdb/pebble/sstable"
      35              :         "github.com/cockroachdb/pebble/sstable/block"
      36              :         "github.com/cockroachdb/pebble/vfs"
      37              :         "github.com/cockroachdb/pebble/vfs/atomicfs"
      38              :         "github.com/cockroachdb/pebble/wal"
      39              :         "github.com/cockroachdb/tokenbucket"
      40              :         "github.com/prometheus/client_golang/prometheus"
      41              : )
      42              : 
      43              : const (
      44              :         // minFileCacheSize is the minimum size of the file cache, for a single db.
      45              :         minFileCacheSize = 64
      46              : 
      47              :         // numNonFileCacheFiles is an approximation for the number of files
      48              :         // that we don't account for in the file cache, for a given db.
      49              :         numNonFileCacheFiles = 10
      50              : )
      51              : 
      52              : var (
      53              :         // ErrNotFound is returned when a get operation does not find the requested
      54              :         // key.
      55              :         ErrNotFound = base.ErrNotFound
      56              :         // ErrClosed is panicked when an operation is performed on a closed snapshot or
      57              :         // DB. Use errors.Is(err, ErrClosed) to check for this error.
      58              :         ErrClosed = errors.New("pebble: closed")
      59              :         // ErrReadOnly is returned when a write operation is performed on a read-only
      60              :         // database.
      61              :         ErrReadOnly = errors.New("pebble: read-only")
      62              :         // errNoSplit indicates that the user is trying to perform a range key
      63              :         // operation but the configured Comparer does not provide a Split
      64              :         // implementation.
      65              :         errNoSplit = errors.New("pebble: Comparer.Split required for range key operations")
      66              : )
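Since the comments above recommend errors.Is for checking these sentinels, a minimal caller-side sketch (db and key are assumed; this code is not part of db.go):

        v, closer, err := db.Get(key)
        if errors.Is(err, pebble.ErrNotFound) {
                // Absent key: callers may treat this as a non-error.
        } else if err != nil {
                // Some other failure.
        } else {
                _ = v // valid only until closer.Close()
                _ = closer.Close()
        }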
      67              : 
      68              : // Reader is a readable key/value store.
      69              : //
      70              : // It is safe to call Get and NewIter from concurrent goroutines.
      71              : type Reader interface {
      72              :         // Get gets the value for the given key. It returns ErrNotFound if the DB
      73              :         // does not contain the key.
      74              :         //
      75              :         // The caller should not modify the contents of the returned slice, but it is
      76              :         // safe to modify the contents of the argument after Get returns. The
      77              :         // returned slice will remain valid until the returned Closer is closed. On
      78              :         // success, the caller MUST call closer.Close() or a memory leak will occur.
      79              :         Get(key []byte) (value []byte, closer io.Closer, err error)
      80              : 
      81              :         // NewIter returns an iterator that is unpositioned (Iterator.Valid() will
      82              :         // return false). The iterator can be positioned via a call to SeekGE,
      83              :         // SeekLT, First or Last.
      84              :         NewIter(o *IterOptions) (*Iterator, error)
      85              : 
      86              :         // NewIterWithContext is like NewIter, and additionally accepts a context
      87              :         // for tracing.
      88              :         NewIterWithContext(ctx context.Context, o *IterOptions) (*Iterator, error)
      89              : 
      90              :         // Close closes the Reader. It may or may not close any underlying io.Reader
      91              :         // or io.Writer, depending on how the DB was created.
      92              :         //
      93              :         // It is not safe to close a DB until all outstanding iterators are closed.
      94              :         // It is valid to call Close multiple times. Other methods should not be
      95              :         // called after the DB has been closed.
      96              :         Close() error
      97              : }
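A hedged sketch of the Get contract described above: the returned slice is only valid until the Closer is closed, so a caller that needs the value afterwards copies it first (the lookup helper is illustrative, not part of this file):

        func lookup(r pebble.Reader, key []byte) ([]byte, error) {
                value, closer, err := r.Get(key)
                if err != nil {
                        return nil, err
                }
                // Copy the value out; it is invalidated by closer.Close().
                v := append([]byte(nil), value...)
                return v, closer.Close()
        }

Note also the ordering requirement: every iterator returned by NewIter must be closed before the Reader itself is closed.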
      98              : 
      99              : // Writer is a writable key/value store.
     100              : //
     101              : // Goroutine safety is dependent on the specific implementation.
     102              : type Writer interface {
     103              :         // Apply the operations contained in the batch to the DB.
     104              :         //
     105              :         // It is safe to modify the contents of the arguments after Apply returns.
     106              :         Apply(batch *Batch, o *WriteOptions) error
     107              : 
      108              :         // Delete deletes the value for the given key. Deletes are blind; they will
     109              :         // succeed even if the given key does not exist.
     110              :         //
     111              :         // It is safe to modify the contents of the arguments after Delete returns.
     112              :         Delete(key []byte, o *WriteOptions) error
     113              : 
     114              :         // DeleteSized behaves identically to Delete, but takes an additional
     115              :         // argument indicating the size of the value being deleted. DeleteSized
     116              :         // should be preferred when the caller has the expectation that there exists
     117              :         // a single internal KV pair for the key (eg, the key has not been
     118              :         // overwritten recently), and the caller knows the size of its value.
     119              :         //
     120              :         // DeleteSized will record the value size within the tombstone and use it to
     121              :         // inform compaction-picking heuristics which strive to reduce space
     122              :         // amplification in the LSM. This "calling your shot" mechanic allows the
     123              :         // storage engine to more accurately estimate and reduce space
     124              :         // amplification.
     125              :         //
     126              :         // It is safe to modify the contents of the arguments after DeleteSized
     127              :         // returns.
     128              :         DeleteSized(key []byte, valueSize uint32, _ *WriteOptions) error
     129              : 
     130              :         // SingleDelete is similar to Delete in that it deletes the value for the given key. Like Delete,
     131              :         // it is a blind operation that will succeed even if the given key does not exist.
     132              :         //
     133              :         // WARNING: Undefined (non-deterministic) behavior will result if a key is overwritten and
     134              :         // then deleted using SingleDelete. The record may appear deleted immediately, but be
     135              :         // resurrected at a later time after compactions have been performed. Or the record may
     136              :         // be deleted permanently. A Delete operation lays down a "tombstone" which shadows all
     137              :         // previous versions of a key. The SingleDelete operation is akin to "anti-matter" and will
     138              :         // only delete the most recently written version for a key. These different semantics allow
     139              :         // the DB to avoid propagating a SingleDelete operation during a compaction as soon as the
     140              :         // corresponding Set operation is encountered. These semantics require extreme care to handle
     141              :         // properly. Only use if you have a workload where the performance gain is critical and you
     142              :         // can guarantee that a record is written once and then deleted once.
     143              :         //
     144              :         // Note that SINGLEDEL, SET, SINGLEDEL, SET, DEL/RANGEDEL, ... from most
      145              :         // recent to oldest will work as intended since there is a single SET
     146              :         // sandwiched between SINGLEDEL/DEL/RANGEDEL.
     147              :         //
     148              :         // IMPLEMENTATION WARNING: By offering SingleDelete, Pebble must guarantee
     149              :         // that there is no duplication of writes inside Pebble. That is, idempotent
     150              :         // application of writes is insufficient. For example, if a SET operation
     151              :         // gets duplicated inside Pebble, resulting in say SET#20 and SET#17, the
     152              :         // caller may issue a SINGLEDEL#25 and it will not have the desired effect.
     153              :         // A duplication where a SET#20 is duplicated across two sstables will have
     154              :         // the same correctness problem, since the SINGLEDEL may meet one of the
     155              :         // SETs. This guarantee is partially achieved by ensuring that a WAL and a
     156              :         // flushable are usually in one-to-one correspondence, and atomically
     157              :         // updating the MANIFEST when the flushable is flushed (which ensures the
     158              :         // WAL will never be replayed). There is one exception: a flushableBatch (a
     159              :         // batch too large to fit in a memtable) is written to the end of the WAL
     160              :         // that it shares with the preceding memtable. This is safe because the
     161              :         // memtable and the flushableBatch are part of the same flush (see DB.flush1
     162              :         // where this invariant is maintained). If the memtable were to be flushed
     163              :         // without the flushableBatch, the WAL cannot yet be deleted and if a crash
     164              :         // happened, the WAL would be replayed despite the memtable already being
     165              :         // flushed.
     166              :         //
     167              :         // It is safe to modify the contents of the arguments after SingleDelete returns.
     168              :         SingleDelete(key []byte, o *WriteOptions) error
     169              : 
     170              :         // DeleteRange deletes all of the point keys (and values) in the range
     171              :         // [start,end) (inclusive on start, exclusive on end). DeleteRange does NOT
     172              :         // delete overlapping range keys (eg, keys set via RangeKeySet).
     173              :         //
     174              :         // It is safe to modify the contents of the arguments after DeleteRange
     175              :         // returns.
     176              :         DeleteRange(start, end []byte, o *WriteOptions) error
     177              : 
      178              :         // LogData adds the specified data to the batch. The data will be written to the
     179              :         // WAL, but not added to memtables or sstables. Log data is never indexed,
     180              :         // which makes it useful for testing WAL performance.
     181              :         //
     182              :         // It is safe to modify the contents of the argument after LogData returns.
     183              :         LogData(data []byte, opts *WriteOptions) error
     184              : 
     185              :         // Merge merges the value for the given key. The details of the merge are
     186              :         // dependent upon the configured merge operation.
     187              :         //
     188              :         // It is safe to modify the contents of the arguments after Merge returns.
     189              :         Merge(key, value []byte, o *WriteOptions) error
     190              : 
     191              :         // Set sets the value for the given key. It overwrites any previous value
     192              :         // for that key; a DB is not a multi-map.
     193              :         //
     194              :         // It is safe to modify the contents of the arguments after Set returns.
     195              :         Set(key, value []byte, o *WriteOptions) error
     196              : 
     197              :         // RangeKeySet sets a range key mapping the key range [start, end) at the MVCC
     198              :         // timestamp suffix to value. The suffix is optional. If any portion of the key
     199              :         // range [start, end) is already set by a range key with the same suffix value,
     200              :         // RangeKeySet overrides it.
     201              :         //
     202              :         // It is safe to modify the contents of the arguments after RangeKeySet returns.
     203              :         RangeKeySet(start, end, suffix, value []byte, opts *WriteOptions) error
     204              : 
     205              :         // RangeKeyUnset removes a range key mapping the key range [start, end) at the
     206              :         // MVCC timestamp suffix. The suffix may be omitted to remove an unsuffixed
     207              :         // range key. RangeKeyUnset only removes portions of range keys that fall within
     208              :         // the [start, end) key span, and only range keys with suffixes that exactly
     209              :         // match the unset suffix.
     210              :         //
     211              :         // It is safe to modify the contents of the arguments after RangeKeyUnset
     212              :         // returns.
     213              :         RangeKeyUnset(start, end, suffix []byte, opts *WriteOptions) error
     214              : 
     215              :         // RangeKeyDelete deletes all of the range keys in the range [start,end)
     216              :         // (inclusive on start, exclusive on end). It does not delete point keys (for
     217              :         // that use DeleteRange). RangeKeyDelete removes all range keys within the
     218              :         // bounds, including those with or without suffixes.
     219              :         //
     220              :         // It is safe to modify the contents of the arguments after RangeKeyDelete
     221              :         // returns.
     222              :         RangeKeyDelete(start, end []byte, opts *WriteOptions) error
     223              : }
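The Writer operations above can be issued directly on a DB or grouped in a Batch and applied atomically; a small sketch of the batch form (the keys, values, and use of pebble.Sync are assumptions):

        b := db.NewBatch()
        _ = b.Set([]byte("a"), []byte("1"), nil)
        _ = b.Delete([]byte("b"), nil)
        _ = b.DeleteRange([]byte("c"), []byte("d"), nil)
        if err := db.Apply(b, pebble.Sync); err != nil {
                return err
        }
        return b.Close()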
     224              : 
     225              : // DB provides a concurrent, persistent ordered key/value store.
     226              : //
     227              : // A DB's basic operations (Get, Set, Delete) should be self-explanatory. Get
     228              : // and Delete will return ErrNotFound if the requested key is not in the store.
     229              : // Callers are free to ignore this error.
     230              : //
     231              : // A DB also allows for iterating over the key/value pairs in key order. If d
     232              : // is a DB, the code below prints all key/value pairs whose keys are 'greater
     233              : // than or equal to' k:
     234              : //
     235              : //      iter := d.NewIter(readOptions)
     236              : //      for iter.SeekGE(k); iter.Valid(); iter.Next() {
     237              : //              fmt.Printf("key=%q value=%q\n", iter.Key(), iter.Value())
     238              : //      }
     239              : //      return iter.Close()
     240              : //
     241              : // The Options struct holds the optional parameters for the DB, including a
     242              : // Comparer to define a 'less than' relationship over keys. It is always valid
     243              : // to pass a nil *Options, which means to use the default parameter values. Any
     244              : // zero field of a non-nil *Options also means to use the default value for
     245              : // that parameter. Thus, the code below uses a custom Comparer, but the default
     246              : // values for every other parameter:
     247              : //
     248              : //      db := pebble.Open(&Options{
     249              : //              Comparer: myComparer,
     250              : //      })
     251              : type DB struct {
     252              :         // The count and size of referenced memtables. This includes memtables
     253              :         // present in DB.mu.mem.queue, as well as memtables that have been flushed
     254              :         // but are still referenced by an inuse readState, as well as up to one
     255              :         // memTable waiting to be reused and stored in d.memTableRecycle.
     256              :         memTableCount    atomic.Int64
     257              :         memTableReserved atomic.Int64 // number of bytes reserved in the cache for memtables
     258              :         // memTableRecycle holds a pointer to an obsolete memtable. The next
     259              :         // memtable allocation will reuse this memtable if it has not already been
     260              :         // recycled.
     261              :         memTableRecycle atomic.Pointer[memTable]
     262              : 
     263              :         // The logical size of the current WAL.
     264              :         logSize atomic.Uint64
     265              :         // The number of input bytes to the log. This is the raw size of the
     266              :         // batches written to the WAL, without the overhead of the record
     267              :         // envelopes.
     268              :         logBytesIn atomic.Uint64
     269              : 
     270              :         // The number of bytes available on disk.
     271              :         diskAvailBytes       atomic.Uint64
     272              :         lowDiskSpaceReporter lowDiskSpaceReporter
     273              : 
     274              :         cacheHandle    *cache.Handle
     275              :         dirname        string
     276              :         opts           *Options
     277              :         cmp            Compare
     278              :         equal          Equal
     279              :         merge          Merge
     280              :         split          Split
     281              :         abbreviatedKey AbbreviatedKey
     282              :         // The threshold for determining when a batch is "large" and will skip being
     283              :         // inserted into a memtable.
     284              :         largeBatchThreshold uint64
     285              :         // The current OPTIONS file number.
     286              :         optionsFileNum base.DiskFileNum
     287              :         // The on-disk size of the current OPTIONS file.
     288              :         optionsFileSize uint64
     289              : 
     290              :         // objProvider is used to access and manage SSTs.
     291              :         objProvider objstorage.Provider
     292              : 
     293              :         fileLock *Lock
     294              :         dataDir  vfs.File
     295              : 
     296              :         fileCache            *fileCacheHandle
     297              :         newIters             tableNewIters
     298              :         tableNewRangeKeyIter keyspanimpl.TableNewSpanIter
     299              : 
     300              :         commit *commitPipeline
     301              : 
     302              :         // readState provides access to the state needed for reading without needing
     303              :         // to acquire DB.mu.
     304              :         readState struct {
     305              :                 sync.RWMutex
     306              :                 val *readState
     307              :         }
     308              : 
     309              :         closed   *atomic.Value
     310              :         closedCh chan struct{}
     311              : 
     312              :         cleanupManager *cleanupManager
     313              : 
     314              :         // During an iterator close, we may asynchronously schedule read compactions.
     315              :         // We want to wait for those goroutines to finish, before closing the DB.
      316              :         // compactionSchedulers.Wait() should not be called while the DB.mu is held.
     317              :         compactionSchedulers sync.WaitGroup
     318              : 
     319              :         // The main mutex protecting internal DB state. This mutex encompasses many
     320              :         // fields because those fields need to be accessed and updated atomically. In
     321              :         // particular, the current version, log.*, mem.*, and snapshot list need to
     322              :         // be accessed and updated atomically during compaction.
     323              :         //
     324              :         // Care is taken to avoid holding DB.mu during IO operations. Accomplishing
     325              :         // this sometimes requires releasing DB.mu in a method that was called with
     326              :         // it held. See versionSet.UpdateVersionLocked() and DB.makeRoomForWrite() for
     327              :         // examples. This is a common pattern, so be careful about expectations that
     328              :         // DB.mu will be held continuously across a set of calls.
     329              :         mu struct {
     330              :                 sync.Mutex
     331              : 
     332              :                 formatVers struct {
     333              :                         // vers is the database's current format major version.
     334              :                         // Backwards-incompatible features are gated behind new
     335              :                         // format major versions and not enabled until a database's
     336              :                         // version is ratcheted upwards.
     337              :                         //
     338              :                         // Although this is under the `mu` prefix, readers may read vers
     339              :                         // atomically without holding d.mu. Writers must only write to this
     340              :                         // value through finalizeFormatVersUpgrade which requires d.mu is
     341              :                         // held.
     342              :                         vers atomic.Uint64
     343              :                         // marker is the atomic marker for the format major version.
     344              :                         // When a database's version is ratcheted upwards, the
     345              :                         // marker is moved in order to atomically record the new
     346              :                         // version.
     347              :                         marker *atomicfs.Marker
     348              :                         // ratcheting when set to true indicates that the database is
     349              :                         // currently in the process of ratcheting the format major version
     350              :                         // to vers + 1. As a part of ratcheting the format major version,
     351              :                         // migrations may drop and re-acquire the mutex.
     352              :                         ratcheting bool
     353              :                 }
     354              : 
     355              :                 // The ID of the next job. Job IDs are passed to event listener
     356              :                 // notifications and act as a mechanism for tying together the events and
     357              :                 // log messages for a single job such as a flush, compaction, or file
     358              :                 // ingestion. Job IDs are not serialized to disk or used for correctness.
     359              :                 nextJobID JobID
     360              : 
     361              :                 // The collection of immutable versions and state about the log and visible
     362              :                 // sequence numbers. Use the pointer here to ensure the atomic fields in
     363              :                 // version set are aligned properly.
     364              :                 versions *versionSet
     365              : 
     366              :                 log struct {
     367              :                         // manager is not protected by mu, but calls to Create must be
     368              :                         // serialized, and happen after the previous writer is closed.
     369              :                         manager wal.Manager
     370              :                         // The Writer is protected by commitPipeline.mu. This allows log writes
     371              :                         // to be performed without holding DB.mu, but requires both
     372              :                         // commitPipeline.mu and DB.mu to be held when rotating the WAL/memtable
     373              :                         // (i.e. makeRoomForWrite). Can be nil.
     374              :                         writer  wal.Writer
     375              :                         metrics struct {
     376              :                                 // fsyncLatency has its own internal synchronization, and is not
     377              :                                 // protected by mu.
     378              :                                 fsyncLatency prometheus.Histogram
     379              :                                 // Updated whenever a wal.Writer is closed.
     380              :                                 record.LogWriterMetrics
     381              :                         }
     382              :                 }
     383              : 
     384              :                 mem struct {
     385              :                         // The current mutable memTable. Readers of the pointer may hold
     386              :                         // either DB.mu or commitPipeline.mu.
     387              :                         //
     388              :                         // Its internal fields are protected by commitPipeline.mu. This
     389              :                         // allows batch commits to be performed without DB.mu as long as no
     390              :                         // memtable rotation is required.
     391              :                         //
     392              :                         // Both commitPipeline.mu and DB.mu must be held when rotating the
     393              :                         // memtable.
     394              :                         mutable *memTable
     395              :                         // Queue of flushables (the mutable memtable is at end). Elements are
     396              :                         // added to the end of the slice and removed from the beginning. Once an
     397              :                         // index is set it is never modified making a fixed slice immutable and
     398              :                         // safe for concurrent reads.
     399              :                         queue flushableList
     400              :                         // nextSize is the size of the next memtable. The memtable size starts at
     401              :                         // min(256KB,Options.MemTableSize) and doubles each time a new memtable
     402              :                         // is allocated up to Options.MemTableSize. This reduces the memory
     403              :                         // footprint of memtables when lots of DB instances are used concurrently
     404              :                         // in test environments.
     405              :                         nextSize uint64
     406              :                 }
     407              : 
     408              :                 compact struct {
     409              :                         // Condition variable used to signal when a flush or compaction has
     410              :                         // completed. Used by the write-stall mechanism to wait for the stall
     411              :                         // condition to clear. See DB.makeRoomForWrite().
     412              :                         cond sync.Cond
     413              :                         // True when a flush is in progress.
     414              :                         flushing bool
     415              :                         // The number of ongoing non-download compactions.
     416              :                         compactingCount int
     417              :                         // The number of download compactions.
     418              :                         downloadingCount int
     419              :                         // The list of deletion hints, suggesting ranges for delete-only
     420              :                         // compactions.
     421              :                         deletionHints []deleteCompactionHint
     422              :                         // The list of manual compactions. The next manual compaction to perform
     423              :                         // is at the start of the list. New entries are added to the end.
     424              :                         manual    []*manualCompaction
     425              :                         manualLen atomic.Int32
     426              :                         // manualID is used to identify manualCompactions in the manual slice.
     427              :                         manualID uint64
     428              :                         // downloads is the list of pending download tasks. The next download to
     429              :                         // perform is at the start of the list. New entries are added to the end.
     430              :                         downloads []*downloadSpanTask
     431              :                         // inProgress is the set of in-progress flushes and compactions.
     432              :                         // It's used in the calculation of some metrics and to initialize L0
     433              :                         // sublevels' state. Some of the compactions contained within this
     434              :                         // map may have already committed an edit to the version but are
     435              :                         // lingering performing cleanup, like deleting obsolete files.
     436              :                         inProgress map[compaction]struct{}
     437              : 
     438              :                         // rescheduleReadCompaction indicates to an iterator that a read compaction
     439              :                         // should be scheduled.
     440              :                         rescheduleReadCompaction bool
     441              : 
     442              :                         // readCompactions is a readCompactionQueue which keeps track of the
     443              :                         // compactions which we might have to perform.
     444              :                         readCompactions readCompactionQueue
     445              : 
     446              :                         // The cumulative duration of all completed compactions since Open.
     447              :                         // Does not include flushes.
     448              :                         duration time.Duration
     449              :                         // Flush throughput metric.
     450              :                         flushWriteThroughput ThroughputMetric
     451              :                         // The idle start time for the flush "loop", i.e., when the flushing
     452              :                         // bool above transitions to false.
     453              :                         noOngoingFlushStartTime crtime.Mono
     454              :                 }
     455              : 
     456              :                 fileDeletions struct {
      457              :                         // Non-zero when file cleaning is disabled. The disableCount
      458              :                         // acts as a reference count to prohibit file cleaning. See
     459              :                         // DB.{disable,enable}FileDeletions().
     460              :                         disableCount int
     461              :                         // queuedStats holds cumulative stats for files that have been
     462              :                         // queued for deletion by the cleanup manager. These stats are
     463              :                         // monotonically increasing for the *DB's lifetime.
     464              :                         queuedStats obsoleteObjectStats
     465              :                 }
     466              : 
     467              :                 snapshots struct {
     468              :                         // The list of active snapshots.
     469              :                         snapshotList
     470              : 
     471              :                         // The cumulative count and size of snapshot-pinned keys written to
     472              :                         // sstables.
     473              :                         cumulativePinnedCount uint64
     474              :                         cumulativePinnedSize  uint64
     475              :                 }
     476              : 
     477              :                 tableStats struct {
     478              :                         // Condition variable used to signal the completion of a
     479              :                         // job to collect table stats.
     480              :                         cond sync.Cond
     481              :                         // True when a stat collection operation is in progress.
     482              :                         loading bool
     483              :                         // True if stat collection has loaded statistics for all tables
     484              :                         // other than those listed explicitly in pending. This flag starts
     485              :                         // as false when a database is opened and flips to true once stat
     486              :                         // collection has caught up.
     487              :                         loadedInitial bool
     488              :                         // A slice of files for which stats have not been computed.
     489              :                         // Compactions, ingests, flushes append files to be processed. An
     490              :                         // active stat collection goroutine clears the list and processes
     491              :                         // them.
     492              :                         pending []manifest.NewTableEntry
     493              :                 }
     494              : 
     495              :                 tableValidation struct {
     496              :                         // cond is a condition variable used to signal the completion of a
     497              :                         // job to validate one or more sstables.
     498              :                         cond sync.Cond
     499              :                         // pending is a slice of metadata for sstables waiting to be
     500              :                         // validated. Only physical sstables should be added to the pending
     501              :                         // queue.
     502              :                         pending []manifest.NewTableEntry
     503              :                         // validating is set to true when validation is running.
     504              :                         validating bool
     505              :                 }
     506              : 
     507              :                 // annotators contains various instances of manifest.Annotator which
     508              :                 // should be protected from concurrent access.
     509              :                 annotators struct {
     510              :                         // totalFileSize is the sum of the size of all files in the
     511              :                         // database. This includes local, remote, and external sstables --
     512              :                         // along with blob files.
     513              :                         totalFileSize *manifest.Annotator[uint64]
     514              :                         remoteSize    *manifest.Annotator[uint64]
     515              :                         externalSize  *manifest.Annotator[uint64]
     516              :                 }
     517              :         }
     518              : 
     519              :         // problemSpans keeps track of spans of keys within LSM levels where
     520              :         // compactions have failed; used to avoid retrying these compactions too
     521              :         // quickly.
     522              :         problemSpans problemspans.ByLevel
     523              : 
     524              :         // Normally equal to time.Now() but may be overridden in tests.
     525              :         timeNow func() time.Time
     526              :         // the time at database Open; may be used to compute metrics like effective
     527              :         // compaction concurrency
     528              :         openedAt time.Time
     529              : }
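The mu.formatVers fields above back Pebble's format-major-version API; a hedged sketch of reading and ratcheting the version (target is a placeholder FormatMajorVersion chosen by the caller):

        cur := db.FormatMajorVersion()
        if cur < target {
                // Ratcheting runs any migrations gated behind the newer versions.
                if err := db.RatchetFormatMajorVersion(target); err != nil {
                        return err
                }
        }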
     530              : 
     531              : var _ Reader = (*DB)(nil)
     532              : var _ Writer = (*DB)(nil)
     533              : 
     534              : // TestOnlyWaitForCleaning MUST only be used in tests.
     535            0 : func (d *DB) TestOnlyWaitForCleaning() {
     536            0 :         d.cleanupManager.Wait()
     537            0 : }
     538              : 
     539              : // Get gets the value for the given key. It returns ErrNotFound if the DB does
     540              : // not contain the key.
     541              : //
     542              : // The caller should not modify the contents of the returned slice, but it is
     543              : // safe to modify the contents of the argument after Get returns. The returned
     544              : // slice will remain valid until the returned Closer is closed. On success, the
     545              : // caller MUST call closer.Close() or a memory leak will occur.
     546            1 : func (d *DB) Get(key []byte) ([]byte, io.Closer, error) {
     547            1 :         return d.getInternal(key, nil /* batch */, nil /* snapshot */)
     548            1 : }
     549              : 
     550              : type getIterAlloc struct {
     551              :         dbi    Iterator
     552              :         keyBuf []byte
     553              :         get    getIter
     554              : }
     555              : 
     556              : var getIterAllocPool = sync.Pool{
     557            1 :         New: func() interface{} {
     558            1 :                 return &getIterAlloc{}
     559            1 :         },
     560              : }
     561              : 
     562            1 : func (d *DB) getInternal(key []byte, b *Batch, s *Snapshot) ([]byte, io.Closer, error) {
     563            1 :         if err := d.closed.Load(); err != nil {
     564            0 :                 panic(err)
     565              :         }
     566              : 
     567              :         // Grab and reference the current readState. This prevents the underlying
     568              :         // files in the associated version from being deleted if there is a current
     569              :         // compaction. The readState is unref'd by Iterator.Close().
     570            1 :         readState := d.loadReadState()
     571            1 : 
     572            1 :         // Determine the seqnum to read at after grabbing the read state (current and
     573            1 :         // memtables) above.
     574            1 :         var seqNum base.SeqNum
     575            1 :         if s != nil {
     576            1 :                 seqNum = s.seqNum
     577            1 :         } else {
     578            1 :                 seqNum = d.mu.versions.visibleSeqNum.Load()
     579            1 :         }
     580              : 
     581            1 :         buf := getIterAllocPool.Get().(*getIterAlloc)
     582            1 : 
     583            1 :         get := &buf.get
     584            1 :         *get = getIter{
     585            1 :                 comparer: d.opts.Comparer,
     586            1 :                 newIters: d.newIters,
     587            1 :                 snapshot: seqNum,
     588            1 :                 iterOpts: IterOptions{
     589            1 :                         // TODO(sumeer): replace with a parameter provided by the caller.
     590            1 :                         Category:                      categoryGet,
     591            1 :                         logger:                        d.opts.Logger,
     592            1 :                         snapshotForHideObsoletePoints: seqNum,
     593            1 :                 },
     594            1 :                 key: key,
     595            1 :                 // Compute the key prefix for bloom filtering.
     596            1 :                 prefix:  key[:d.opts.Comparer.Split(key)],
     597            1 :                 batch:   b,
     598            1 :                 mem:     readState.memtables,
     599            1 :                 l0:      readState.current.L0SublevelFiles,
     600            1 :                 version: readState.current,
     601            1 :         }
     602            1 : 
     603            1 :         // Strip off memtables which cannot possibly contain the seqNum being read
     604            1 :         // at.
     605            1 :         for len(get.mem) > 0 {
     606            1 :                 n := len(get.mem)
     607            1 :                 if logSeqNum := get.mem[n-1].logSeqNum; logSeqNum < seqNum {
     608            1 :                         break
     609              :                 }
     610            1 :                 get.mem = get.mem[:n-1]
     611              :         }
     612              : 
     613            1 :         i := &buf.dbi
     614            1 :         pointIter := get
     615            1 :         *i = Iterator{
     616            1 :                 ctx:          context.Background(),
     617            1 :                 getIterAlloc: buf,
     618            1 :                 iter:         pointIter,
     619            1 :                 pointIter:    pointIter,
     620            1 :                 merge:        d.merge,
     621            1 :                 comparer:     *d.opts.Comparer,
     622            1 :                 readState:    readState,
     623            1 :                 keyBuf:       buf.keyBuf,
     624            1 :         }
     625            1 :         // Set up a blob value fetcher to use for retrieving values from blob files.
     626            1 :         i.blobValueFetcher.Init(&readState.current.BlobFiles, d.fileCache, block.NoReadEnv)
     627            1 :         get.iiopts.blobValueFetcher = &i.blobValueFetcher
     628            1 : 
     629            1 :         if !i.First() {
     630            1 :                 err := i.Close()
     631            1 :                 if err != nil {
     632            0 :                         return nil, nil, err
     633            0 :                 }
     634            1 :                 return nil, nil, ErrNotFound
     635              :         }
     636            1 :         return i.Value(), i, nil
     637              : }
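getInternal reads at the snapshot's sequence number when one is supplied; from the caller's side that path looks roughly like the following (illustrative only; assumes an open db):

        s := db.NewSnapshot()
        defer s.Close()
        // Reads through s only observe writes that were visible when the
        // snapshot was created, matching the seqNum selection above.
        v, closer, err := s.Get([]byte("k"))
        if err == nil {
                _ = v
                _ = closer.Close()
        }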
     638              : 
     639              : // Set sets the value for the given key. It overwrites any previous value
     640              : // for that key; a DB is not a multi-map.
     641              : //
     642              : // It is safe to modify the contents of the arguments after Set returns.
     643            1 : func (d *DB) Set(key, value []byte, opts *WriteOptions) error {
     644            1 :         b := newBatch(d)
     645            1 :         _ = b.Set(key, value, opts)
     646            1 :         if err := d.Apply(b, opts); err != nil {
     647            0 :                 return err
     648            0 :         }
     649              :         // Only release the batch on success.
     650            1 :         return b.Close()
     651              : }
     652              : 
      653              : // Delete deletes the value for the given key. Deletes are blind; they will
     654              : // succeed even if the given key does not exist.
     655              : //
     656              : // It is safe to modify the contents of the arguments after Delete returns.
     657            1 : func (d *DB) Delete(key []byte, opts *WriteOptions) error {
     658            1 :         b := newBatch(d)
     659            1 :         _ = b.Delete(key, opts)
     660            1 :         if err := d.Apply(b, opts); err != nil {
     661            0 :                 return err
     662            0 :         }
     663              :         // Only release the batch on success.
     664            1 :         return b.Close()
     665              : }
     666              : 
     667              : // DeleteSized behaves identically to Delete, but takes an additional
     668              : // argument indicating the size of the value being deleted. DeleteSized
     669              : // should be preferred when the caller has the expectation that there exists
     670              : // a single internal KV pair for the key (eg, the key has not been
     671              : // overwritten recently), and the caller knows the size of its value.
     672              : //
     673              : // DeleteSized will record the value size within the tombstone and use it to
     674              : // inform compaction-picking heuristics which strive to reduce space
     675              : // amplification in the LSM. This "calling your shot" mechanic allows the
     676              : // storage engine to more accurately estimate and reduce space amplification.
     677              : //
     678              : // It is safe to modify the contents of the arguments after DeleteSized
     679              : // returns.
     680            1 : func (d *DB) DeleteSized(key []byte, valueSize uint32, opts *WriteOptions) error {
     681            1 :         b := newBatch(d)
     682            1 :         _ = b.DeleteSized(key, valueSize, opts)
     683            1 :         if err := d.Apply(b, opts); err != nil {
     684            0 :                 return err
     685            0 :         }
     686              :         // Only release the batch on success.
     687            1 :         return b.Close()
     688              : }
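A sketch of the "calling your shot" usage described above, where the caller supplies the size of the value it believes it is deleting (prevValue is an assumption about application state):

        // The application wrote prevValue for key and has not overwritten it since.
        if err := db.DeleteSized(key, uint32(len(prevValue)), pebble.Sync); err != nil {
                return err
        }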
     689              : 
     690              : // SingleDelete adds an action to the batch that single deletes the entry for key.
     691              : // See Writer.SingleDelete for more details on the semantics of SingleDelete.
     692              : //
     693              : // WARNING: See the detailed warning in Writer.SingleDelete before using this.
     694              : //
     695              : // It is safe to modify the contents of the arguments after SingleDelete returns.
     696            1 : func (d *DB) SingleDelete(key []byte, opts *WriteOptions) error {
     697            1 :         b := newBatch(d)
     698            1 :         _ = b.SingleDelete(key, opts)
     699            1 :         if err := d.Apply(b, opts); err != nil {
     700            0 :                 return err
     701            0 :         }
     702              :         // Only release the batch on success.
     703            1 :         return b.Close()
     704              : }
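Given the Writer.SingleDelete warning, the safe pattern is strictly write-once/delete-once; a minimal sketch under that assumption:

        // The application guarantees key is Set exactly once...
        if err := db.Set(key, value, pebble.NoSync); err != nil {
                return err
        }
        // ...and SingleDeleted exactly once, with no intervening overwrites,
        // Deletes, or Merges of the same key.
        if err := db.SingleDelete(key, pebble.NoSync); err != nil {
                return err
        }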
     705              : 
     706              : // DeleteRange deletes all of the keys (and values) in the range [start,end)
     707              : // (inclusive on start, exclusive on end).
     708              : //
     709              : // It is safe to modify the contents of the arguments after DeleteRange
     710              : // returns.
     711            1 : func (d *DB) DeleteRange(start, end []byte, opts *WriteOptions) error {
     712            1 :         b := newBatch(d)
     713            1 :         _ = b.DeleteRange(start, end, opts)
     714            1 :         if err := d.Apply(b, opts); err != nil {
     715            0 :                 return err
     716            0 :         }
     717              :         // Only release the batch on success.
     718            1 :         return b.Close()
     719              : }
     720              : 
     721              : // Merge adds an action to the DB that merges the value at key with the new
     722              : // value. The details of the merge are dependent upon the configured merge
     723              : // operator.
     724              : //
     725              : // It is safe to modify the contents of the arguments after Merge returns.
     726            1 : func (d *DB) Merge(key, value []byte, opts *WriteOptions) error {
     727            1 :         b := newBatch(d)
     728            1 :         _ = b.Merge(key, value, opts)
     729            1 :         if err := d.Apply(b, opts); err != nil {
     730            0 :                 return err
     731            0 :         }
     732              :         // Only release the batch on success.
     733            1 :         return b.Close()
     734              : }
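Merge results depend entirely on the configured merge operator; with a concatenating merger such as Pebble's default, repeated merges accumulate operands (a sketch, assuming an open db):

        _ = db.Merge([]byte("k"), []byte("a"), pebble.NoSync)
        _ = db.Merge([]byte("k"), []byte("b"), pebble.NoSync)
        v, closer, err := db.Get([]byte("k"))
        if err == nil {
                _ = v // expected to be "ab" under a concatenating merger
                _ = closer.Close()
        }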
     735              : 
      736              : // LogData adds the specified data to the batch. The data will be written to the
     737              : // WAL, but not added to memtables or sstables. Log data is never indexed,
     738              : // which makes it useful for testing WAL performance.
     739              : //
     740              : // It is safe to modify the contents of the argument after LogData returns.
     741            1 : func (d *DB) LogData(data []byte, opts *WriteOptions) error {
     742            1 :         b := newBatch(d)
     743            1 :         _ = b.LogData(data, opts)
     744            1 :         if err := d.Apply(b, opts); err != nil {
     745            0 :                 return err
     746            0 :         }
     747              :         // Only release the batch on success.
     748            1 :         return b.Close()
     749              : }
     750              : 
     751              : // RangeKeySet sets a range key mapping the key range [start, end) at the MVCC
     752              : // timestamp suffix to value. The suffix is optional. If any portion of the key
     753              : // range [start, end) is already set by a range key with the same suffix value,
     754              : // RangeKeySet overrides it.
     755              : //
     756              : // It is safe to modify the contents of the arguments after RangeKeySet returns.
     757            1 : func (d *DB) RangeKeySet(start, end, suffix, value []byte, opts *WriteOptions) error {
     758            1 :         b := newBatch(d)
     759            1 :         _ = b.RangeKeySet(start, end, suffix, value, opts)
     760            1 :         if err := d.Apply(b, opts); err != nil {
     761            0 :                 return err
     762            0 :         }
     763              :         // Only release the batch on success.
     764            1 :         return b.Close()
     765              : }
     766              : 
     767              : // RangeKeyUnset removes a range key mapping the key range [start, end) at the
     768              : // MVCC timestamp suffix. The suffix may be omitted to remove an unsuffixed
     769              : // range key. RangeKeyUnset only removes portions of range keys that fall within
     770              : // the [start, end) key span, and only range keys with suffixes that exactly
     771              : // match the unset suffix.
     772              : //
     773              : // It is safe to modify the contents of the arguments after RangeKeyUnset
     774              : // returns.
     775            1 : func (d *DB) RangeKeyUnset(start, end, suffix []byte, opts *WriteOptions) error {
     776            1 :         b := newBatch(d)
     777            1 :         _ = b.RangeKeyUnset(start, end, suffix, opts)
     778            1 :         if err := d.Apply(b, opts); err != nil {
     779            0 :                 return err
     780            0 :         }
     781              :         // Only release the batch on success.
     782            1 :         return b.Close()
     783              : }
     784              : 
     785              : // RangeKeyDelete deletes all of the range keys in the range [start,end)
     786              : // (inclusive on start, exclusive on end). It does not delete point keys (for
     787              : // that use DeleteRange). RangeKeyDelete removes all range keys within the
     788              : // bounds, including those with or without suffixes.
     789              : //
     790              : // It is safe to modify the contents of the arguments after RangeKeyDelete
     791              : // returns.
     792            1 : func (d *DB) RangeKeyDelete(start, end []byte, opts *WriteOptions) error {
     793            1 :         b := newBatch(d)
     794            1 :         _ = b.RangeKeyDelete(start, end, opts)
     795            1 :         if err := d.Apply(b, opts); err != nil {
     796            0 :                 return err
     797            0 :         }
     798              :         // Only release the batch on success.
     799            1 :         return b.Close()
     800              : }
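
To make the difference between the two removal operations above concrete, here is a small
sketch (assumed helper, not part of db.go): RangeKeyUnset removes only fragments whose
suffix matches exactly, while RangeKeyDelete clears every range key in the span.

// clearRangeKeys first unsets the range key fragments written at `suffix`, then
// deletes any remaining range keys in the same span regardless of suffix.
func clearRangeKeys(db *pebble.DB, start, end, suffix []byte) error {
        if err := db.RangeKeyUnset(start, end, suffix, pebble.NoSync); err != nil {
                return err
        }
        return db.RangeKeyDelete(start, end, pebble.Sync)
}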
     801              : 
     802              : // Apply the operations contained in the batch to the DB. If the batch is large,
     803              : // the contents of the batch may be retained by the database. If that occurs,
     804              : // the batch contents will be cleared, preventing the caller from attempting to
     805              : // reuse them.
     806              : //
     807              : // It is safe to modify the contents of the arguments after Apply returns.
     808              : //
     809              : // Apply returns ErrInvalidBatch if the provided batch is invalid in any way.
     810            1 : func (d *DB) Apply(batch *Batch, opts *WriteOptions) error {
     811            1 :         return d.applyInternal(batch, opts, false)
     812            1 : }
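
A minimal sketch of the usual Apply flow (illustrative only; the helper name, keys, and
values are assumptions): stage point mutations in a batch and commit them atomically.

// applyBatch stages two mutations and applies them to the DB in one atomic commit.
func applyBatch(db *pebble.DB) error {
        b := db.NewBatch()
        defer b.Close()
        if err := b.Set([]byte("k1"), []byte("v1"), nil); err != nil {
                return err
        }
        if err := b.Delete([]byte("k2"), nil); err != nil {
                return err
        }
        // pebble.Sync makes Apply wait for the WAL fsync as well.
        return db.Apply(b, pebble.Sync)
}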
     813              : 
     814              : // ApplyNoSyncWait must only be used when opts.Sync is true and the caller
     815              : // does not want to wait for the WAL fsync to happen. The method will return
     816              : // once the mutation is applied to the memtable and is visible (note that a
     817              : // mutation is visible before the WAL sync even in the wait case, so we have
     818              : // not weakened the durability semantics). The caller must call Batch.SyncWait
     819              : // to wait for the WAL fsync. The caller must not Close the batch without
     820              : // first calling Batch.SyncWait.
     821              : //
     822              : // RECOMMENDATION: Prefer using Apply unless you really understand why you
     823              : // need ApplyNoSyncWait.
     824              : // EXPERIMENTAL: API/feature subject to change. Do not yet use outside
     825              : // CockroachDB.
     826            1 : func (d *DB) ApplyNoSyncWait(batch *Batch, opts *WriteOptions) error {
     827            1 :         if !opts.Sync {
     828            0 :                 return errors.Errorf("cannot request asynchronous apply when WriteOptions.Sync is false")
     829            0 :         }
     830            1 :         return d.applyInternal(batch, opts, true)
     831              : }
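
The doc comment above prescribes a specific call sequence; the sketch below follows it
(illustrative only, not part of db.go): opts.Sync must be true, and Batch.SyncWait must be
called before Close.

// applyThenWait makes the mutation visible as soon as ApplyNoSyncWait returns and
// blocks on the WAL fsync only when the caller is ready to.
func applyThenWait(db *pebble.DB, b *pebble.Batch) error {
        if err := db.ApplyNoSyncWait(b, pebble.Sync); err != nil {
                return err
        }
        // ... other work can proceed while the WAL sync is in flight ...
        if err := b.SyncWait(); err != nil {
                return err
        }
        return b.Close()
}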
     832              : 
     833              : // REQUIRES: noSyncWait => opts.Sync
     834            1 : func (d *DB) applyInternal(batch *Batch, opts *WriteOptions, noSyncWait bool) error {
     835            1 :         if err := d.closed.Load(); err != nil {
     836            0 :                 panic(err)
     837              :         }
     838            1 :         if batch.committing {
     839            0 :                 panic("pebble: batch already committing")
     840              :         }
     841            1 :         if batch.applied.Load() {
     842            0 :                 panic("pebble: batch already applied")
     843              :         }
     844            1 :         if d.opts.ReadOnly {
     845            0 :                 return ErrReadOnly
     846            0 :         }
     847            1 :         if batch.db != nil && batch.db != d {
     848            0 :                 panic(fmt.Sprintf("pebble: batch db mismatch: %p != %p", batch.db, d))
     849              :         }
     850              : 
     851            1 :         sync := opts.GetSync()
     852            1 :         if sync && d.opts.DisableWAL {
     853            0 :                 return errors.New("pebble: WAL disabled")
     854            0 :         }
     855              : 
     856            1 :         if fmv := d.FormatMajorVersion(); fmv < batch.minimumFormatMajorVersion {
     857            0 :                 panic(fmt.Sprintf(
     858            0 :                         "pebble: batch requires at least format major version %d (current: %d)",
     859            0 :                         batch.minimumFormatMajorVersion, fmv,
     860            0 :                 ))
     861              :         }
     862              : 
     863            1 :         if batch.countRangeKeys > 0 {
     864            1 :                 if d.split == nil {
     865            0 :                         return errNoSplit
     866            0 :                 }
     867              :         }
     868            1 :         batch.committing = true
     869            1 : 
     870            1 :         if batch.db == nil {
     871            0 :                 if err := batch.refreshMemTableSize(); err != nil {
     872            0 :                         return err
     873            0 :                 }
     874              :         }
     875            1 :         if batch.memTableSize >= d.largeBatchThreshold {
     876            1 :                 var err error
     877            1 :                 batch.flushable, err = newFlushableBatch(batch, d.opts.Comparer)
     878            1 :                 if err != nil {
     879            0 :                         return err
     880            0 :                 }
     881              :         }
     882            1 :         if err := d.commit.Commit(batch, sync, noSyncWait); err != nil {
     883            0 :                 // There isn't much we can do on an error here. The commit pipeline will be
     884            0 :                 // horked at this point.
     885            0 :                 d.opts.Logger.Fatalf("pebble: fatal commit error: %v", err)
     886            0 :         }
     887              :         // If this is a large batch, we need to clear the batch contents as the
     888              :         // flushable batch may still be present in the flushables queue.
     889              :         //
     890              :         // TODO(peter): Currently large batches are written to the WAL. We could
     891              :         // skip the WAL write and instead wait for the large batch to be flushed to
     892              :         // an sstable. For a 100 MB batch, this might actually be faster. For a 1
     893            0 :         // GB batch, this is almost certainly faster.
     894            1 :         if batch.flushable != nil {
     895            1 :                 batch.data = nil
     896            1 :         }
     897            1 :         return nil
     898              : }
     899              : 
     900            1 : func (d *DB) commitApply(b *Batch, mem *memTable) error {
     901            1 :         if b.flushable != nil {
     902            1 :                 // This is a large batch which was already added to the immutable queue.
     903            1 :                 return nil
     904            1 :         }
     905            1 :         err := mem.apply(b, b.SeqNum())
     906            1 :         if err != nil {
     907            0 :                 return err
     908            0 :         }
     909              : 
     910              :         // If the batch contains range tombstones and the database is configured
     911              :         // to flush range deletions, schedule a delayed flush so that disk space
     912              :         // may be reclaimed without additional writes or an explicit flush.
     913            1 :         if b.countRangeDels > 0 && d.opts.FlushDelayDeleteRange > 0 {
     914            1 :                 d.mu.Lock()
     915            1 :                 d.maybeScheduleDelayedFlush(mem, d.opts.FlushDelayDeleteRange)
     916            1 :                 d.mu.Unlock()
     917            1 :         }
     918              : 
     919              :         // If the batch contains range keys and the database is configured to flush
     920              :         // range keys, schedule a delayed flush so that the range keys are cleared
     921              :         // from the memtable.
     922            1 :         if b.countRangeKeys > 0 && d.opts.FlushDelayRangeKey > 0 {
     923            1 :                 d.mu.Lock()
     924            1 :                 d.maybeScheduleDelayedFlush(mem, d.opts.FlushDelayRangeKey)
     925            1 :                 d.mu.Unlock()
     926            1 :         }
     927              : 
     928            1 :         if mem.writerUnref() {
     929            1 :                 d.mu.Lock()
     930            1 :                 d.maybeScheduleFlush()
     931            1 :                 d.mu.Unlock()
     932            1 :         }
     933            1 :         return nil
     934              : }
     935              : 
     936            1 : func (d *DB) commitWrite(b *Batch, syncWG *sync.WaitGroup, syncErr *error) (*memTable, error) {
     937            1 :         var size int64
     938            1 :         repr := b.Repr()
     939            1 : 
     940            1 :         if b.flushable != nil {
     941            1 :                 // We have a large batch. Such batches are special in that they don't get
     942            1 :                 // added to the memtable, and are instead inserted into the queue of
     943            1 :                 // memtables. The call to makeRoomForWrite with this batch will force the
     944            1 :                 // current memtable to be flushed. We want the large batch to be part of
     945            1 :                 // the same log, so we add it to the WAL here, rather than after the call
     946            1 :                 // to makeRoomForWrite().
     947            1 :                 //
     948            1 :                 // Set the sequence number since it was not set to the correct value earlier
     949            1 :                 // (see comment in newFlushableBatch()).
     950            1 :                 b.flushable.setSeqNum(b.SeqNum())
     951            1 :                 if !d.opts.DisableWAL {
     952            1 :                         var err error
     953            1 :                         size, err = d.mu.log.writer.WriteRecord(repr, wal.SyncOptions{Done: syncWG, Err: syncErr}, b)
     954            1 :                         if err != nil {
     955            0 :                                 panic(err)
     956              :                         }
     957              :                 }
     958              :         }
     959              : 
     960            1 :         var err error
     961            1 :         // Grab a reference to the memtable. We don't hold DB.mu, but we do hold
     962            1 :         // d.commit.mu. It's okay for readers of d.mu.mem.mutable to only hold one of
     963            1 :         // d.commit.mu or d.mu, because memtable rotations require holding both.
     964            1 :         mem := d.mu.mem.mutable
     965            1 :         // Batches which contain keys of kind InternalKeyKindIngestSST will
     966            1 :         // never be applied to the memtable, so we don't need to make room for
     967            1 :         // write.
     968            1 :         if !b.ingestedSSTBatch {
     969            1 :                 // Flushable batches will require a rotation of the memtable regardless,
     970            1 :                 // so only attempt an optimistic reservation of space in the current
     971            1 :                 // memtable if this batch is not a large flushable batch.
     972            1 :                 if b.flushable == nil {
     973            1 :                         err = d.mu.mem.mutable.prepare(b)
     974            1 :                 }
     975            1 :                 if b.flushable != nil || err == arenaskl.ErrArenaFull {
     976            1 :                         // Slow path.
     977            1 :                         // We need to acquire DB.mu and rotate the memtable.
     978            1 :                         func() {
     979            1 :                                 d.mu.Lock()
     980            1 :                                 defer d.mu.Unlock()
     981            1 :                                 err = d.makeRoomForWrite(b)
     982            1 :                                 mem = d.mu.mem.mutable
     983            1 :                         }()
     984              :                 }
     985              :         }
     986            1 :         if err != nil {
     987            0 :                 return nil, err
     988            0 :         }
     989            1 :         if d.opts.DisableWAL {
     990            1 :                 return mem, nil
     991            1 :         }
     992            1 :         d.logBytesIn.Add(uint64(len(repr)))
     993            1 : 
     994            1 :         if b.flushable == nil {
     995            1 :                 size, err = d.mu.log.writer.WriteRecord(repr, wal.SyncOptions{Done: syncWG, Err: syncErr}, b)
     996            1 :                 if err != nil {
     997            0 :                         panic(err)
     998              :                 }
     999              :         }
    1000              : 
    1001            1 :         d.logSize.Store(uint64(size))
    1002            1 :         return mem, err
    1003              : }
    1004              : 
    1005              : type iterAlloc struct {
    1006              :         dbi                 Iterator
    1007              :         keyBuf              []byte
    1008              :         boundsBuf           [2][]byte
    1009              :         prefixOrFullSeekKey []byte
    1010              :         merging             mergingIter
    1011              :         mlevels             [3 + numLevels]mergingIterLevel
    1012              :         levels              [3 + numLevels]levelIter
    1013              :         levelsPositioned    [3 + numLevels]bool
    1014              : }
    1015              : 
    1016              : var iterAllocPool = sync.Pool{
    1017            1 :         New: func() interface{} {
    1018            1 :                 return &iterAlloc{}
    1019            1 :         },
    1020              : }
    1021              : 
    1022              : // snapshotIterOpts denotes snapshot-related iterator options when calling
    1023              : // newIter. These are the possible cases for a snapshotIterOpts:
    1024              : //   - No snapshot: All fields are zero values.
    1025              : //   - Classic snapshot: Only `seqNum` is set. The latest readState will be used
    1026              : //     and the specified seqNum will be used as the snapshot seqNum.
    1027              : //   - EventuallyFileOnlySnapshot (EFOS) behaving as a classic snapshot. Only
    1028              : //     the `seqNum` is set. The latest readState will be used
    1029              : //     and the specified seqNum will be used as the snapshot seqNum.
    1030              : //   - EFOS in file-only state: Only `seqNum` and `vers` are set. All the
    1031              : //     relevant SSTs are referenced by the *version.
    1032              : //   - EFOS that has been excised but is in alwaysCreateIters mode (tests only).
    1033              : //     Only `seqNum` and `readState` are set.
    1034              : type snapshotIterOpts struct {
    1035              :         seqNum    base.SeqNum
    1036              :         vers      *manifest.Version
    1037              :         readState *readState
    1038              : }
    1039              : 
    1040              : type batchIterOpts struct {
    1041              :         batchOnly bool
    1042              : }
    1043              : type newIterOpts struct {
    1044              :         snapshot snapshotIterOpts
    1045              :         batch    batchIterOpts
    1046              : }
    1047              : 
    1048              : // newIter constructs a new iterator, merging in batch iterators as an extra
    1049              : // level.
    1050              : func (d *DB) newIter(
    1051              :         ctx context.Context, batch *Batch, newIterOpts newIterOpts, o *IterOptions,
    1052            1 : ) *Iterator {
    1053            1 :         if newIterOpts.batch.batchOnly {
    1054            0 :                 if batch == nil {
    1055            0 :                         panic("batchOnly is true, but batch is nil")
    1056              :                 }
    1057            0 :                 if newIterOpts.snapshot.vers != nil {
    1058            0 :                         panic("batchOnly is true, but snapshotIterOpts is initialized")
    1059              :                 }
    1060              :         }
    1061            1 :         if err := d.closed.Load(); err != nil {
    1062            0 :                 panic(err)
    1063              :         }
    1064            1 :         seqNum := newIterOpts.snapshot.seqNum
    1065            1 :         if o != nil && o.RangeKeyMasking.Suffix != nil && o.KeyTypes != IterKeyTypePointsAndRanges {
    1066            0 :                 panic("pebble: range key masking requires IterKeyTypePointsAndRanges")
    1067              :         }
    1068            1 :         if (batch != nil || seqNum != 0) && (o != nil && o.OnlyReadGuaranteedDurable) {
    1069            0 :                 // We could add support for OnlyReadGuaranteedDurable on snapshots if
    1070            0 :                 // there was a need: this would require checking that the sequence number
    1071            0 :                 // of the snapshot has been flushed, by comparing with
    1072            0 :                 // DB.mem.queue[0].logSeqNum.
    1073            0 :                 panic("OnlyReadGuaranteedDurable is not supported for batches or snapshots")
    1074              :         }
    1075            1 :         var readState *readState
    1076            1 :         var newIters tableNewIters
    1077            1 :         var newIterRangeKey keyspanimpl.TableNewSpanIter
    1078            1 :         if !newIterOpts.batch.batchOnly {
    1079            1 :                 // Grab and reference the current readState. This prevents the underlying
    1080            1 :                 // files in the associated version from being deleted if there is a current
    1081            1 :                 // compaction. The readState is unref'd by Iterator.Close().
    1082            1 :                 if newIterOpts.snapshot.vers == nil {
    1083            1 :                         if newIterOpts.snapshot.readState != nil {
    1084            0 :                                 readState = newIterOpts.snapshot.readState
    1085            0 :                                 readState.ref()
    1086            1 :                         } else {
    1087            1 :                                 // NB: loadReadState() calls readState.ref().
    1088            1 :                                 readState = d.loadReadState()
    1089            1 :                         }
    1090            1 :                 } else {
    1091            1 :                         // vers != nil
    1092            1 :                         newIterOpts.snapshot.vers.Ref()
    1093            1 :                 }
    1094              : 
    1095              :                 // Determine the seqnum to read at after grabbing the read state (current and
    1096              :                 // memtables) above.
    1097            1 :                 if seqNum == 0 {
    1098            1 :                         seqNum = d.mu.versions.visibleSeqNum.Load()
    1099            1 :                 }
    1100            1 :                 newIters = d.newIters
    1101            1 :                 newIterRangeKey = d.tableNewRangeKeyIter
    1102              :         }
    1103              : 
    1104              :         // Bundle various structures under a single umbrella in order to allocate
    1105              :         // them together.
    1106            1 :         buf := iterAllocPool.Get().(*iterAlloc)
    1107            1 :         dbi := &buf.dbi
    1108            1 :         *dbi = Iterator{
    1109            1 :                 ctx:                 ctx,
    1110            1 :                 alloc:               buf,
    1111            1 :                 merge:               d.merge,
    1112            1 :                 comparer:            *d.opts.Comparer,
    1113            1 :                 readState:           readState,
    1114            1 :                 version:             newIterOpts.snapshot.vers,
    1115            1 :                 keyBuf:              buf.keyBuf,
    1116            1 :                 prefixOrFullSeekKey: buf.prefixOrFullSeekKey,
    1117            1 :                 boundsBuf:           buf.boundsBuf,
    1118            1 :                 batch:               batch,
    1119            1 :                 fc:                  d.fileCache,
    1120            1 :                 newIters:            newIters,
    1121            1 :                 newIterRangeKey:     newIterRangeKey,
    1122            1 :                 seqNum:              seqNum,
    1123            1 :                 batchOnlyIter:       newIterOpts.batch.batchOnly,
    1124            1 :         }
    1125            1 :         if o != nil {
    1126            1 :                 dbi.opts = *o
    1127            1 :                 dbi.processBounds(o.LowerBound, o.UpperBound)
    1128            1 :         }
    1129            1 :         dbi.opts.logger = d.opts.Logger
    1130            1 :         if d.opts.private.disableLazyCombinedIteration {
    1131            1 :                 dbi.opts.disableLazyCombinedIteration = true
    1132            1 :         }
    1133            1 :         if batch != nil {
    1134            1 :                 dbi.batchSeqNum = dbi.batch.nextSeqNum()
    1135            1 :         }
    1136            1 :         return finishInitializingIter(ctx, buf)
    1137              : }
    1138              : 
    1139              : // finishInitializingIter is a helper for doing the non-trivial initialization
    1140              : // of an Iterator. It's invoked to perform the initial initialization of an
    1141              : // Iterator during NewIter or Clone, and to perform reinitialization due to a
    1142              : // change in IterOptions by a call to Iterator.SetOptions.
    1143            1 : func finishInitializingIter(ctx context.Context, buf *iterAlloc) *Iterator {
    1144            1 :         // Short-hand.
    1145            1 :         dbi := &buf.dbi
    1146            1 :         var memtables flushableList
    1147            1 :         if dbi.readState != nil {
    1148            1 :                 memtables = dbi.readState.memtables
    1149            1 :         }
    1150            1 :         if dbi.opts.OnlyReadGuaranteedDurable {
    1151            0 :                 memtables = nil
    1152            1 :         } else {
    1153            1 :                 // We only need to read from memtables which contain sequence numbers older
    1154            1 :                 // than seqNum. Trim off newer memtables.
    1155            1 :                 for i := len(memtables) - 1; i >= 0; i-- {
    1156            1 :                         if logSeqNum := memtables[i].logSeqNum; logSeqNum < dbi.seqNum {
    1157            1 :                                 break
    1158              :                         }
    1159            1 :                         memtables = memtables[:i]
    1160              :                 }
    1161              :         }
    1162              : 
    1163            1 :         if dbi.opts.pointKeys() {
    1164            1 :                 // Construct the point iterator, initializing dbi.pointIter to point to
    1165            1 :                 // dbi.merging. If this is called during a SetOptions call and this
    1166            1 :                 // Iterator has already initialized dbi.merging, constructPointIter is a
    1167            1 :                 // noop and an initialized pointIter already exists in dbi.pointIter.
    1168            1 :                 dbi.constructPointIter(ctx, memtables, buf)
    1169            1 :                 dbi.iter = dbi.pointIter
    1170            1 :         } else {
    1171            1 :                 dbi.iter = emptyIter
    1172            1 :         }
    1173              : 
    1174            1 :         if dbi.opts.rangeKeys() {
    1175            1 :                 dbi.rangeKeyMasking.init(dbi, &dbi.comparer)
    1176            1 : 
    1177            1 :                 // When iterating over both point and range keys, don't create the
    1178            1 :                 // range-key iterator stack immediately if we can avoid it. This
    1179            1 :                 // optimization takes advantage of the expected sparseness of range
    1180            1 :                 // keys, and configures the point-key iterator to dynamically switch to
    1181            1 :                 // combined iteration when it observes a file containing range keys.
    1182            1 :                 //
    1183            1 :                 // Lazy combined iteration is not possible if a batch or a memtable
    1184            1 :                 // contains any range keys.
    1185            1 :                 useLazyCombinedIteration := dbi.rangeKey == nil &&
    1186            1 :                         dbi.opts.KeyTypes == IterKeyTypePointsAndRanges &&
    1187            1 :                         (dbi.batch == nil || dbi.batch.countRangeKeys == 0) &&
    1188            1 :                         !dbi.opts.disableLazyCombinedIteration
    1189            1 :                 if useLazyCombinedIteration {
    1190            1 :                         // The user requested combined iteration, and there's no indexed
    1191            1 :                         // batch currently containing range keys that would prevent lazy
    1192            1 :                         // combined iteration. Check the memtables to see if they contain
    1193            1 :                         // any range keys.
    1194            1 :                         for i := range memtables {
    1195            1 :                                 if memtables[i].containsRangeKeys() {
    1196            1 :                                         useLazyCombinedIteration = false
    1197            1 :                                         break
    1198              :                                 }
    1199              :                         }
    1200              :                 }
    1201              : 
    1202            1 :                 if useLazyCombinedIteration {
    1203            1 :                         dbi.lazyCombinedIter = lazyCombinedIter{
    1204            1 :                                 parent:    dbi,
    1205            1 :                                 pointIter: dbi.pointIter,
    1206            1 :                                 combinedIterState: combinedIterState{
    1207            1 :                                         initialized: false,
    1208            1 :                                 },
    1209            1 :                         }
    1210            1 :                         dbi.iter = &dbi.lazyCombinedIter
    1211            1 :                         dbi.iter = invalidating.MaybeWrapIfInvariants(dbi.iter)
    1212            1 :                 } else {
    1213            1 :                         dbi.lazyCombinedIter.combinedIterState = combinedIterState{
    1214            1 :                                 initialized: true,
    1215            1 :                         }
    1216            1 :                         if dbi.rangeKey == nil {
    1217            1 :                                 dbi.rangeKey = iterRangeKeyStateAllocPool.Get().(*iteratorRangeKeyState)
    1218            1 :                                 dbi.constructRangeKeyIter()
    1219            1 :                         } else {
    1220            1 :                                 dbi.rangeKey.iterConfig.SetBounds(dbi.opts.LowerBound, dbi.opts.UpperBound)
    1221            1 :                         }
    1222              : 
    1223              :                         // Wrap the point iterator (currently dbi.iter) with an interleaving
    1224              :                         // iterator that interleaves range keys pulled from
    1225              :                         // dbi.rangeKey.rangeKeyIter.
    1226              :                         //
    1227              :                         // NB: The interleaving iterator is always reinitialized, even if
    1228              :                         // dbi already had an initialized range key iterator, in case the point
    1229              :                         // iterator changed or the range key masking suffix changed.
    1230            1 :                         dbi.rangeKey.iiter.Init(&dbi.comparer, dbi.iter, dbi.rangeKey.rangeKeyIter,
    1231            1 :                                 keyspan.InterleavingIterOpts{
    1232            1 :                                         Mask:       &dbi.rangeKeyMasking,
    1233            1 :                                         LowerBound: dbi.opts.LowerBound,
    1234            1 :                                         UpperBound: dbi.opts.UpperBound,
    1235            1 :                                 })
    1236            1 :                         dbi.iter = &dbi.rangeKey.iiter
    1237              :                 }
    1238            1 :         } else {
    1239            1 :                 // !dbi.opts.rangeKeys()
    1240            1 :                 //
    1241            1 :                 // Reset the combined iterator state. The initialized=true ensures the
    1242            1 :                 // iterator doesn't unnecessarily try to switch to combined iteration.
    1243            1 :                 dbi.lazyCombinedIter.combinedIterState = combinedIterState{initialized: true}
    1244            1 :         }
    1245            1 :         return dbi
    1246              : }
    1247              : 
    1248              : // ScanInternal scans all internal keys within the specified bounds, truncating
    1249              : // any rangedels and rangekeys to those bounds if they span past them. For use
    1250              : // when an external user needs to be aware of all internal keys that make up a
    1251              : // key range.
    1252              : //
    1253              : // Keys deleted by range deletions must not be returned or exposed by this
    1254              : // method, while the range deletion deleting that key must be exposed using
    1255              : // visitRangeDel. Keys that would be masked by range key masking (if an
    1256              : // appropriate prefix were set) should be exposed, alongside the range key
    1257              : // that would have masked it. This method also collapses all point keys into
    1258              : // one InternalKey; so only one internal key at most per user key is returned
    1259              : // to visitPointKey.
    1260              : //
    1261              : // If visitSharedFile is not nil, ScanInternal iterates in skip-shared iteration
    1262              : // mode. In this iteration mode, sstables in levels L5 and L6 are skipped, and
    1263              : // their metadata truncated to [lower, upper) and passed into visitSharedFile.
    1264              : // ErrInvalidSkipSharedIteration is returned if visitSharedFile is not nil and an
    1265              : // sstable in L5 or L6 is found that is not in shared storage according to
    1266              : // provider.IsShared, or an sstable in those levels contains a newer key than the
    1267              : // snapshot sequence number (only applicable for snapshot.ScanInternal). This
    1268              : // could happen, for example, if Pebble started writing sstables before a
    1269              : // creator ID was set (creator IDs are necessary to enable shared storage),
    1270              : // resulting in some lower-level SSTs being on non-shared storage. Skip-shared
    1271              : // iteration is invalid in those cases.
    1272              : func (d *DB) ScanInternal(
    1273              :         ctx context.Context,
    1274              :         category block.Category,
    1275              :         lower, upper []byte,
    1276              :         visitPointKey func(key *InternalKey, value LazyValue, iterInfo IteratorLevel) error,
    1277              :         visitRangeDel func(start, end []byte, seqNum SeqNum) error,
    1278              :         visitRangeKey func(start, end []byte, keys []rangekey.Key) error,
    1279              :         visitSharedFile func(sst *SharedSSTMeta) error,
    1280              :         visitExternalFile func(sst *ExternalFile) error,
    1281            1 : ) error {
    1282            1 :         scanInternalOpts := &scanInternalOptions{
    1283            1 :                 category:          category,
    1284            1 :                 visitPointKey:     visitPointKey,
    1285            1 :                 visitRangeDel:     visitRangeDel,
    1286            1 :                 visitRangeKey:     visitRangeKey,
    1287            1 :                 visitSharedFile:   visitSharedFile,
    1288            1 :                 visitExternalFile: visitExternalFile,
    1289            1 :                 IterOptions: IterOptions{
    1290            1 :                         KeyTypes:   IterKeyTypePointsAndRanges,
    1291            1 :                         LowerBound: lower,
    1292            1 :                         UpperBound: upper,
    1293            1 :                 },
    1294            1 :         }
    1295            1 :         iter, err := d.newInternalIter(ctx, snapshotIterOpts{} /* snapshot */, scanInternalOpts)
    1296            1 :         if err != nil {
    1297            0 :                 return err
    1298            0 :         }
    1299            1 :         defer iter.close()
    1300            1 :         return scanInternalImpl(ctx, lower, upper, iter, scanInternalOpts)
    1301              : }
    1302              : 
    1303              : // newInternalIter constructs and returns a new scanInternalIterator on this db.
    1304              : // If o.skipSharedLevels is true, levels below sharedLevelsStart are *not* added
    1305              : // to the internal iterator.
    1306              : //
    1307              : // TODO(bilal): This method has a lot of similarities with db.newIter as well as
    1308              : // finishInitializingIter. Both pairs of methods should be refactored to reduce
    1309              : // this duplication.
    1310              : func (d *DB) newInternalIter(
    1311              :         ctx context.Context, sOpts snapshotIterOpts, o *scanInternalOptions,
    1312            1 : ) (*scanInternalIterator, error) {
    1313            1 :         if err := d.closed.Load(); err != nil {
    1314            0 :                 panic(err)
    1315              :         }
    1316              :         // Grab and reference the current readState. This prevents the underlying
    1317              :         // files in the associated version from being deleted if there is a current
    1318              :         // compaction. The readState is unref'd by Iterator.Close().
    1319            1 :         var readState *readState
    1320            1 :         var vers *manifest.Version
    1321            1 :         if sOpts.vers == nil {
    1322            1 :                 if sOpts.readState != nil {
    1323            0 :                         readState = sOpts.readState
    1324            0 :                         readState.ref()
    1325            0 :                         vers = readState.current
    1326            1 :                 } else {
    1327            1 :                         readState = d.loadReadState()
    1328            1 :                         vers = readState.current
    1329            1 :                 }
    1330            0 :         } else {
    1331            0 :                 vers = sOpts.vers
    1332            0 :                 sOpts.vers.Ref()
    1333            0 :         }
    1334              : 
    1335              :         // Determine the seqnum to read at after grabbing the read state (current and
    1336              :         // memtables) above.
    1337            1 :         seqNum := sOpts.seqNum
    1338            1 :         if seqNum == 0 {
    1339            1 :                 seqNum = d.mu.versions.visibleSeqNum.Load()
    1340            1 :         }
    1341              : 
    1342              :         // Bundle various structures under a single umbrella in order to allocate
    1343              :         // them together.
    1344            1 :         buf := iterAllocPool.Get().(*iterAlloc)
    1345            1 :         dbi := &scanInternalIterator{
    1346            1 :                 ctx:             ctx,
    1347            1 :                 db:              d,
    1348            1 :                 comparer:        d.opts.Comparer,
    1349            1 :                 merge:           d.opts.Merger.Merge,
    1350            1 :                 readState:       readState,
    1351            1 :                 version:         sOpts.vers,
    1352            1 :                 alloc:           buf,
    1353            1 :                 newIters:        d.newIters,
    1354            1 :                 newIterRangeKey: d.tableNewRangeKeyIter,
    1355            1 :                 seqNum:          seqNum,
    1356            1 :                 mergingIter:     &buf.merging,
    1357            1 :         }
    1358            1 :         dbi.blobValueFetcher.Init(&vers.BlobFiles, d.fileCache, block.ReadEnv{})
    1359            1 : 
    1360            1 :         dbi.opts = *o
    1361            1 :         dbi.opts.logger = d.opts.Logger
    1362            1 :         if d.opts.private.disableLazyCombinedIteration {
    1363            1 :                 dbi.opts.disableLazyCombinedIteration = true
    1364            1 :         }
    1365            1 :         return finishInitializingInternalIter(buf, dbi)
    1366              : }
    1367              : 
    1368              : type internalIterOpts struct {
    1369              :         // if compaction is set, sstable-level iterators will be created using
    1370              :         // NewCompactionIter; these iterators have a more constrained interface
    1371              :         // and are optimized for the sequential scan of a compaction.
    1372              :         compaction         bool
    1373              :         readEnv            sstable.ReadEnv
    1374              :         boundLimitedFilter sstable.BoundLimitedBlockPropertyFilter
    1375              :         // blobValueFetcher is the base.ValueFetcher to use when constructing
    1376              :         // internal values to represent values stored externally in blob files.
    1377              :         blobValueFetcher base.ValueFetcher
    1378              : }
    1379              : 
    1380              : func finishInitializingInternalIter(
    1381              :         buf *iterAlloc, i *scanInternalIterator,
    1382            1 : ) (*scanInternalIterator, error) {
    1383            1 :         // Short-hand.
    1384            1 :         var memtables flushableList
    1385            1 :         if i.readState != nil {
    1386            1 :                 memtables = i.readState.memtables
    1387            1 :         }
    1388              :         // We only need to read from memtables which contain sequence numbers older
    1389              :         // than seqNum. Trim off newer memtables.
    1390            1 :         for j := len(memtables) - 1; j >= 0; j-- {
    1391            1 :                 if logSeqNum := memtables[j].logSeqNum; logSeqNum < i.seqNum {
    1392            1 :                         break
    1393              :                 }
    1394            1 :                 memtables = memtables[:j]
    1395              :         }
    1396            1 :         i.initializeBoundBufs(i.opts.LowerBound, i.opts.UpperBound)
    1397            1 : 
    1398            1 :         if err := i.constructPointIter(i.opts.category, memtables, buf); err != nil {
    1399            0 :                 return nil, err
    1400            0 :         }
    1401              : 
    1402              :         // For internal iterators, we skip the lazy combined iteration optimization
    1403              :         // entirely, and create the range key iterator stack directly.
    1404            1 :         i.rangeKey = iterRangeKeyStateAllocPool.Get().(*iteratorRangeKeyState)
    1405            1 :         if err := i.constructRangeKeyIter(); err != nil {
    1406            0 :                 return nil, err
    1407            0 :         }
    1408              : 
    1409              :         // Wrap the point iterator (currently i.iter) with an interleaving
    1410              :         // iterator that interleaves range keys pulled from
    1411              :         // i.rangeKey.rangeKeyIter.
    1412            1 :         i.rangeKey.iiter.Init(i.comparer, i.iter, i.rangeKey.rangeKeyIter,
    1413            1 :                 keyspan.InterleavingIterOpts{
    1414            1 :                         LowerBound: i.opts.LowerBound,
    1415            1 :                         UpperBound: i.opts.UpperBound,
    1416            1 :                 })
    1417            1 :         i.iter = &i.rangeKey.iiter
    1418            1 : 
    1419            1 :         return i, nil
    1420              : }
    1421              : 
    1422              : func (i *Iterator) constructPointIter(
    1423              :         ctx context.Context, memtables flushableList, buf *iterAlloc,
    1424            1 : ) {
    1425            1 :         if i.pointIter != nil {
    1426            1 :                 // Already have one.
    1427            1 :                 return
    1428            1 :         }
    1429            1 :         readEnv := block.ReadEnv{
    1430            1 :                 Stats: &i.stats.InternalStats,
    1431            1 :                 // If the file cache has a sstable stats collector, ask it for an
    1432            1 :                 // accumulator for this iterator's configured category and QoS. All SSTable
    1433            1 :                 // iterators created by this Iterator will accumulate their stats to it as
    1434            1 :                 // they Close during iteration.
    1435            1 :                 IterStats: i.fc.SSTStatsCollector().Accumulator(
    1436            1 :                         uint64(uintptr(unsafe.Pointer(i))),
    1437            1 :                         i.opts.Category,
    1438            1 :                 ),
    1439            1 :         }
    1440            1 :         if i.readState != nil {
    1441            1 :                 i.blobValueFetcher.Init(&i.readState.current.BlobFiles, i.fc, readEnv)
    1442            1 :         } else if i.version != nil {
    1443            1 :                 i.blobValueFetcher.Init(&i.version.BlobFiles, i.fc, readEnv)
    1444            1 :         }
    1445            1 :         internalOpts := internalIterOpts{
    1446            1 :                 readEnv:          sstable.ReadEnv{Block: readEnv},
    1447            1 :                 blobValueFetcher: &i.blobValueFetcher,
    1448            1 :         }
    1449            1 :         if i.opts.RangeKeyMasking.Filter != nil {
    1450            1 :                 internalOpts.boundLimitedFilter = &i.rangeKeyMasking
    1451            1 :         }
    1452              : 
    1453              :         // Merging levels and levels from iterAlloc.
    1454            1 :         mlevels := buf.mlevels[:0]
    1455            1 :         levels := buf.levels[:0]
    1456            1 : 
    1457            1 :         // We compute the number of levels needed ahead of time and reallocate a slice if
    1458            1 :         // the array from the iterAlloc isn't large enough. Doing this allocation once
    1459            1 :         // should improve the performance.
    1460            1 :         numMergingLevels := 0
    1461            1 :         numLevelIters := 0
    1462            1 :         if i.batch != nil {
    1463            1 :                 numMergingLevels++
    1464            1 :         }
    1465              : 
    1466            1 :         var current *manifest.Version
    1467            1 :         if !i.batchOnlyIter {
    1468            1 :                 numMergingLevels += len(memtables)
    1469            1 : 
    1470            1 :                 current = i.version
    1471            1 :                 if current == nil {
    1472            1 :                         current = i.readState.current
    1473            1 :                 }
    1474            1 :                 numMergingLevels += len(current.L0SublevelFiles)
    1475            1 :                 numLevelIters += len(current.L0SublevelFiles)
    1476            1 :                 for level := 1; level < len(current.Levels); level++ {
    1477            1 :                         if current.Levels[level].Empty() {
    1478            1 :                                 continue
    1479              :                         }
    1480            1 :                         numMergingLevels++
    1481            1 :                         numLevelIters++
    1482              :                 }
    1483              :         }
    1484              : 
    1485            1 :         if numMergingLevels > cap(mlevels) {
    1486            1 :                 mlevels = make([]mergingIterLevel, 0, numMergingLevels)
    1487            1 :         }
    1488            1 :         if numLevelIters > cap(levels) {
    1489            1 :                 levels = make([]levelIter, 0, numLevelIters)
    1490            1 :         }
    1491              : 
    1492              :         // Top-level is the batch, if any.
    1493            1 :         if i.batch != nil {
    1494            1 :                 if i.batch.index == nil {
    1495            0 :                         // This isn't an indexed batch. We shouldn't have gotten this far.
    1496            0 :                         panic(errors.AssertionFailedf("creating an iterator over an unindexed batch"))
    1497            1 :                 } else {
    1498            1 :                         i.batch.initInternalIter(&i.opts, &i.batchPointIter)
    1499            1 :                         i.batch.initRangeDelIter(&i.opts, &i.batchRangeDelIter, i.batchSeqNum)
    1500            1 :                         // Only include the batch's rangedel iterator if it's non-empty.
    1501            1 :                         // This requires some subtle logic in the case a rangedel is later
    1502            1 :                         // written to the batch and the view of the batch is refreshed
    1503            1 :                         // during a call to SetOptions—in this case, we need to reconstruct
    1504            1 :                         // the point iterator to add the batch rangedel iterator.
    1505            1 :                         var rangeDelIter keyspan.FragmentIterator
    1506            1 :                         if i.batchRangeDelIter.Count() > 0 {
    1507            0 :                                 rangeDelIter = &i.batchRangeDelIter
    1508            0 :                         }
    1509            1 :                         mlevels = append(mlevels, mergingIterLevel{
    1510            1 :                                 iter:         &i.batchPointIter,
    1511            1 :                                 rangeDelIter: rangeDelIter,
    1512            1 :                         })
    1513              :                 }
    1514              :         }
    1515              : 
    1516            1 :         if !i.batchOnlyIter {
    1517            1 :                 // Next are the memtables.
    1518            1 :                 for j := len(memtables) - 1; j >= 0; j-- {
    1519            1 :                         mem := memtables[j]
    1520            1 :                         mlevels = append(mlevels, mergingIterLevel{
    1521            1 :                                 iter:         mem.newIter(&i.opts),
    1522            1 :                                 rangeDelIter: mem.newRangeDelIter(&i.opts),
    1523            1 :                         })
    1524            1 :                 }
    1525              : 
    1526              :                 // Next are the file levels: L0 sub-levels followed by lower levels.
    1527            1 :                 mlevelsIndex := len(mlevels)
    1528            1 :                 levelsIndex := len(levels)
    1529            1 :                 mlevels = mlevels[:numMergingLevels]
    1530            1 :                 levels = levels[:numLevelIters]
    1531            1 :                 i.opts.snapshotForHideObsoletePoints = buf.dbi.seqNum
    1532            1 :                 addLevelIterForFiles := func(files manifest.LevelIterator, level manifest.Layer) {
    1533            1 :                         li := &levels[levelsIndex]
    1534            1 : 
    1535            1 :                         li.init(ctx, i.opts, &i.comparer, i.newIters, files, level, internalOpts)
    1536            1 :                         li.initRangeDel(&mlevels[mlevelsIndex])
    1537            1 :                         li.initCombinedIterState(&i.lazyCombinedIter.combinedIterState)
    1538            1 :                         mlevels[mlevelsIndex].levelIter = li
    1539            1 :                         mlevels[mlevelsIndex].iter = invalidating.MaybeWrapIfInvariants(li)
    1540            1 : 
    1541            1 :                         levelsIndex++
    1542            1 :                         mlevelsIndex++
    1543            1 :                 }
    1544              : 
    1545              :                 // Add level iterators for the L0 sublevels, iterating from newest to
    1546              :                 // oldest.
    1547            1 :                 for i := len(current.L0SublevelFiles) - 1; i >= 0; i-- {
    1548            1 :                         addLevelIterForFiles(current.L0SublevelFiles[i].Iter(), manifest.L0Sublevel(i))
    1549            1 :                 }
    1550              : 
    1551              :                 // Add level iterators for the non-empty non-L0 levels.
    1552            1 :                 for level := 1; level < len(current.Levels); level++ {
    1553            1 :                         if current.Levels[level].Empty() {
    1554            1 :                                 continue
    1555              :                         }
    1556            1 :                         addLevelIterForFiles(current.Levels[level].Iter(), manifest.Level(level))
    1557              :                 }
    1558              :         }
    1559            1 :         buf.merging.init(&i.opts, &i.stats.InternalStats, i.comparer.Compare, i.comparer.Split, mlevels...)
    1560            1 :         if len(mlevels) <= cap(buf.levelsPositioned) {
    1561            1 :                 buf.merging.levelsPositioned = buf.levelsPositioned[:len(mlevels)]
    1562            1 :         }
    1563            1 :         buf.merging.snapshot = i.seqNum
    1564            1 :         buf.merging.batchSnapshot = i.batchSeqNum
    1565            1 :         buf.merging.combinedIterState = &i.lazyCombinedIter.combinedIterState
    1566            1 :         i.pointIter = invalidating.MaybeWrapIfInvariants(&buf.merging).(topLevelIterator)
    1567            1 :         i.merging = &buf.merging
    1568              : }
    1569              : 
    1570              : // NewBatch returns a new empty write-only batch. Any reads on the batch will
    1571              : // return an error. If the batch is committed it will be applied to the DB.
    1572            1 : func (d *DB) NewBatch(opts ...BatchOption) *Batch {
    1573            1 :         return newBatch(d, opts...)
    1574            1 : }
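
A short sketch of the write-only batch path (not part of db.go; the helper name and keys
are assumptions). Batch.Commit is equivalent to handing the batch to DB.Apply.

// commitViaBatch builds a batch against the DB it came from and commits it.
func commitViaBatch(db *pebble.DB) error {
        b := db.NewBatch()
        defer b.Close()
        if err := b.Set([]byte("hello"), []byte("world"), nil); err != nil {
                return err
        }
        return b.Commit(pebble.Sync)
}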
    1575              : 
    1576              : // NewBatchWithSize is mostly identical to NewBatch, but it will allocate the
    1577              : // specified memory space for the internal slice in advance.
    1578            0 : func (d *DB) NewBatchWithSize(size int, opts ...BatchOption) *Batch {
    1579            0 :         return newBatchWithSize(d, size, opts...)
    1580            0 : }
    1581              : 
    1582              : // NewIndexedBatch returns a new empty read-write batch. Any reads on the batch
    1583              : // will read from both the batch and the DB. If the batch is committed it will
    1584              : // be applied to the DB. An indexed batch is slower than a non-indexed batch
    1585              : // for insert operations. If you do not need to perform reads on the batch, use
    1586              : // NewBatch instead.
    1587            1 : func (d *DB) NewIndexedBatch() *Batch {
    1588            1 :         return newIndexedBatch(d, d.opts.Comparer)
    1589            1 : }
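
A sketch of why the indexed batch exists (illustrative only, names and keys assumed): reads
on the batch observe its own uncommitted writes merged with the underlying DB state.

// readYourWrites sets a key in an indexed batch and reads it back before committing.
func readYourWrites(db *pebble.DB) error {
        b := db.NewIndexedBatch()
        defer b.Close()
        if err := b.Set([]byte("k"), []byte("v"), nil); err != nil {
                return err
        }
        val, closer, err := b.Get([]byte("k"))
        if err != nil {
                return err
        }
        _ = val // observes the uncommitted "v"
        if err := closer.Close(); err != nil {
                return err
        }
        return b.Commit(pebble.Sync)
}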
    1590              : 
    1591              : // NewIndexedBatchWithSize is mostly identical to NewIndexedBatch, but it will
    1592              : // allocate the specified memory space for the internal slice in advance.
    1593            0 : func (d *DB) NewIndexedBatchWithSize(size int) *Batch {
    1594            0 :         return newIndexedBatchWithSize(d, d.opts.Comparer, size)
    1595            0 : }
    1596              : 
    1597              : // NewIter returns an iterator that is unpositioned (Iterator.Valid() will
    1598              : // return false). The iterator can be positioned via a call to SeekGE, SeekLT,
    1599              : // First or Last. The iterator provides a point-in-time view of the current DB
    1600              : // state. This view is maintained by preventing file deletions and preventing
    1601              : // memtables referenced by the iterator from being deleted. Using an iterator
    1602              : // to maintain a long-lived point-in-time view of the DB state can lead to an
    1603              : // apparent memory and disk usage leak. Use snapshots (see NewSnapshot) for
    1604              : // point-in-time snapshots, which avoid these problems.
    1605            1 : func (d *DB) NewIter(o *IterOptions) (*Iterator, error) {
    1606            1 :         return d.NewIterWithContext(context.Background(), o)
    1607            1 : }
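
The scan sketch below illustrates typical iterator usage, including the prompt Close that
the doc comment above makes important; the bounds and helper name are assumptions.

// scanRange iterates over ["a", "b") and closes the iterator to release the
// read state it pins.
func scanRange(db *pebble.DB) error {
        iter, err := db.NewIter(&pebble.IterOptions{
                LowerBound: []byte("a"),
                UpperBound: []byte("b"),
        })
        if err != nil {
                return err
        }
        defer iter.Close()
        for iter.First(); iter.Valid(); iter.Next() {
                _ = iter.Key()   // only valid until the next positioning call
                _ = iter.Value() // same lifetime caveat as Key
        }
        return iter.Error()
}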
    1608              : 
    1609              : // NewIterWithContext is like NewIter, and additionally accepts a context for
    1610              : // tracing.
    1611            1 : func (d *DB) NewIterWithContext(ctx context.Context, o *IterOptions) (*Iterator, error) {
    1612            1 :         return d.newIter(ctx, nil /* batch */, newIterOpts{}, o), nil
    1613            1 : }
    1614              : 
    1615              : // NewSnapshot returns a point-in-time view of the current DB state. Iterators
    1616              : // created with this handle will all observe a stable snapshot of the current
    1617              : // DB state. The caller must call Snapshot.Close() when the snapshot is no
    1618              : // longer needed. Snapshots are not persisted across DB restarts (close ->
    1619              : // open). Unlike the implicit snapshot maintained by an iterator, a snapshot
    1620              : // will not prevent memtables from being released or sstables from being
    1621              : // deleted. Instead, a snapshot prevents deletion of sequence numbers
    1622              : // referenced by the snapshot.
    1623              : //
    1624              : // There exists one violation of a Snapshot's point-in-time guarantee: An excise
    1625              : // (see DB.Excise and DB.IngestAndExcise) that occurs after the snapshot's
    1626              : // creation will be observed by iterators created from the snapshot after the
    1627              : // excise. See NewEventuallyFileOnlySnapshot for a variant of NewSnapshot that
    1628              : // provides a full point-in-time guarantee.
    1629            1 : func (d *DB) NewSnapshot() *Snapshot {
    1630            1 :         // TODO(jackson): Consider removal of regular, non-eventually-file-only
    1631            1 :         // snapshots given they no longer provide a true point-in-time snapshot of
    1632            1 :         // the database due to excises. If we had a mechanism to construct a maximal
    1633            1 :         // key range, we could implement NewSnapshot in terms of
    1634            1 :         // NewEventuallyFileOnlySnapshot and provide a true point-in-time guarantee.
    1635            1 :         if err := d.closed.Load(); err != nil {
    1636            0 :                 panic(err)
    1637              :         }
    1638            1 :         d.mu.Lock()
    1639            1 :         s := &Snapshot{
    1640            1 :                 db:     d,
    1641            1 :                 seqNum: d.mu.versions.visibleSeqNum.Load(),
    1642            1 :         }
    1643            1 :         d.mu.snapshots.pushBack(s)
    1644            1 :         d.mu.Unlock()
    1645            1 :         return s
    1646              : }
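
A hedged sketch of point-in-time reads through NewSnapshot (assumes an open *pebble.DB); the snapshot must be closed, otherwise DB.Close reports it as leaked.

    func readAtSnapshot(db *pebble.DB, key []byte) ([]byte, error) {
            snap := db.NewSnapshot()
            defer snap.Close()
            value, closer, err := snap.Get(key)
            if err != nil {
                    return nil, err // pebble.ErrNotFound if the key is absent at the snapshot
            }
            out := append([]byte(nil), value...) // copy before closer.Close invalidates the value
            return out, closer.Close()
    }
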
    1647              : 
    1648              : // NewEventuallyFileOnlySnapshot returns a point-in-time view of the current DB
    1649              : // state, similar to NewSnapshot, but with consistency constrained to the
    1650              : // provided set of key ranges. See the comment at EventuallyFileOnlySnapshot for
    1651              : // its semantics.
    1652            1 : func (d *DB) NewEventuallyFileOnlySnapshot(keyRanges []KeyRange) *EventuallyFileOnlySnapshot {
    1653            1 :         if err := d.closed.Load(); err != nil {
    1654            0 :                 panic(err)
    1655              :         }
    1656            1 :         for i := range keyRanges {
    1657            1 :                 if i > 0 && d.cmp(keyRanges[i-1].End, keyRanges[i].Start) > 0 {
    1658            0 :                         panic("pebble: key ranges for eventually-file-only-snapshot not in order")
    1659              :                 }
    1660              :         }
    1661            1 :         return d.makeEventuallyFileOnlySnapshot(keyRanges)
    1662              : }
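
A hedged sketch of NewEventuallyFileOnlySnapshot over a single key range; it assumes the EventuallyFileOnlySnapshot type exposes Get and Close analogously to Snapshot, which is not shown in this file.

    func readWithEFOS(db *pebble.DB, start, end, key []byte) ([]byte, error) {
            efos := db.NewEventuallyFileOnlySnapshot([]pebble.KeyRange{{Start: start, End: end}})
            defer efos.Close()
            value, closer, err := efos.Get(key) // key must fall within the declared ranges
            if err != nil {
                    return nil, err
            }
            out := append([]byte(nil), value...)
            return out, closer.Close()
    }
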
    1663              : 
    1664              : // Close closes the DB.
    1665              : //
    1666              : // It is not safe to close a DB until all outstanding iterators are closed
    1667              : // or to call Close concurrently with any other DB method. It is not valid
    1668              : // to call any of a DB's methods after the DB has been closed.
    1669            1 : func (d *DB) Close() error {
    1670            1 :         if err := d.closed.Load(); err != nil {
    1671            0 :                 panic(err)
    1672              :         }
    1673            1 :         d.compactionSchedulers.Wait()
    1674            1 :         // Compactions can be asynchronously started by the CompactionScheduler
    1675            1 :         // calling d.Schedule. When this Unregister returns, we know that the
    1676            1 :         // CompactionScheduler will never again call a method on the DB. Note that
    1677            1 :         // this must be called without holding d.mu.
    1678            1 :         d.opts.Experimental.CompactionScheduler.Unregister()
    1679            1 :         // Lock the commit pipeline for the duration of Close. This prevents a race
    1680            1 :         // with makeRoomForWrite. Rotating the WAL in makeRoomForWrite requires
    1681            1 :         // dropping d.mu several times for I/O. If Close only holds d.mu, an
    1682            1 :         // in-progress WAL rotation may re-acquire d.mu only once the database is
    1683            1 :         // closed.
    1684            1 :         //
    1685            1 :         // Additionally, locking the commit pipeline makes it more likely that
    1686            1 :         // (illegal) concurrent writes will observe d.closed.Load() != nil, creating
     1687            1 :         // more understandable panics if the database is improperly used concurrently
    1688            1 :         // during Close.
    1689            1 :         d.commit.mu.Lock()
    1690            1 :         defer d.commit.mu.Unlock()
    1691            1 :         d.mu.Lock()
    1692            1 :         defer d.mu.Unlock()
    1693            1 :         // Check that the DB is not closed again. If there are two concurrent calls
    1694            1 :         // to DB.Close, the best-effort check at the top of DB.Close may not fire.
    1695            1 :         // But since this second check happens after mutex acquisition, the two
    1696            1 :         // concurrent calls will get serialized and the second one will see the
    1697            1 :         // effect of the d.closed.Store below.
    1698            1 :         if err := d.closed.Load(); err != nil {
    1699            0 :                 panic(err)
    1700              :         }
    1701              :         // Clear the finalizer that is used to check that an unreferenced DB has been
    1702              :         // closed. We're closing the DB here, so the check performed by that
    1703              :         // finalizer isn't necessary.
    1704              :         //
    1705              :         // Note: this is a no-op if invariants are disabled or race is enabled.
    1706            1 :         invariants.SetFinalizer(d.closed, nil)
    1707            1 : 
    1708            1 :         d.closed.Store(errors.WithStack(ErrClosed))
    1709            1 :         close(d.closedCh)
    1710            1 : 
    1711            1 :         defer d.cacheHandle.Close()
    1712            1 : 
    1713            1 :         for d.mu.compact.compactingCount > 0 || d.mu.compact.downloadingCount > 0 || d.mu.compact.flushing {
    1714            1 :                 d.mu.compact.cond.Wait()
    1715            1 :         }
    1716            1 :         for d.mu.tableStats.loading {
    1717            1 :                 d.mu.tableStats.cond.Wait()
    1718            1 :         }
    1719            1 :         for d.mu.tableValidation.validating {
    1720            1 :                 d.mu.tableValidation.cond.Wait()
    1721            1 :         }
    1722              : 
    1723            1 :         var err error
    1724            1 :         if n := len(d.mu.compact.inProgress); n > 0 {
    1725            0 :                 err = errors.Errorf("pebble: %d unexpected in-progress compactions", errors.Safe(n))
    1726            0 :         }
    1727            1 :         err = firstError(err, d.mu.formatVers.marker.Close())
    1728            1 :         if !d.opts.ReadOnly {
    1729            1 :                 if d.mu.log.writer != nil {
    1730            1 :                         _, err2 := d.mu.log.writer.Close()
    1731            1 :                         err = firstError(err, err2)
    1732            1 :                 }
    1733            0 :         } else if d.mu.log.writer != nil {
    1734            0 :                 panic("pebble: log-writer should be nil in read-only mode")
    1735              :         }
    1736            1 :         err = firstError(err, d.mu.log.manager.Close())
    1737            1 :         err = firstError(err, d.fileLock.Close())
    1738            1 : 
    1739            1 :         // Note that versionSet.close() only closes the MANIFEST. The versions list
    1740            1 :         // is still valid for the checks below.
    1741            1 :         err = firstError(err, d.mu.versions.close())
    1742            1 : 
    1743            1 :         err = firstError(err, d.dataDir.Close())
    1744            1 : 
    1745            1 :         d.readState.val.unrefLocked()
    1746            1 : 
    1747            1 :         current := d.mu.versions.currentVersion()
    1748            1 :         for v := d.mu.versions.versions.Front(); true; v = v.Next() {
    1749            1 :                 refs := v.Refs()
    1750            1 :                 if v == current {
    1751            1 :                         if refs != 1 {
    1752            0 :                                 err = firstError(err, errors.Errorf("leaked iterators: current\n%s", v))
    1753            0 :                         }
    1754            1 :                         break
    1755              :                 }
    1756            0 :                 if refs != 0 {
    1757            0 :                         err = firstError(err, errors.Errorf("leaked iterators:\n%s", v))
    1758            0 :                 }
    1759              :         }
    1760              : 
    1761            1 :         for _, mem := range d.mu.mem.queue {
    1762            1 :                 // Usually, we'd want to delete the files returned by readerUnref. But
    1763            1 :                 // in this case, even if we're unreferencing the flushables, the
    1764            1 :                 // flushables aren't obsolete. They will be reconstructed during WAL
    1765            1 :                 // replay.
    1766            1 :                 mem.readerUnrefLocked(false)
    1767            1 :         }
    1768              :         // If there's an unused, recycled memtable, we need to release its memory.
    1769            1 :         if obsoleteMemTable := d.memTableRecycle.Swap(nil); obsoleteMemTable != nil {
    1770            1 :                 d.freeMemTable(obsoleteMemTable)
    1771            1 :         }
    1772            1 :         if reserved := d.memTableReserved.Load(); reserved != 0 {
    1773            0 :                 err = firstError(err, errors.Errorf("leaked memtable reservation: %d", errors.Safe(reserved)))
    1774            0 :         }
    1775              : 
    1776              :         // Since we called d.readState.val.unrefLocked() above, we are expected to
    1777              :         // manually schedule deletion of obsolete files.
    1778            1 :         if len(d.mu.versions.obsoleteTables) > 0 || len(d.mu.versions.obsoleteBlobs) > 0 {
    1779            1 :                 d.deleteObsoleteFiles(d.newJobIDLocked())
    1780            1 :         }
    1781              : 
    1782            1 :         d.mu.Unlock()
    1783            1 : 
    1784            1 :         // Wait for all cleaning jobs to finish.
    1785            1 :         d.cleanupManager.Close()
    1786            1 : 
    1787            1 :         // Sanity check metrics.
    1788            1 :         if invariants.Enabled {
    1789            1 :                 m := d.Metrics()
    1790            1 :                 if m.Compact.NumInProgress > 0 || m.Compact.InProgressBytes > 0 {
    1791            0 :                         d.mu.Lock()
    1792            0 :                         panic(fmt.Sprintf("invalid metrics on close:\n%s", m))
    1793              :                 }
    1794              :         }
    1795              : 
    1796            1 :         d.mu.Lock()
    1797            1 : 
    1798            1 :         // As a sanity check, ensure that there are no zombie tables or blob files.
    1799            1 :         // A non-zero count hints at a reference count leak.
    1800            1 :         if ztbls := d.mu.versions.zombieTables.Count(); ztbls > 0 {
    1801            0 :                 err = firstError(err, errors.Errorf("non-zero zombie file count: %d", ztbls))
    1802            0 :         }
    1803            1 :         if zblobs := d.mu.versions.zombieBlobs.Count(); zblobs > 0 {
    1804            0 :                 err = firstError(err, errors.Errorf("non-zero zombie blob count: %d", zblobs))
    1805            0 :         }
    1806              : 
    1807            1 :         err = firstError(err, d.fileCache.Close())
    1808            1 : 
    1809            1 :         err = firstError(err, d.objProvider.Close())
    1810            1 : 
    1811            1 :         // If the options include a closer to 'close' the filesystem, close it.
    1812            1 :         if d.opts.private.fsCloser != nil {
    1813            1 :                 d.opts.private.fsCloser.Close()
    1814            1 :         }
    1815              : 
    1816              :         // Return an error if the user failed to close all open snapshots.
    1817            1 :         if v := d.mu.snapshots.count(); v > 0 {
    1818            0 :                 err = firstError(err, errors.Errorf("leaked snapshots: %d open snapshots on DB %p", v, d))
    1819            0 :         }
    1820              : 
    1821            1 :         return err
    1822              : }
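
A hedged lifecycle sketch reflecting the Close contract above: open the DB, finish and close all iterators, snapshots, and batches, then call Close exactly once. pebble.Open and the log import are assumptions of the sketch, not part of this listing.

    func withDB(path string, fn func(*pebble.DB) error) error {
            db, err := pebble.Open(path, &pebble.Options{})
            if err != nil {
                    return err
            }
            defer func() {
                    // Close reports leaked iterators and snapshots as errors; here we only log them.
                    if cerr := db.Close(); cerr != nil {
                            log.Printf("pebble close: %v", cerr)
                    }
            }()
            return fn(db)
    }
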
    1823              : 
    1824              : // Compact the specified range of keys in the database.
    1825            1 : func (d *DB) Compact(ctx context.Context, start, end []byte, parallelize bool) error {
    1826            1 :         if err := d.closed.Load(); err != nil {
    1827            0 :                 panic(err)
    1828              :         }
    1829            1 :         if d.opts.ReadOnly {
    1830            0 :                 return ErrReadOnly
    1831            0 :         }
    1832            1 :         if d.cmp(start, end) >= 0 {
    1833            1 :                 return errors.Errorf("Compact start %s is not less than end %s",
    1834            1 :                         d.opts.Comparer.FormatKey(start), d.opts.Comparer.FormatKey(end))
    1835            1 :         }
    1836              : 
    1837            1 :         d.mu.Lock()
    1838            1 :         maxLevelWithFiles := 1
    1839            1 :         cur := d.mu.versions.currentVersion()
    1840            1 :         for level := 0; level < numLevels; level++ {
    1841            1 :                 overlaps := cur.Overlaps(level, base.UserKeyBoundsInclusive(start, end))
    1842            1 :                 if !overlaps.Empty() {
    1843            1 :                         maxLevelWithFiles = level + 1
    1844            1 :                 }
    1845              :         }
    1846              : 
    1847              :         // Determine if any memtable overlaps with the compaction range. We wait for
    1848              :         // any such overlap to flush (initiating a flush if necessary).
    1849            1 :         mem, err := func() (*flushableEntry, error) {
    1850            1 :                 // Check to see if any files overlap with any of the memtables. The queue
    1851            1 :                 // is ordered from oldest to newest with the mutable memtable being the
    1852            1 :                 // last element in the slice. We want to wait for the newest table that
    1853            1 :                 // overlaps.
    1854            1 :                 for i := len(d.mu.mem.queue) - 1; i >= 0; i-- {
    1855            1 :                         mem := d.mu.mem.queue[i]
    1856            1 :                         var anyOverlaps bool
    1857            1 :                         mem.computePossibleOverlaps(func(b bounded) shouldContinue {
    1858            1 :                                 anyOverlaps = true
    1859            1 :                                 return stopIteration
    1860            1 :                         }, KeyRange{Start: start, End: end})
    1861            1 :                         if !anyOverlaps {
    1862            1 :                                 continue
    1863              :                         }
    1864            1 :                         var err error
    1865            1 :                         if mem.flushable == d.mu.mem.mutable {
    1866            1 :                                 // We have to hold both commitPipeline.mu and DB.mu when calling
    1867            1 :                                 // makeRoomForWrite(). Lock order requirements elsewhere force us to
    1868            1 :                                 // unlock DB.mu in order to grab commitPipeline.mu first.
    1869            1 :                                 d.mu.Unlock()
    1870            1 :                                 d.commit.mu.Lock()
    1871            1 :                                 d.mu.Lock()
    1872            1 :                                 defer d.commit.mu.Unlock() //nolint:deferloop
    1873            1 :                                 if mem.flushable == d.mu.mem.mutable {
    1874            1 :                                         // Only flush if the active memtable is unchanged.
    1875            1 :                                         err = d.makeRoomForWrite(nil)
    1876            1 :                                 }
    1877              :                         }
    1878            1 :                         mem.flushForced = true
    1879            1 :                         d.maybeScheduleFlush()
    1880            1 :                         return mem, err
    1881              :                 }
    1882            1 :                 return nil, nil
    1883              :         }()
    1884              : 
    1885            1 :         d.mu.Unlock()
    1886            1 : 
    1887            1 :         if err != nil {
    1888            0 :                 return err
    1889            0 :         }
    1890            1 :         if mem != nil {
    1891            1 :                 select {
    1892            1 :                 case <-mem.flushed:
    1893            0 :                 case <-ctx.Done():
    1894            0 :                         return ctx.Err()
    1895              :                 }
    1896              :         }
    1897              : 
    1898            1 :         for level := 0; level < maxLevelWithFiles; {
    1899            1 :                 for {
    1900            1 :                         if err := d.manualCompact(
    1901            1 :                                 ctx, start, end, level, parallelize); err != nil {
    1902            0 :                                 if errors.Is(err, ErrCancelledCompaction) {
    1903            0 :                                         continue
    1904              :                                 }
    1905            0 :                                 return err
    1906              :                         }
    1907            1 :                         break
    1908              :                 }
    1909            1 :                 level++
    1910            1 :                 if level == numLevels-1 {
    1911            1 :                         // A manual compaction of the bottommost level occurred.
    1912            1 :                         // There is no next level to try and compact.
    1913            1 :                         break
    1914              :                 }
    1915              :         }
    1916            1 :         return nil
    1917              : }
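
A hedged sketch of a manual compaction over an application keyspace; the 0xff upper bound is an illustrative stand-in for a key greater than every live key, not a Pebble constant. Compact blocks until the requested range (including any overlapping memtable, which is flushed first) has been compacted.

    func compactAll(ctx context.Context, db *pebble.DB) error {
            start := []byte("")                   // lowest possible user key
            end := []byte{0xff, 0xff, 0xff, 0xff} // assumed to sort after all live keys
            return db.Compact(ctx, start, end, true /* parallelize */)
    }
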
    1918              : 
    1919              : func (d *DB) manualCompact(
    1920              :         ctx context.Context, start, end []byte, level int, parallelize bool,
    1921            1 : ) error {
    1922            1 :         d.mu.Lock()
    1923            1 :         curr := d.mu.versions.currentVersion()
    1924            1 :         files := curr.Overlaps(level, base.UserKeyBoundsInclusive(start, end))
    1925            1 :         if files.Empty() {
    1926            1 :                 d.mu.Unlock()
    1927            1 :                 return nil
    1928            1 :         }
    1929              : 
    1930            1 :         var compactions []*manualCompaction
    1931            1 :         if parallelize {
    1932            1 :                 compactions = append(compactions, d.splitManualCompaction(start, end, level)...)
    1933            1 :         } else {
    1934            1 :                 compactions = append(compactions, &manualCompaction{
    1935            1 :                         level: level,
    1936            1 :                         done:  make(chan error, 1),
    1937            1 :                         start: start,
    1938            1 :                         end:   end,
    1939            1 :                 })
    1940            1 :         }
    1941            1 :         n := len(compactions)
    1942            1 :         if n == 0 {
    1943            0 :                 d.mu.Unlock()
    1944            0 :                 return nil
    1945            0 :         }
    1946            1 :         for i := range compactions {
    1947            1 :                 d.mu.compact.manualID++
    1948            1 :                 compactions[i].id = d.mu.compact.manualID
    1949            1 :         }
    1950              :         // [manualIDStart, manualIDEnd] are the compactions that have been added to
    1951              :         // d.mu.compact.manual.
    1952            1 :         manualIDStart := compactions[0].id
    1953            1 :         manualIDEnd := compactions[n-1].id
    1954            1 :         d.mu.compact.manual = append(d.mu.compact.manual, compactions...)
    1955            1 :         d.mu.compact.manualLen.Store(int32(len(d.mu.compact.manual)))
    1956            1 :         d.maybeScheduleCompaction()
    1957            1 :         d.mu.Unlock()
    1958            1 : 
    1959            1 :         // On context cancellation, we only cancel the compactions that have not yet
    1960            1 :         // started. The assumption is that it is relatively harmless to have the
    1961            1 :         // already started compactions run to completion. We don't wait for the
    1962            1 :         // ongoing compactions to finish, since the assumption is that the caller
    1963            1 :         // has already given up on the operation (and the cancellation error is
    1964            1 :         // going to be returned anyway).
    1965            1 :         //
    1966            1 :         // An alternative would be to store the context in each *manualCompaction,
    1967            1 :         // and have the goroutine that retrieves the *manualCompaction for running
    1968            1 :         // notice the cancellation and write the cancellation error to
    1969            1 :         // manualCompaction.done. That approach would require this method to wait
    1970            1 :         // for all the *manualCompactions it has enqueued to finish before returning
    1971            1 :         // (to not leak a context). Since there is no timeliness guarantee on when a
    1972            1 :         // *manualCompaction will be retrieved for running, the wait until a
    1973            1 :         // cancelled context causes this method to return is not bounded. Hence, we
    1974            1 :         // don't adopt that approach.
    1975            1 :         cancelPendingCompactions := func() {
    1976            0 :                 d.mu.Lock()
    1977            0 :                 for i := 0; i < len(d.mu.compact.manual); {
    1978            0 :                         if d.mu.compact.manual[i].id >= manualIDStart && d.mu.compact.manual[i].id <= manualIDEnd {
    1979            0 :                                 d.mu.compact.manual = slices.Delete(d.mu.compact.manual, i, i+1)
    1980            0 :                                 d.mu.compact.manualLen.Store(int32(len(d.mu.compact.manual)))
    1981            0 :                         } else {
    1982            0 :                                 i++
    1983            0 :                         }
    1984              :                 }
    1985            0 :                 d.mu.Unlock()
    1986              :         }
    1987              :         // Each of the channels is guaranteed to be eventually sent to once. After a
    1988              :         // compaction is possibly picked in d.maybeScheduleCompaction(), either the
    1989              :         // compaction is dropped, executed after being scheduled, or retried later.
    1990              :         // Assuming eventual progress when a compaction is retried, all outcomes send
    1991              :         // a value to the done channel. Since the channels are buffered, it is not
    1992              :         // necessary to read from each channel, and so we can exit early in the event
    1993              :         // of an error.
    1994            1 :         for _, compaction := range compactions {
    1995            1 :                 select {
    1996            0 :                 case <-ctx.Done():
    1997            0 :                         cancelPendingCompactions()
    1998            0 :                         return ctx.Err()
    1999            1 :                 case err := <-compaction.done:
    2000            1 :                         if err != nil {
    2001            0 :                                 cancelPendingCompactions()
    2002            0 :                                 return err
    2003            0 :                         }
    2004              :                 }
    2005              :         }
    2006            1 :         return nil
    2007              : }
    2008              : 
    2009              : // splitManualCompaction splits a manual compaction over [start,end] on level
    2010              : // such that the resulting compactions have no key overlap.
    2011              : func (d *DB) splitManualCompaction(
    2012              :         start, end []byte, level int,
    2013            1 : ) (splitCompactions []*manualCompaction) {
    2014            1 :         curr := d.mu.versions.currentVersion()
    2015            1 :         endLevel := level + 1
    2016            1 :         baseLevel := d.mu.versions.picker.getBaseLevel()
    2017            1 :         if level == 0 {
    2018            1 :                 endLevel = baseLevel
    2019            1 :         }
    2020            1 :         keyRanges := curr.CalculateInuseKeyRanges(d.mu.versions.latest.l0Organizer, level, endLevel, start, end)
    2021            1 :         for _, keyRange := range keyRanges {
    2022            1 :                 splitCompactions = append(splitCompactions, &manualCompaction{
    2023            1 :                         level: level,
    2024            1 :                         done:  make(chan error, 1),
    2025            1 :                         start: keyRange.Start,
    2026            1 :                         end:   keyRange.End.Key,
    2027            1 :                         split: true,
    2028            1 :                 })
    2029            1 :         }
    2030            1 :         return splitCompactions
    2031              : }
    2032              : 
    2033              : // Flush the memtable to stable storage.
    2034            1 : func (d *DB) Flush() error {
    2035            1 :         flushDone, err := d.AsyncFlush()
    2036            1 :         if err != nil {
    2037            0 :                 return err
    2038            0 :         }
    2039            1 :         <-flushDone
    2040            1 :         return nil
    2041              : }
    2042              : 
    2043              : // AsyncFlush asynchronously flushes the memtable to stable storage.
    2044              : //
    2045              : // If no error is returned, the caller can receive from the returned channel in
    2046              : // order to wait for the flush to complete.
    2047            1 : func (d *DB) AsyncFlush() (<-chan struct{}, error) {
    2048            1 :         if err := d.closed.Load(); err != nil {
    2049            0 :                 panic(err)
    2050              :         }
    2051            1 :         if d.opts.ReadOnly {
    2052            0 :                 return nil, ErrReadOnly
    2053            0 :         }
    2054              : 
    2055            1 :         d.commit.mu.Lock()
    2056            1 :         defer d.commit.mu.Unlock()
    2057            1 :         d.mu.Lock()
    2058            1 :         defer d.mu.Unlock()
    2059            1 :         flushed := d.mu.mem.queue[len(d.mu.mem.queue)-1].flushed
    2060            1 :         err := d.makeRoomForWrite(nil)
    2061            1 :         if err != nil {
    2062            0 :                 return nil, err
    2063            0 :         }
    2064            1 :         return flushed, nil
    2065              : }
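
A hedged sketch contrasting the synchronous and asynchronous flush entry points above (assumes an open *pebble.DB).

    func flushBoth(db *pebble.DB) error {
            // Synchronous: returns once the current memtable has been flushed.
            if err := db.Flush(); err != nil {
                    return err
            }
            // Asynchronous: start the flush, do other work, then wait on the channel.
            done, err := db.AsyncFlush()
            if err != nil {
                    return err
            }
            <-done
            return nil
    }
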
    2066              : 
    2067              : // Metrics returns metrics about the database.
    2068            1 : func (d *DB) Metrics() *Metrics {
    2069            1 :         metrics := &Metrics{}
    2070            1 :         walStats := d.mu.log.manager.Stats()
    2071            1 :         completedObsoleteFileStats := d.cleanupManager.CompletedStats()
    2072            1 : 
    2073            1 :         d.mu.Lock()
    2074            1 :         vers := d.mu.versions.currentVersion()
    2075            1 :         *metrics = d.mu.versions.metrics
    2076            1 :         metrics.Compact.EstimatedDebt = d.mu.versions.picker.estimatedCompactionDebt()
    2077            1 :         metrics.Compact.InProgressBytes = d.mu.versions.atomicInProgressBytes.Load()
    2078            1 :         // TODO(radu): split this to separate the download compactions.
    2079            1 :         metrics.Compact.NumInProgress = int64(d.mu.compact.compactingCount + d.mu.compact.downloadingCount)
    2080            1 :         metrics.Compact.MarkedFiles = vers.Stats.MarkedForCompaction
    2081            1 :         metrics.Compact.Duration = d.mu.compact.duration
    2082            1 :         for c := range d.mu.compact.inProgress {
    2083            0 :                 if !c.IsFlush() {
    2084            0 :                         metrics.Compact.Duration += d.timeNow().Sub(c.BeganAt())
    2085            0 :                 }
    2086              :         }
    2087            1 :         metrics.Compact.NumProblemSpans = d.problemSpans.Len()
    2088            1 : 
    2089            1 :         for _, m := range d.mu.mem.queue {
    2090            1 :                 metrics.MemTable.Size += m.totalBytes()
    2091            1 :         }
    2092            1 :         metrics.Snapshots.Count = d.mu.snapshots.count()
    2093            1 :         if metrics.Snapshots.Count > 0 {
    2094            0 :                 metrics.Snapshots.EarliestSeqNum = d.mu.snapshots.earliest()
    2095            0 :         }
    2096            1 :         metrics.Snapshots.PinnedKeys = d.mu.snapshots.cumulativePinnedCount
    2097            1 :         metrics.Snapshots.PinnedSize = d.mu.snapshots.cumulativePinnedSize
    2098            1 :         metrics.MemTable.Count = int64(len(d.mu.mem.queue))
    2099            1 :         metrics.MemTable.ZombieCount = d.memTableCount.Load() - metrics.MemTable.Count
    2100            1 :         metrics.MemTable.ZombieSize = uint64(d.memTableReserved.Load()) - metrics.MemTable.Size
    2101            1 :         metrics.WAL.ObsoleteFiles = int64(walStats.ObsoleteFileCount)
    2102            1 :         metrics.WAL.ObsoletePhysicalSize = walStats.ObsoleteFileSize
    2103            1 :         metrics.WAL.Files = int64(walStats.LiveFileCount)
    2104            1 :         // The current WAL's size (d.logSize) is the logical size, which may be less
    2105            1 :         // than the WAL's physical size if it was recycled. walStats.LiveFileSize
    2106            1 :         // includes the physical size of all live WALs, but for the current WAL it
    2107            1 :         // reflects the physical size when it was opened. So it is possible that
    2108            1 :         // d.atomic.logSize has exceeded that physical size. We allow for this
    2109            1 :         // anomaly.
    2110            1 :         metrics.WAL.PhysicalSize = walStats.LiveFileSize
    2111            1 :         metrics.WAL.BytesIn = d.logBytesIn.Load()
    2112            1 :         metrics.WAL.Size = d.logSize.Load()
    2113            1 :         for i, n := 0, len(d.mu.mem.queue)-1; i < n; i++ {
    2114            1 :                 metrics.WAL.Size += d.mu.mem.queue[i].logSize
    2115            1 :         }
    2116            1 :         metrics.WAL.BytesWritten = metrics.Levels[0].TableBytesIn + metrics.WAL.Size
    2117            1 :         metrics.WAL.Failover = walStats.Failover
    2118            1 : 
    2119            1 :         if p := d.mu.versions.picker; p != nil {
    2120            1 :                 compactions := d.getInProgressCompactionInfoLocked(nil)
    2121            1 :                 m := p.getMetrics(compactions)
    2122            1 :                 for level, lm := range m.levels {
    2123            1 :                         metrics.Levels[level].Score = lm.score
    2124            1 :                         metrics.Levels[level].FillFactor = lm.fillFactor
    2125            1 :                         metrics.Levels[level].CompensatedFillFactor = lm.compensatedFillFactor
    2126            1 :                 }
    2127              :         }
    2128            1 :         metrics.Table.ZombieCount = int64(d.mu.versions.zombieTables.Count())
    2129            1 :         metrics.Table.ZombieSize = d.mu.versions.zombieTables.TotalSize()
    2130            1 :         metrics.Table.Local.ZombieCount, metrics.Table.Local.ZombieSize = d.mu.versions.zombieTables.LocalStats()
    2131            1 : 
    2132            1 :         // The obsolete blob/table metrics have a subtle calculation:
    2133            1 :         //
    2134            1 :         // (A) The vs.metrics.{Table,BlobFiles}.[Local.]{ObsoleteCount,ObsoleteSize}
    2135            1 :         // fields reflect the set of files currently sitting in
    2136            1 :         // vs.obsolete{Tables,Blobs} but not yet enqueued to the cleanup manager.
    2137            1 :         //
    2138            1 :         // (B) The d.mu.fileDeletions.queuedStats field holds the set of files that have
    2139            1 :         // been queued for deletion by the cleanup manager.
    2140            1 :         //
    2141            1 :         // (C) The cleanup manager also maintains cumulative stats for the set of
    2142            1 :         // files that have been deleted.
    2143            1 :         //
    2144            1 :         // The value of currently pending obsolete files is (A) + (B) - (C).
    2145            1 :         pendingObsoleteFileStats := d.mu.fileDeletions.queuedStats
    2146            1 :         pendingObsoleteFileStats.Sub(completedObsoleteFileStats)
    2147            1 :         metrics.Table.Local.ObsoleteCount += pendingObsoleteFileStats.tablesLocal.count
    2148            1 :         metrics.Table.Local.ObsoleteSize += pendingObsoleteFileStats.tablesLocal.size
    2149            1 :         metrics.Table.ObsoleteCount += int64(pendingObsoleteFileStats.tablesAll.count)
    2150            1 :         metrics.Table.ObsoleteSize += pendingObsoleteFileStats.tablesAll.size
    2151            1 :         metrics.BlobFiles.Local.ObsoleteCount += pendingObsoleteFileStats.blobFilesLocal.count
    2152            1 :         metrics.BlobFiles.Local.ObsoleteSize += pendingObsoleteFileStats.blobFilesLocal.size
    2153            1 :         metrics.BlobFiles.ObsoleteCount += pendingObsoleteFileStats.blobFilesAll.count
    2154            1 :         metrics.BlobFiles.ObsoleteSize += pendingObsoleteFileStats.blobFilesAll.size
    2155            1 :         metrics.private.optionsFileSize = d.optionsFileSize
    2156            1 : 
    2157            1 :         // TODO(jackson): Consider making these metrics optional.
    2158            1 :         metrics.Keys.RangeKeySetsCount = *rangeKeySetsAnnotator.MultiLevelAnnotation(vers.RangeKeyLevels[:])
    2159            1 :         metrics.Keys.TombstoneCount = *tombstonesAnnotator.MultiLevelAnnotation(vers.Levels[:])
    2160            1 : 
    2161            1 :         metrics.Table.Garbage.PointDeletionsBytesEstimate =
    2162            1 :                 *pointDeletionsBytesEstimateAnnotator.MultiLevelAnnotation(vers.Levels[:])
    2163            1 :         metrics.Table.Garbage.RangeDeletionsBytesEstimate =
    2164            1 :                 *rangeDeletionsBytesEstimateAnnotator.MultiLevelAnnotation(vers.Levels[:])
    2165            1 : 
    2166            1 :         d.mu.versions.logLock()
    2167            1 :         metrics.private.manifestFileSize = uint64(d.mu.versions.manifest.Size())
    2168            1 :         backingCount, backingTotalSize := d.mu.versions.latest.virtualBackings.Stats()
    2169            1 :         metrics.Table.BackingTableCount = uint64(backingCount)
    2170            1 :         metrics.Table.BackingTableSize = backingTotalSize
    2171            1 :         blobStats, _ := d.mu.versions.latest.blobFiles.Stats()
    2172            1 :         d.mu.versions.logUnlock()
    2173            1 :         metrics.BlobFiles.LiveCount = blobStats.Count
    2174            1 :         metrics.BlobFiles.LiveSize = blobStats.PhysicalSize
    2175            1 :         metrics.BlobFiles.ValueSize = blobStats.ValueSize
    2176            1 :         metrics.BlobFiles.ReferencedValueSize = blobStats.ReferencedValueSize
    2177            1 : 
    2178            1 :         metrics.LogWriter.FsyncLatency = d.mu.log.metrics.fsyncLatency
    2179            1 :         if err := metrics.LogWriter.Merge(&d.mu.log.metrics.LogWriterMetrics); err != nil {
    2180            0 :                 d.opts.Logger.Errorf("metrics error: %s", err)
    2181            0 :         }
    2182            1 :         metrics.Flush.WriteThroughput = d.mu.compact.flushWriteThroughput
    2183            1 :         if d.mu.compact.flushing {
    2184            0 :                 metrics.Flush.NumInProgress = 1
    2185            0 :         }
    2186            1 :         for i := 0; i < numLevels; i++ {
    2187            1 :                 metrics.Levels[i].Additional.ValueBlocksSize = *valueBlockSizeAnnotator.LevelAnnotation(vers.Levels[i])
    2188            1 :                 compressionTypes := compressionTypeAnnotator.LevelAnnotation(vers.Levels[i])
    2189            1 :                 metrics.Table.CompressedCountUnknown += int64(compressionTypes.unknown)
    2190            1 :                 metrics.Table.CompressedCountSnappy += int64(compressionTypes.snappy)
    2191            1 :                 metrics.Table.CompressedCountZstd += int64(compressionTypes.zstd)
    2192            1 :                 metrics.Table.CompressedCountMinLZ += int64(compressionTypes.minlz)
    2193            1 :                 metrics.Table.CompressedCountNone += int64(compressionTypes.none)
    2194            1 :         }
    2195              : 
    2196            1 :         metrics.Table.PendingStatsCollectionCount = int64(len(d.mu.tableStats.pending))
    2197            1 :         metrics.Table.InitialStatsCollectionComplete = d.mu.tableStats.loadedInitial
    2198            1 : 
    2199            1 :         d.mu.Unlock()
    2200            1 : 
    2201            1 :         metrics.BlockCache = d.opts.Cache.Metrics()
    2202            1 :         metrics.FileCache, metrics.Filter = d.fileCache.Metrics()
    2203            1 :         metrics.TableIters = d.fileCache.IterCount()
    2204            1 :         metrics.CategoryStats = d.fileCache.SSTStatsCollector().GetStats()
    2205            1 : 
    2206            1 :         metrics.SecondaryCacheMetrics = d.objProvider.Metrics()
    2207            1 : 
    2208            1 :         metrics.Uptime = d.timeNow().Sub(d.openedAt)
    2209            1 : 
    2210            1 :         metrics.manualMemory = manual.GetMetrics()
    2211            1 : 
    2212            1 :         return metrics
    2213              : }
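
A hedged sketch of polling a few of the fields populated by Metrics above; it assumes fmt is imported and that *Metrics renders a human-readable summary via its String method.

    func logMetrics(db *pebble.DB) {
            m := db.Metrics()
            fmt.Printf("compaction debt=%d bytes, memtables=%d (%d bytes), WAL size=%d bytes\n",
                    m.Compact.EstimatedDebt, m.MemTable.Count, m.MemTable.Size, m.WAL.Size)
            fmt.Println(m) // assumed fmt.Stringer rendering of the full metrics table
    }
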
    2214              : 
    2215              : // sstablesOptions hold the optional parameters to retrieve TableInfo for all sstables.
    2216              : type sstablesOptions struct {
    2217              :         // set to true will return the sstable properties in TableInfo
    2218              :         withProperties bool
    2219              : 
    2220              :         // if set, return sstables that overlap the key range (end-exclusive)
    2221              :         start []byte
    2222              :         end   []byte
    2223              : 
    2224              :         withApproximateSpanBytes bool
    2225              : }
    2226              : 
    2227              : // SSTablesOption set optional parameter used by `DB.SSTables`.
    2228              : type SSTablesOption func(*sstablesOptions)
    2229              : 
     2230              : // WithProperties enables returning sstable properties in each TableInfo.
     2231              : //
     2232              : // NOTE: if most of the sstable properties need to be read from disk,
     2233              : // this option may make the `SSTables` method quite slow.
    2234            0 : func WithProperties() SSTablesOption {
    2235            0 :         return func(opt *sstablesOptions) {
    2236            0 :                 opt.withProperties = true
    2237            0 :         }
    2238              : }
    2239              : 
     2240              : // WithKeyRangeFilter ensures returned sstables overlap start and end (end-exclusive).
     2241              : // If start and end are both nil, this option has no effect.
    2242            0 : func WithKeyRangeFilter(start, end []byte) SSTablesOption {
    2243            0 :         return func(opt *sstablesOptions) {
    2244            0 :                 opt.end = end
    2245            0 :                 opt.start = start
    2246            0 :         }
    2247              : }
    2248              : 
    2249              : // WithApproximateSpanBytes enables capturing the approximate number of bytes that
    2250              : // overlap the provided key span for each sstable.
    2251              : // NOTE: This option requires WithKeyRangeFilter.
    2252            0 : func WithApproximateSpanBytes() SSTablesOption {
    2253            0 :         return func(opt *sstablesOptions) {
    2254            0 :                 opt.withApproximateSpanBytes = true
    2255            0 :         }
    2256              : }
    2257              : 
    2258              : // BackingType denotes the type of storage backing a given sstable.
    2259              : type BackingType int
    2260              : 
    2261              : const (
    2262              :         // BackingTypeLocal denotes an sstable stored on local disk according to the
    2263              :         // objprovider. This file is completely owned by us.
    2264              :         BackingTypeLocal BackingType = iota
    2265              :         // BackingTypeShared denotes an sstable stored on shared storage, created
    2266              :         // by this Pebble instance and possibly shared by other Pebble instances.
    2267              :         // These types of files have lifecycle managed by Pebble.
    2268              :         BackingTypeShared
    2269              :         // BackingTypeSharedForeign denotes an sstable stored on shared storage,
    2270              :         // created by a Pebble instance other than this one. These types of files have
    2271              :         // lifecycle managed by Pebble.
    2272              :         BackingTypeSharedForeign
    2273              :         // BackingTypeExternal denotes an sstable stored on external storage,
    2274              :         // not owned by any Pebble instance and with no refcounting/cleanup methods
    2275              :         // or lifecycle management. An example of an external file is a file restored
    2276              :         // from a backup.
    2277              :         BackingTypeExternal
    2278              :         backingTypeCount
    2279              : )
    2280              : 
    2281              : var backingTypeToString = [backingTypeCount]string{
    2282              :         BackingTypeLocal:         "local",
    2283              :         BackingTypeShared:        "shared",
    2284              :         BackingTypeSharedForeign: "shared-foreign",
    2285              :         BackingTypeExternal:      "external",
    2286              : }
    2287              : 
    2288              : // String implements fmt.Stringer.
    2289            0 : func (b BackingType) String() string {
    2290            0 :         return backingTypeToString[b]
    2291            0 : }
    2292              : 
     2293              : // SSTableInfo exports manifest.TableInfo with sstable.Properties alongside
    2294              : // other file backing info.
    2295              : type SSTableInfo struct {
    2296              :         manifest.TableInfo
    2297              :         TableStats manifest.TableStats
    2298              :         // Virtual indicates whether the sstable is virtual.
    2299              :         Virtual bool
    2300              :         // BackingSSTNum is the disk file number associated with the backing sstable.
    2301              :         // If Virtual is false, BackingSSTNum == PhysicalTableDiskFileNum(TableNum).
    2302              :         BackingSSTNum base.DiskFileNum
    2303              :         // BackingType is the type of storage backing this sstable.
    2304              :         BackingType BackingType
    2305              :         // Locator is the remote.Locator backing this sstable, if the backing type is
    2306              :         // not BackingTypeLocal.
    2307              :         Locator remote.Locator
    2308              :         // ApproximateSpanBytes describes the approximate number of bytes within the
    2309              :         // sstable that fall within a particular span. It's populated only when the
    2310              :         // ApproximateSpanBytes option is passed into DB.SSTables.
    2311              :         ApproximateSpanBytes uint64 `json:"ApproximateSpanBytes,omitempty"`
    2312              : 
    2313              :         // Properties is the sstable properties of this table. If Virtual is true,
    2314              :         // then the Properties are associated with the backing sst.
    2315              :         Properties *sstable.Properties
    2316              : }
    2317              : 
    2318              : // SSTables retrieves the current sstables. The returned slice is indexed by
    2319              : // level and each level is indexed by the position of the sstable within the
    2320              : // level. Note that this information may be out of date due to concurrent
    2321              : // flushes and compactions.
    2322            0 : func (d *DB) SSTables(opts ...SSTablesOption) ([][]SSTableInfo, error) {
    2323            0 :         opt := &sstablesOptions{}
    2324            0 :         for _, fn := range opts {
    2325            0 :                 fn(opt)
    2326            0 :         }
    2327              : 
    2328            0 :         if opt.withApproximateSpanBytes && (opt.start == nil || opt.end == nil) {
    2329            0 :                 return nil, errors.Errorf("cannot use WithApproximateSpanBytes without WithKeyRangeFilter option")
    2330            0 :         }
    2331              : 
    2332              :         // Grab and reference the current readState.
    2333            0 :         readState := d.loadReadState()
    2334            0 :         defer readState.unref()
    2335            0 : 
    2336            0 :         // TODO(peter): This is somewhat expensive, especially on a large
    2337            0 :         // database. It might be worthwhile to unify TableInfo and TableMetadata and
    2338            0 :         // then we could simply return current.Files. Note that RocksDB is doing
    2339            0 :         // something similar to the current code, so perhaps it isn't too bad.
    2340            0 :         srcLevels := readState.current.Levels
    2341            0 :         var totalTables int
    2342            0 :         for i := range srcLevels {
    2343            0 :                 totalTables += srcLevels[i].Len()
    2344            0 :         }
    2345              : 
    2346            0 :         destTables := make([]SSTableInfo, totalTables)
    2347            0 :         destLevels := make([][]SSTableInfo, len(srcLevels))
    2348            0 :         for i := range destLevels {
    2349            0 :                 j := 0
    2350            0 :                 for m := range srcLevels[i].All() {
    2351            0 :                         if opt.start != nil && opt.end != nil {
    2352            0 :                                 b := base.UserKeyBoundsEndExclusive(opt.start, opt.end)
    2353            0 :                                 if !m.Overlaps(d.opts.Comparer.Compare, &b) {
    2354            0 :                                         continue
    2355              :                                 }
    2356              :                         }
    2357            0 :                         var tableStats manifest.TableStats
    2358            0 :                         if m.StatsValid() {
    2359            0 :                                 tableStats = m.Stats
    2360            0 :                         }
    2361            0 :                         destTables[j] = SSTableInfo{
    2362            0 :                                 TableInfo:  m.TableInfo(),
    2363            0 :                                 TableStats: tableStats,
    2364            0 :                         }
    2365            0 :                         if opt.withProperties {
    2366            0 :                                 p, err := d.fileCache.getTableProperties(
    2367            0 :                                         m,
    2368            0 :                                 )
    2369            0 :                                 if err != nil {
    2370            0 :                                         return nil, err
    2371            0 :                                 }
    2372            0 :                                 if m.Virtual {
    2373            0 :                                         commonProps := p.GetScaledProperties(m.TableBacking.Size, m.Size)
    2374            0 :                                         p = &sstable.Properties{CommonProperties: commonProps}
    2375            0 :                                 }
    2376            0 :                                 destTables[j].Properties = p
    2377              :                         }
    2378            0 :                         destTables[j].Virtual = m.Virtual
    2379            0 :                         destTables[j].BackingSSTNum = m.TableBacking.DiskFileNum
    2380            0 :                         objMeta, err := d.objProvider.Lookup(base.FileTypeTable, m.TableBacking.DiskFileNum)
    2381            0 :                         if err != nil {
    2382            0 :                                 return nil, err
    2383            0 :                         }
    2384            0 :                         if objMeta.IsRemote() {
    2385            0 :                                 if objMeta.IsShared() {
    2386            0 :                                         if d.objProvider.IsSharedForeign(objMeta) {
    2387            0 :                                                 destTables[j].BackingType = BackingTypeSharedForeign
    2388            0 :                                         } else {
    2389            0 :                                                 destTables[j].BackingType = BackingTypeShared
    2390            0 :                                         }
    2391            0 :                                 } else {
    2392            0 :                                         destTables[j].BackingType = BackingTypeExternal
    2393            0 :                                 }
    2394            0 :                                 destTables[j].Locator = objMeta.Remote.Locator
    2395            0 :                         } else {
    2396            0 :                                 destTables[j].BackingType = BackingTypeLocal
    2397            0 :                         }
    2398              : 
    2399            0 :                         if opt.withApproximateSpanBytes {
    2400            0 :                                 if m.ContainedWithinSpan(d.opts.Comparer.Compare, opt.start, opt.end) {
    2401            0 :                                         destTables[j].ApproximateSpanBytes = m.Size
    2402            0 :                                 } else {
    2403            0 :                                         size, err := d.fileCache.estimateSize(m, opt.start, opt.end)
    2404            0 :                                         if err != nil {
    2405            0 :                                                 return nil, err
    2406            0 :                                         }
    2407            0 :                                         destTables[j].ApproximateSpanBytes = size
    2408              :                                 }
    2409              :                         }
    2410            0 :                         j++
    2411              :                 }
    2412            0 :                 destLevels[i] = destTables[:j]
    2413            0 :                 destTables = destTables[j:]
    2414              :         }
    2415              : 
    2416            0 :         return destLevels, nil
    2417              : }
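
A hedged sketch of DB.SSTables with the options defined above: filter to a key range, include properties, and request approximate overlapping bytes (which requires the key-range filter). Assumes an open *pebble.DB and an imported fmt.

    func listSSTables(db *pebble.DB, start, end []byte) error {
            levels, err := db.SSTables(
                    pebble.WithKeyRangeFilter(start, end),
                    pebble.WithProperties(),
                    pebble.WithApproximateSpanBytes(),
            )
            if err != nil {
                    return err
            }
            for level, tables := range levels {
                    for _, t := range tables {
                            fmt.Printf("L%d: size=%d backing=%s approx-span-bytes=%d\n",
                                    level, t.Size, t.BackingType, t.ApproximateSpanBytes)
                    }
            }
            return nil
    }
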
    2418              : 
    2419              : // makeFileSizeAnnotator returns an annotator that computes the total
    2420              : // storage size of files that meet some criteria defined by filter. When
    2421              : // applicable, this includes both the sstable size and the size of any
    2422              : // referenced blob files.
    2423              : func (d *DB) makeFileSizeAnnotator(
    2424              :         filter func(f *manifest.TableMetadata) bool,
    2425            1 : ) *manifest.Annotator[uint64] {
    2426            1 :         return &manifest.Annotator[uint64]{
    2427            1 :                 Aggregator: manifest.SumAggregator{
    2428            1 :                         AccumulateFunc: func(f *manifest.TableMetadata) (uint64, bool) {
    2429            0 :                                 if filter(f) {
    2430            0 :                                         return f.Size + f.EstimatedReferenceSize(), true
    2431            0 :                                 }
    2432            0 :                                 return 0, true
    2433              :                         },
    2434            0 :                         AccumulatePartialOverlapFunc: func(f *manifest.TableMetadata, bounds base.UserKeyBounds) uint64 {
    2435            0 :                                 if filter(f) {
    2436            0 :                                         overlappingFileSize, err := d.fileCache.estimateSize(f, bounds.Start, bounds.End.Key)
    2437            0 :                                         if err != nil {
    2438            0 :                                                 return 0
    2439            0 :                                         }
    2440            0 :                                         overlapFraction := float64(overlappingFileSize) / float64(f.Size)
     2441            0 :                         // Scale the blob reference size by the fraction of the
     2442            0 :                         // file that overlaps the bounds, to approximate only the
     2443            0 :                         // blob references that overlap with the requested bounds.
    2444            0 :                                         return overlappingFileSize + uint64(float64(f.EstimatedReferenceSize())*overlapFraction)
    2445              :                                 }
    2446            0 :                                 return 0
    2447              :                         },
    2448              :                 },
    2449              :         }
    2450              : }
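
For illustration, a minimal standalone sketch of the proportional scaling used above for partially overlapping files; the helper and the sizes are hypothetical, not part of Pebble:

        package main

        import "fmt"

        // estimateSpanSize mirrors AccumulatePartialOverlapFunc above: the blob
        // reference size is scaled by the fraction of the sstable that overlaps
        // the requested bounds.
        func estimateSpanSize(overlappingFileSize, fileSize, blobRefSize uint64) uint64 {
                overlapFraction := float64(overlappingFileSize) / float64(fileSize)
                return overlappingFileSize + uint64(float64(blobRefSize)*overlapFraction)
        }

        func main() {
                // A 100 MiB sstable with 40 MiB overlapping the bounds and 10 MiB of
                // blob references contributes 40 MiB + 4 MiB.
                fmt.Println(estimateSpanSize(40<<20, 100<<20, 10<<20)) // 46137344
        }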
    2451              : 
    2452              : // EstimateDiskUsage returns the estimated filesystem space used in bytes for
    2453              : // storing the range `[start, end]`. The estimation is computed as follows:
    2454              : //
    2455              : //   - For sstables fully contained in the range the whole file size is included.
    2456              : //   - For sstables partially contained in the range the overlapping data block sizes
    2457              : //     are included. Even if a data block partially overlaps, or we cannot determine
    2458              : //     overlap due to abbreviated index keys, the full data block size is included in
    2459              : //     the estimation. Note that unlike fully contained sstables, none of the
    2460              : //     meta-block space is counted for partially overlapped files.
    2461              : //   - For virtual sstables, we use the overlap between start, end and the virtual
    2462              : //     sstable bounds to determine disk usage.
    2463              : //   - There may also exist WAL entries for unflushed keys in this range. This
    2464              : //     estimation currently excludes space used for the range in the WAL.
    2465            0 : func (d *DB) EstimateDiskUsage(start, end []byte) (uint64, error) {
    2466            0 :         bytes, _, _, err := d.EstimateDiskUsageByBackingType(start, end)
    2467            0 :         return bytes, err
    2468            0 : }
    2469              : 
    2470              : // EstimateDiskUsageByBackingType is like EstimateDiskUsage but additionally
     2471              : // returns the subsets of that size in remote and external files.
    2472              : func (d *DB) EstimateDiskUsageByBackingType(
    2473              :         start, end []byte,
    2474            0 : ) (totalSize, remoteSize, externalSize uint64, _ error) {
    2475            0 :         if err := d.closed.Load(); err != nil {
    2476            0 :                 panic(err)
    2477              :         }
    2478              : 
    2479            0 :         bounds := base.UserKeyBoundsInclusive(start, end)
    2480            0 :         if !bounds.Valid(d.cmp) {
    2481            0 :                 return 0, 0, 0, errors.New("invalid key-range specified (start > end)")
    2482            0 :         }
    2483              : 
    2484              :         // Grab and reference the current readState. This prevents the underlying
    2485              :         // files in the associated version from being deleted if there is a concurrent
    2486              :         // compaction.
    2487            0 :         readState := d.loadReadState()
    2488            0 :         defer readState.unref()
    2489            0 : 
    2490            0 :         totalSize = *d.mu.annotators.totalFileSize.VersionRangeAnnotation(readState.current, bounds)
    2491            0 :         remoteSize = *d.mu.annotators.remoteSize.VersionRangeAnnotation(readState.current, bounds)
    2492            0 :         externalSize = *d.mu.annotators.externalSize.VersionRangeAnnotation(readState.current, bounds)
    2493            0 : 
    2494            0 :         return
    2495              : }
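
A hedged usage sketch of the two estimators; the directory name and key bounds are assumptions, and error handling is minimal:

        package main

        import (
                "fmt"
                "log"

                "github.com/cockroachdb/pebble"
        )

        func main() {
                db, err := pebble.Open("demo-db", &pebble.Options{})
                if err != nil {
                        log.Fatal(err)
                }
                defer db.Close()

                // Estimated filesystem bytes used by the inclusive range ["a", "z"].
                total, err := db.EstimateDiskUsage([]byte("a"), []byte("z"))
                if err != nil {
                        log.Fatal(err)
                }
                // Same total, plus the subsets backed by remote and external files.
                totalByType, remote, external, err := db.EstimateDiskUsageByBackingType([]byte("a"), []byte("z"))
                if err != nil {
                        log.Fatal(err)
                }
                fmt.Println(total, totalByType, remote, external)
        }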
    2496              : 
    2497            1 : func (d *DB) walPreallocateSize() int {
    2498            1 :         // Set the WAL preallocate size to 110% of the memtable size. Note that there
     2499            1 :         // is a bit of apples and oranges in units here as the memtable size
    2500            1 :         // corresponds to the memory usage of the memtable while the WAL size is the
    2501            1 :         // size of the batches (plus overhead) stored in the WAL.
    2502            1 :         //
    2503            1 :         // TODO(peter): 110% of the memtable size is quite hefty for a block
    2504            1 :         // size. This logic is taken from GetWalPreallocateBlockSize in
    2505            1 :         // RocksDB. Could a smaller preallocation block size be used?
    2506            1 :         size := d.opts.MemTableSize
    2507            1 :         size = (size / 10) + size
    2508            1 :         return int(size)
    2509            1 : }
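
For concreteness, a minimal sketch of the same arithmetic; the 64 MiB memtable size is an assumption:

        package main

        import "fmt"

        // walPreallocate mirrors the computation above: 110% of the memtable size,
        // i.e. size/10 + size using integer division.
        func walPreallocate(memTableSize uint64) int {
                return int(memTableSize/10 + memTableSize)
        }

        func main() {
                fmt.Println(walPreallocate(64 << 20)) // 73819750 bytes, ~70.4 MiB
        }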
    2510              : 
    2511              : func (d *DB) newMemTable(
    2512              :         logNum base.DiskFileNum, logSeqNum base.SeqNum, minSize uint64,
    2513            1 : ) (*memTable, *flushableEntry) {
    2514            1 :         targetSize := minSize + uint64(memTableEmptySize)
    2515            1 :         // The targetSize should be less than MemTableSize, because any batch >=
    2516            1 :         // MemTableSize/2 should be treated as a large flushable batch.
    2517            1 :         if targetSize > d.opts.MemTableSize {
    2518            0 :                 panic(errors.AssertionFailedf("attempting to allocate memtable larger than MemTableSize"))
    2519              :         }
    2520              :         // Double until the next memtable size is at least large enough to fit
    2521              :         // minSize.
    2522            1 :         for d.mu.mem.nextSize < targetSize {
    2523            0 :                 d.mu.mem.nextSize = min(2*d.mu.mem.nextSize, d.opts.MemTableSize)
    2524            0 :         }
    2525            1 :         size := d.mu.mem.nextSize
    2526            1 :         // The next memtable should be double the size, up to Options.MemTableSize.
    2527            1 :         if d.mu.mem.nextSize < d.opts.MemTableSize {
    2528            1 :                 d.mu.mem.nextSize = min(2*d.mu.mem.nextSize, d.opts.MemTableSize)
    2529            1 :         }
    2530              : 
    2531            1 :         memtblOpts := memTableOptions{
    2532            1 :                 Options:   d.opts,
    2533            1 :                 logSeqNum: logSeqNum,
    2534            1 :         }
    2535            1 : 
    2536            1 :         // Before attempting to allocate a new memtable, check if there's one
    2537            1 :         // available for recycling in memTableRecycle. Large contiguous allocations
    2538            1 :         // can be costly as fragmentation makes it more difficult to find a large
    2539            1 :         // contiguous free space. We've observed 64MB allocations taking 10ms+.
    2540            1 :         //
    2541            1 :         // To reduce these costly allocations, up to 1 obsolete memtable is stashed
    2542            1 :         // in `d.memTableRecycle` to allow a future memtable rotation to reuse
    2543            1 :         // existing memory.
    2544            1 :         var mem *memTable
    2545            1 :         mem = d.memTableRecycle.Swap(nil)
    2546            1 :         if mem != nil && uint64(mem.arenaBuf.Len()) != size {
    2547            1 :                 d.freeMemTable(mem)
    2548            1 :                 mem = nil
    2549            1 :         }
    2550            1 :         if mem != nil {
    2551            1 :                 // Carry through the existing buffer and memory reservation.
    2552            1 :                 memtblOpts.arenaBuf = mem.arenaBuf
    2553            1 :                 memtblOpts.releaseAccountingReservation = mem.releaseAccountingReservation
    2554            1 :         } else {
    2555            1 :                 mem = new(memTable)
    2556            1 :                 memtblOpts.arenaBuf = manual.New(manual.MemTable, uintptr(size))
    2557            1 :                 memtblOpts.releaseAccountingReservation = d.opts.Cache.Reserve(int(size))
    2558            1 :                 d.memTableCount.Add(1)
    2559            1 :                 d.memTableReserved.Add(int64(size))
    2560            1 : 
    2561            1 :                 // Note: this is a no-op if invariants are disabled or race is enabled.
    2562            1 :                 invariants.SetFinalizer(mem, checkMemTable)
    2563            1 :         }
    2564            1 :         mem.init(memtblOpts)
    2565            1 : 
    2566            1 :         entry := d.newFlushableEntry(mem, logNum, logSeqNum)
    2567            1 :         entry.releaseMemAccounting = func() {
    2568            1 :                 // If the user leaks iterators, we may be releasing the memtable after
    2569            1 :                 // the DB is already closed. In this case, we want to just release the
    2570            1 :                 // memory because DB.Close won't come along to free it for us.
    2571            1 :                 if err := d.closed.Load(); err != nil {
    2572            1 :                         d.freeMemTable(mem)
    2573            1 :                         return
    2574            1 :                 }
    2575              : 
    2576              :                 // The next memtable allocation might be able to reuse this memtable.
    2577              :                 // Stash it on d.memTableRecycle.
    2578            1 :                 if unusedMem := d.memTableRecycle.Swap(mem); unusedMem != nil {
    2579            1 :                         // There was already a memtable waiting to be recycled. We're now
    2580            1 :                         // responsible for freeing it.
    2581            1 :                         d.freeMemTable(unusedMem)
    2582            1 :                 }
    2583              :         }
    2584            1 :         return mem, entry
    2585              : }
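
A self-contained sketch of the single-slot recycling pattern used above; the buffer type and sizes are illustrative, not Pebble's types:

        package main

        import (
                "fmt"
                "sync/atomic"
        )

        type buffer struct{ buf []byte }

        // recycle stashes at most one obsolete buffer for reuse by a later allocation.
        var recycle atomic.Pointer[buffer]

        // getBuffer reuses the stashed buffer when its size matches, mirroring
        // newMemTable's recycle-or-allocate logic; a mismatched buffer is discarded.
        func getBuffer(size int) *buffer {
                if b := recycle.Swap(nil); b != nil && len(b.buf) == size {
                        return b
                }
                return &buffer{buf: make([]byte, size)}
        }

        // putBuffer stashes a buffer for reuse. If one was already stashed, the
        // displaced buffer is simply dropped here (Pebble frees it explicitly).
        func putBuffer(b *buffer) {
                recycle.Swap(b)
        }

        func main() {
                b := getBuffer(1 << 20)
                putBuffer(b)
                fmt.Println(len(getBuffer(1 << 20).buf)) // reuses the stashed 1 MiB buffer
        }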
    2586              : 
    2587            1 : func (d *DB) freeMemTable(m *memTable) {
    2588            1 :         d.memTableCount.Add(-1)
    2589            1 :         d.memTableReserved.Add(-int64(m.arenaBuf.Len()))
    2590            1 :         m.free()
    2591            1 : }
    2592              : 
    2593              : func (d *DB) newFlushableEntry(
    2594              :         f flushable, logNum base.DiskFileNum, logSeqNum base.SeqNum,
    2595            1 : ) *flushableEntry {
    2596            1 :         fe := &flushableEntry{
    2597            1 :                 flushable:      f,
    2598            1 :                 flushed:        make(chan struct{}),
    2599            1 :                 logNum:         logNum,
    2600            1 :                 logSeqNum:      logSeqNum,
    2601            1 :                 deleteFn:       d.mu.versions.addObsolete,
    2602            1 :                 deleteFnLocked: d.mu.versions.addObsoleteLocked,
    2603            1 :         }
    2604            1 :         fe.readerRefs.Store(1)
    2605            1 :         return fe
    2606            1 : }
    2607              : 
    2608              : // maybeInduceWriteStall is called before performing a memtable rotation in
    2609              : // makeRoomForWrite. In some conditions, we prefer to stall the user's write
    2610              : // workload rather than continuing to accept writes that may result in resource
    2611              : // exhaustion or prohibitively slow reads.
    2612              : //
    2613              : // There are a couple reasons we might wait to rotate the memtable and
    2614              : // instead induce a write stall:
    2615              : //  1. If too many memtables have queued, we wait for a flush to finish before
    2616              : //     creating another memtable.
    2617              : //  2. If L0 read amplification has grown too high, we wait for compactions
    2618              : //     to reduce the read amplification before accepting more writes that will
    2619              : //     increase write pressure.
    2620              : //
    2621              : // maybeInduceWriteStall checks these stall conditions, and if present, waits
    2622              : // for them to abate.
    2623            1 : func (d *DB) maybeInduceWriteStall(b *Batch) {
    2624            1 :         stalled := false
    2625            1 :         // This function will call EventListener.WriteStallBegin at most once.  If
    2626            1 :         // it does call it, it will call EventListener.WriteStallEnd once before
    2627            1 :         // returning.
    2628            1 :         for {
    2629            1 :                 var size uint64
    2630            1 :                 for i := range d.mu.mem.queue {
    2631            1 :                         size += d.mu.mem.queue[i].totalBytes()
    2632            1 :                 }
    2633            1 :                 if size >= uint64(d.opts.MemTableStopWritesThreshold)*d.opts.MemTableSize &&
    2634            1 :                         !d.mu.log.manager.ElevateWriteStallThresholdForFailover() {
    2635            1 :                         // We have filled up the current memtable, but already queued memtables
    2636            1 :                         // are still flushing, so we wait.
    2637            1 :                         if !stalled {
    2638            1 :                                 stalled = true
    2639            1 :                                 d.opts.EventListener.WriteStallBegin(WriteStallBeginInfo{
    2640            1 :                                         Reason: "memtable count limit reached",
    2641            1 :                                 })
    2642            1 :                         }
    2643            1 :                         beforeWait := crtime.NowMono()
    2644            1 :                         d.mu.compact.cond.Wait()
    2645            1 :                         if b != nil {
    2646            1 :                                 b.commitStats.MemTableWriteStallDuration += beforeWait.Elapsed()
    2647            1 :                         }
    2648            1 :                         continue
    2649              :                 }
    2650            1 :                 l0ReadAmp := d.mu.versions.latest.l0Organizer.ReadAmplification()
    2651            1 :                 if l0ReadAmp >= d.opts.L0StopWritesThreshold {
    2652            1 :                         // There are too many level-0 files, so we wait.
    2653            1 :                         if !stalled {
    2654            1 :                                 stalled = true
    2655            1 :                                 d.opts.EventListener.WriteStallBegin(WriteStallBeginInfo{
    2656            1 :                                         Reason: "L0 file count limit exceeded",
    2657            1 :                                 })
    2658            1 :                         }
    2659            1 :                         beforeWait := crtime.NowMono()
    2660            1 :                         d.mu.compact.cond.Wait()
    2661            1 :                         if b != nil {
    2662            0 :                                 b.commitStats.L0ReadAmpWriteStallDuration += beforeWait.Elapsed()
    2663            0 :                         }
    2664            1 :                         continue
    2665              :                 }
    2666              :                 // Not stalled.
    2667            1 :                 if stalled {
    2668            1 :                         d.opts.EventListener.WriteStallEnd()
    2669            1 :                 }
    2670            1 :                 return
    2671              :         }
    2672              : }
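
Both stall conditions are driven by options referenced above; a hedged configuration sketch (the values are illustrative, not recommendations; the EventListener's WriteStallBegin/WriteStallEnd hooks report when stalls start and stop):

        package main

        import (
                "log"

                "github.com/cockroachdb/pebble"
        )

        func main() {
                opts := &pebble.Options{
                        MemTableSize: 64 << 20,
                        // Stall writes once the queue of memtables awaiting flush
                        // reaches this count (condition 1 above).
                        MemTableStopWritesThreshold: 4,
                        // Stall writes once L0 read amplification reaches this value
                        // (condition 2 above).
                        L0StopWritesThreshold: 12,
                }
                db, err := pebble.Open("demo-db", opts)
                if err != nil {
                        log.Fatal(err)
                }
                defer db.Close()
        }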
    2673              : 
    2674              : // makeRoomForWrite rotates the current mutable memtable, ensuring that the
    2675              : // resulting mutable memtable has room to hold the contents of the provided
    2676              : // Batch. The current memtable is rotated (marked as immutable) and a new
    2677              : // mutable memtable is allocated. It reserves space in the new memtable and adds
    2678              : // a reference to the memtable. The caller must later ensure that the memtable
    2679              : // is unreferenced. This memtable rotation also causes a log rotation.
    2680              : //
    2681              : // If the current memtable is not full but the caller wishes to trigger a
    2682              : // rotation regardless, the caller may pass a nil Batch, and no space in the
    2683              : // resulting mutable memtable will be reserved.
    2684              : //
    2685              : // Both DB.mu and commitPipeline.mu must be held by the caller. Note that DB.mu
    2686              : // may be released and reacquired.
    2687            1 : func (d *DB) makeRoomForWrite(b *Batch) error {
    2688            1 :         if b != nil && b.ingestedSSTBatch {
    2689            0 :                 panic("pebble: invalid function call")
    2690              :         }
    2691            1 :         d.maybeInduceWriteStall(b)
    2692            1 : 
    2693            1 :         var newLogNum base.DiskFileNum
    2694            1 :         var prevLogSize uint64
    2695            1 :         if !d.opts.DisableWAL {
    2696            1 :                 beforeRotate := crtime.NowMono()
    2697            1 :                 newLogNum, prevLogSize = d.rotateWAL()
    2698            1 :                 if b != nil {
    2699            1 :                         b.commitStats.WALRotationDuration += beforeRotate.Elapsed()
    2700            1 :                 }
    2701              :         }
    2702            1 :         immMem := d.mu.mem.mutable
    2703            1 :         imm := d.mu.mem.queue[len(d.mu.mem.queue)-1]
    2704            1 :         imm.logSize = prevLogSize
    2705            1 : 
    2706            1 :         var logSeqNum base.SeqNum
    2707            1 :         var minSize uint64
    2708            1 :         if b != nil {
    2709            1 :                 logSeqNum = b.SeqNum()
    2710            1 :                 if b.flushable != nil {
    2711            1 :                         logSeqNum += base.SeqNum(b.Count())
    2712            1 :                         // The batch is too large to fit in the memtable so add it directly to
    2713            1 :                         // the immutable queue. The flushable batch is associated with the same
    2714            1 :                         // log as the immutable memtable, but logically occurs after it in
    2715            1 :                         // seqnum space. We ensure while flushing that the flushable batch
    2716            1 :                         // is flushed along with the previous memtable in the flushable
    2717            1 :                         // queue. See the top level comment in DB.flush1 to learn how this
    2718            1 :                         // is ensured.
    2719            1 :                         //
    2720            1 :                         // See DB.commitWrite for the special handling of log writes for large
    2721            1 :                         // batches. In particular, the large batch has already written to
    2722            1 :                         // imm.logNum.
    2723            1 :                         entry := d.newFlushableEntry(b.flushable, imm.logNum, b.SeqNum())
    2724            1 :                         // The large batch is by definition large. Reserve space from the cache
    2725            1 :                         // for it until it is flushed.
    2726            1 :                         entry.releaseMemAccounting = d.opts.Cache.Reserve(int(b.flushable.totalBytes()))
    2727            1 :                         d.mu.mem.queue = append(d.mu.mem.queue, entry)
    2728            1 :                 } else {
    2729            1 :                         minSize = b.memTableSize
    2730            1 :                 }
    2731            1 :         } else {
    2732            1 :                 // b == nil
    2733            1 :                 //
    2734            1 :                 // This is a manual forced flush.
    2735            1 :                 logSeqNum = base.SeqNum(d.mu.versions.logSeqNum.Load())
    2736            1 :                 imm.flushForced = true
    2737            1 :                 // If we are manually flushing and we used less than half of the bytes in
    2738            1 :                 // the memtable, don't increase the size for the next memtable. This
    2739            1 :                 // reduces memtable memory pressure when an application is frequently
    2740            1 :                 // manually flushing.
    2741            1 :                 if uint64(immMem.availBytes()) > immMem.totalBytes()/2 {
    2742            1 :                         d.mu.mem.nextSize = immMem.totalBytes()
    2743            1 :                 }
    2744              :         }
    2745            1 :         d.rotateMemtable(newLogNum, logSeqNum, immMem, minSize)
    2746            1 :         if b != nil && b.flushable == nil {
    2747            1 :                 err := d.mu.mem.mutable.prepare(b)
    2748            1 :                 // Reserving enough space for the batch after rotation must never fail.
     2749            1 :         // We pass in a minSize that's equal to b.memTableSize to ensure that
    2750            1 :                 // memtable rotation allocates a memtable sufficiently large. We also
    2751            1 :                 // held d.commit.mu for the entirety of this function, ensuring that no
    2752            1 :                 // other committers may have reserved memory in the new memtable yet.
    2753            1 :                 if err == arenaskl.ErrArenaFull {
    2754            0 :                         panic(errors.AssertionFailedf("memtable still full after rotation"))
    2755              :                 }
    2756            1 :                 return err
    2757              :         }
    2758            1 :         return nil
    2759              : }
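
A minimal sketch of the large-batch decision described in the comments above; the exact threshold accounting in Pebble differs slightly, so treat this as an approximation:

        package main

        import "fmt"

        // isLargeBatch reports whether a batch should skip the memtable and be
        // queued as its own flushable, per the "any batch >= MemTableSize/2" rule
        // of thumb noted in newMemTable above.
        func isLargeBatch(batchMemTableSize, memTableSize uint64) bool {
                return batchMemTableSize >= memTableSize/2
        }

        func main() {
                fmt.Println(isLargeBatch(40<<20, 64<<20)) // true: 40 MiB >= 32 MiB
        }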
    2760              : 
    2761              : // Both DB.mu and commitPipeline.mu must be held by the caller.
    2762              : func (d *DB) rotateMemtable(
    2763              :         newLogNum base.DiskFileNum, logSeqNum base.SeqNum, prev *memTable, minSize uint64,
    2764            1 : ) {
    2765            1 :         // Create a new memtable, scheduling the previous one for flushing. We do
    2766            1 :         // this even if the previous memtable was empty because the DB.Flush
    2767            1 :         // mechanism is dependent on being able to wait for the empty memtable to
    2768            1 :         // flush. We can't just mark the empty memtable as flushed here because we
    2769            1 :         // also have to wait for all previous immutable tables to
     2770            1 :         // flush. Additionally, the memtable is tied to a particular WAL file and we
    2771            1 :         // want to go through the flush path in order to recycle that WAL file.
    2772            1 :         //
    2773            1 :         // NB: newLogNum corresponds to the WAL that contains mutations that are
    2774            1 :         // present in the new memtable. When immutable memtables are flushed to
    2775            1 :         // disk, a VersionEdit will be created telling the manifest the minimum
    2776            1 :         // unflushed log number (which will be the next one in d.mu.mem.mutable
    2777            1 :         // that was not flushed).
    2778            1 :         //
    2779            1 :         // NB: prev should be the current mutable memtable.
    2780            1 :         var entry *flushableEntry
    2781            1 :         d.mu.mem.mutable, entry = d.newMemTable(newLogNum, logSeqNum, minSize)
    2782            1 :         d.mu.mem.queue = append(d.mu.mem.queue, entry)
    2783            1 :         // d.logSize tracks the log size of the WAL file corresponding to the most
    2784            1 :         // recent flushable. The log size of the previous mutable memtable no longer
    2785            1 :         // applies to the current mutable memtable.
    2786            1 :         //
    2787            1 :         // It's tempting to perform this update in rotateWAL, but that would not be
    2788            1 :         // atomic with the enqueue of the new flushable. A call to DB.Metrics()
    2789            1 :         // could acquire DB.mu after the WAL has been rotated but before the new
    2790            1 :         // memtable has been appended; this would result in omitting the log size of
    2791            1 :         // the most recent flushable.
    2792            1 :         d.logSize.Store(0)
    2793            1 :         d.updateReadStateLocked(nil)
    2794            1 :         if prev.writerUnref() {
    2795            1 :                 d.maybeScheduleFlush()
    2796            1 :         }
    2797              : }
    2798              : 
    2799              : // rotateWAL creates a new write-ahead log, possibly recycling a previous WAL's
    2800              : // files. It returns the file number assigned to the new WAL, and the size of
    2801              : // the previous WAL file.
    2802              : //
    2803              : // Both DB.mu and commitPipeline.mu must be held by the caller. Note that DB.mu
    2804              : // may be released and reacquired.
    2805            1 : func (d *DB) rotateWAL() (newLogNum base.DiskFileNum, prevLogSize uint64) {
    2806            1 :         if d.opts.DisableWAL {
    2807            0 :                 panic("pebble: invalid function call")
    2808              :         }
    2809            1 :         jobID := d.newJobIDLocked()
    2810            1 :         newLogNum = d.mu.versions.getNextDiskFileNum()
    2811            1 : 
    2812            1 :         d.mu.Unlock()
    2813            1 :         // Close the previous log first. This writes an EOF trailer
    2814            1 :         // signifying the end of the file and syncs it to disk. We must
    2815            1 :         // close the previous log before linking the new log file,
    2816            1 :         // otherwise a crash could leave both logs with unclean tails, and
    2817            1 :         // Open will treat the previous log as corrupt.
    2818            1 :         offset, err := d.mu.log.writer.Close()
    2819            1 :         if err != nil {
    2820            0 :                 // What to do here? Stumbling on doesn't seem worthwhile. If we failed to
    2821            0 :                 // close the previous log it is possible we lost a write.
    2822            0 :                 panic(err)
    2823              :         }
    2824            1 :         prevLogSize = uint64(offset)
    2825            1 :         metrics := d.mu.log.writer.Metrics()
    2826            1 : 
    2827            1 :         d.mu.Lock()
    2828            1 :         if err := d.mu.log.metrics.LogWriterMetrics.Merge(&metrics); err != nil {
    2829            0 :                 d.opts.Logger.Errorf("metrics error: %s", err)
    2830            0 :         }
    2831              : 
    2832            1 :         d.mu.Unlock()
    2833            1 :         writer, err := d.mu.log.manager.Create(wal.NumWAL(newLogNum), int(jobID))
    2834            1 :         if err != nil {
    2835            0 :                 panic(err)
    2836              :         }
    2837              : 
    2838            1 :         d.mu.Lock()
    2839            1 :         d.mu.log.writer = writer
    2840            1 :         return newLogNum, prevLogSize
    2841              : }
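
A sketch of the close-before-create ordering relied on above, with illustrative interfaces (not Pebble's wal package API): closing and syncing the old log first means a crash can leave at most one log with an unclean tail.

        package walsketch

        // logWriter and logManager are illustrative stand-ins for the real WAL types.
        type logWriter interface {
                Close() (offset int64, err error)
        }

        type logManager interface {
                Create(num uint64) (logWriter, error)
        }

        // rotate closes (and syncs) the current log before creating its successor,
        // returning the new writer and the size of the previous log.
        func rotate(mgr logManager, cur logWriter, nextNum uint64) (logWriter, uint64, error) {
                offset, err := cur.Close() // EOF trailer written and synced first
                if err != nil {
                        return nil, 0, err
                }
                w, err := mgr.Create(nextNum)
                if err != nil {
                        return nil, 0, err
                }
                return w, uint64(offset), nil
        }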
    2842              : 
    2843            1 : func (d *DB) getEarliestUnflushedSeqNumLocked() base.SeqNum {
    2844            1 :         seqNum := base.SeqNumMax
    2845            1 :         for i := range d.mu.mem.queue {
    2846            1 :                 logSeqNum := d.mu.mem.queue[i].logSeqNum
    2847            1 :                 if seqNum > logSeqNum {
    2848            1 :                         seqNum = logSeqNum
    2849            1 :                 }
    2850              :         }
    2851            1 :         return seqNum
    2852              : }
    2853              : 
    2854            1 : func (d *DB) getInProgressCompactionInfoLocked(finishing compaction) (rv []compactionInfo) {
    2855            1 :         for c := range d.mu.compact.inProgress {
    2856            1 :                 if !c.IsFlush() && (finishing == nil || c != finishing) {
    2857            1 :                         rv = append(rv, c.Info())
    2858            1 :                 }
    2859              :         }
    2860            1 :         return
    2861              : }
    2862              : 
    2863            1 : func inProgressL0Compactions(inProgress []compactionInfo) []manifest.L0Compaction {
    2864            1 :         var compactions []manifest.L0Compaction
    2865            1 :         for _, info := range inProgress {
    2866            1 :                 // Skip in-progress compactions that have already committed; the L0
    2867            1 :                 // sublevels initialization code requires the set of in-progress
    2868            1 :                 // compactions to be consistent with the current version. Compactions
    2869            1 :                 // with versionEditApplied=true are already applied to the current
     2870            1 :                 // version but are performing cleanup without holding the database mutex.
    2871            1 :                 if info.versionEditApplied {
    2872            1 :                         continue
    2873              :                 }
    2874            1 :                 l0 := false
    2875            1 :                 for _, cl := range info.inputs {
    2876            1 :                         l0 = l0 || cl.level == 0
    2877            1 :                 }
    2878            1 :                 if !l0 {
    2879            1 :                         continue
    2880              :                 }
    2881            1 :                 compactions = append(compactions, manifest.L0Compaction{
    2882            1 :                         Bounds:    *info.bounds,
    2883            1 :                         IsIntraL0: info.outputLevel == 0,
    2884            1 :                 })
    2885              :         }
    2886            1 :         return compactions
    2887              : }
    2888              : 
    2889              : // firstError returns the first non-nil error of err0 and err1, or nil if both
    2890              : // are nil.
    2891            1 : func firstError(err0, err1 error) error {
    2892            1 :         if err0 != nil {
    2893            1 :                 return err0
    2894            1 :         }
    2895            1 :         return err1
    2896              : }
    2897              : 
    2898              : // SetCreatorID sets the CreatorID which is needed in order to use shared objects.
    2899              : // Remote object usage is disabled until this method is called the first time.
    2900              : // Once set, the Creator ID is persisted and cannot change.
    2901              : //
     2902              : // Does nothing if remote storage was not set in the options when the DB was
    2903              : // opened or if the DB is in read-only mode.
    2904            0 : func (d *DB) SetCreatorID(creatorID uint64) error {
    2905            0 :         if d.opts.Experimental.RemoteStorage == nil || d.opts.ReadOnly {
    2906            0 :                 return nil
    2907            0 :         }
    2908            0 :         return d.objProvider.SetCreatorID(objstorage.CreatorID(creatorID))
    2909              : }
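
A hedged usage sketch; the creator ID value is an assumption, and with default options (no remote storage configured) the call is a no-op:

        package main

        import (
                "log"

                "github.com/cockroachdb/pebble"
        )

        func main() {
                // Remote storage would normally be configured via
                // Options.Experimental.RemoteStorage before this matters.
                db, err := pebble.Open("demo-db", &pebble.Options{})
                if err != nil {
                        log.Fatal(err)
                }
                defer db.Close()

                // Persist the creator ID once; it cannot change afterwards.
                if err := db.SetCreatorID(1); err != nil {
                        log.Fatal(err)
                }
        }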
    2910              : 
    2911              : // KeyStatistics keeps track of the number of keys that have been pinned by a
    2912              : // snapshot as well as counts of the different key kinds in the lsm.
    2913              : //
     2914              : // One way of using the accumulated stats, when we only have sets and dels,
     2915              : // with the counts represented as del_count, set_count, del_latest_count,
     2916              : // set_latest_count, and snapshot_pinned_count:
    2917              : //
    2918              : //   - del_latest_count + set_latest_count is the set of unique user keys
    2919              : //     (unique).
    2920              : //
    2921              : //   - set_latest_count is the set of live unique user keys (live_unique).
    2922              : //
    2923              : //   - Garbage is del_count + set_count - live_unique.
    2924              : //
    2925              : //   - If everything were in the LSM, del_count+set_count-snapshot_pinned_count
    2926              : //     would also be the set of unique user keys (note that
    2927              : //     snapshot_pinned_count is counting something different -- see comment below).
    2928              : //     But snapshot_pinned_count only counts keys in the LSM so the excess here
    2929              : //     must be keys in memtables.
    2930              : type KeyStatistics struct {
    2931              :         // TODO(sumeer): the SnapshotPinned* are incorrect in that these older
    2932              :         // versions can be in a different level. Either fix the accounting or
    2933              :         // rename these fields.
    2934              : 
    2935              :         // SnapshotPinnedKeys represents obsolete keys that cannot be elided during
    2936              :         // a compaction, because they are required by an open snapshot.
    2937              :         SnapshotPinnedKeys int
    2938              :         // SnapshotPinnedKeysBytes is the total number of bytes of all snapshot
    2939              :         // pinned keys.
    2940              :         SnapshotPinnedKeysBytes uint64
    2941              :         // KindsCount is the count for each kind of key. It includes point keys,
    2942              :         // range deletes and range keys.
    2943              :         KindsCount [InternalKeyKindMax + 1]int
    2944              :         // LatestKindsCount is the count for each kind of key when it is the latest
    2945              :         // kind for a user key. It is only populated for point keys.
    2946              :         LatestKindsCount [InternalKeyKindMax + 1]int
    2947              : }
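
A minimal sketch of the derived quantities described in the comment above, assuming a workload containing only sets and point deletes; the counts in main are made up:

        package main

        import (
                "fmt"

                "github.com/cockroachdb/pebble"
        )

        // derived computes the quantities from the KeyStatistics comment: unique
        // user keys, live unique user keys, and garbage.
        func derived(ks pebble.KeyStatistics) (unique, liveUnique, garbage int) {
                setCount := ks.KindsCount[pebble.InternalKeyKindSet]
                delCount := ks.KindsCount[pebble.InternalKeyKindDelete]
                setLatest := ks.LatestKindsCount[pebble.InternalKeyKindSet]
                delLatest := ks.LatestKindsCount[pebble.InternalKeyKindDelete]

                unique = setLatest + delLatest // one latest entry per user key
                liveUnique = setLatest         // latest entry is a live set
                garbage = setCount + delCount - liveUnique
                return unique, liveUnique, garbage
        }

        func main() {
                var ks pebble.KeyStatistics
                ks.KindsCount[pebble.InternalKeyKindSet] = 10
                ks.KindsCount[pebble.InternalKeyKindDelete] = 4
                ks.LatestKindsCount[pebble.InternalKeyKindSet] = 6
                ks.LatestKindsCount[pebble.InternalKeyKindDelete] = 2
                fmt.Println(derived(ks)) // 8 unique, 6 live, 8 garbage
        }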
    2948              : 
    2949              : // LSMKeyStatistics is used by DB.ScanStatistics.
    2950              : type LSMKeyStatistics struct {
    2951              :         Accumulated KeyStatistics
    2952              :         // Levels contains statistics only for point keys. Range deletions and range keys will
    2953              :         // appear in Accumulated but not Levels.
    2954              :         Levels [numLevels]KeyStatistics
    2955              :         // BytesRead represents the logical, pre-compression size of keys and values read
     2956              :         // BytesRead represents the logical, pre-compression size of keys and values read.
    2957              : }
    2958              : 
    2959              : // ScanStatisticsOptions is used by DB.ScanStatistics.
    2960              : type ScanStatisticsOptions struct {
     2961              :         // LimitBytesPerSecond is the maximum number of bytes that may be read per
     2962              :         // second using ScanInternal.
    2963              :         // A value of 0 indicates that there is no limit set.
    2964              :         LimitBytesPerSecond int64
    2965              : }
    2966              : 
    2967              : // ScanStatistics returns the count of different key kinds within the lsm for a
    2968              : // key span [lower, upper) as well as the number of snapshot keys.
    2969              : func (d *DB) ScanStatistics(
    2970              :         ctx context.Context, lower, upper []byte, opts ScanStatisticsOptions,
    2971            0 : ) (LSMKeyStatistics, error) {
    2972            0 :         stats := LSMKeyStatistics{}
    2973            0 :         var prevKey InternalKey
    2974            0 :         var rateLimitFunc func(key *InternalKey, val LazyValue) error
    2975            0 :         tb := tokenbucket.TokenBucket{}
    2976            0 : 
    2977            0 :         if opts.LimitBytesPerSecond != 0 {
    2978            0 :                 const minBytesPerSec = 100 * 1024
    2979            0 :                 if opts.LimitBytesPerSecond < minBytesPerSec {
    2980            0 :                         return stats, errors.Newf("pebble: ScanStatistics read bandwidth limit %d is below minimum %d", opts.LimitBytesPerSecond, minBytesPerSec)
    2981            0 :                 }
    2982              :                 // Each "token" roughly corresponds to a byte that was read.
    2983            0 :                 tb.Init(tokenbucket.TokensPerSecond(opts.LimitBytesPerSecond), tokenbucket.Tokens(1024))
    2984            0 :                 rateLimitFunc = func(key *InternalKey, val LazyValue) error {
    2985            0 :                         return tb.WaitCtx(ctx, tokenbucket.Tokens(key.Size()+val.Len()))
    2986            0 :                 }
    2987              :         }
    2988              : 
    2989            0 :         scanInternalOpts := &scanInternalOptions{
    2990            0 :                 visitPointKey: func(key *InternalKey, value LazyValue, iterInfo IteratorLevel) error {
    2991            0 :                         // If the previous key is equal to the current point key, the current key was
    2992            0 :                         // pinned by a snapshot.
    2993            0 :                         size := uint64(key.Size())
    2994            0 :                         kind := key.Kind()
    2995            0 :                         sameKey := d.equal(prevKey.UserKey, key.UserKey)
    2996            0 :                         if iterInfo.Kind == IteratorLevelLSM && sameKey {
    2997            0 :                                 stats.Levels[iterInfo.Level].SnapshotPinnedKeys++
    2998            0 :                                 stats.Levels[iterInfo.Level].SnapshotPinnedKeysBytes += size
    2999            0 :                                 stats.Accumulated.SnapshotPinnedKeys++
    3000            0 :                                 stats.Accumulated.SnapshotPinnedKeysBytes += size
    3001            0 :                         }
    3002            0 :                         if iterInfo.Kind == IteratorLevelLSM {
    3003            0 :                                 stats.Levels[iterInfo.Level].KindsCount[kind]++
    3004            0 :                         }
    3005            0 :                         if !sameKey {
    3006            0 :                                 if iterInfo.Kind == IteratorLevelLSM {
    3007            0 :                                         stats.Levels[iterInfo.Level].LatestKindsCount[kind]++
    3008            0 :                                 }
    3009            0 :                                 stats.Accumulated.LatestKindsCount[kind]++
    3010              :                         }
    3011              : 
    3012            0 :                         stats.Accumulated.KindsCount[kind]++
    3013            0 :                         prevKey.CopyFrom(*key)
    3014            0 :                         stats.BytesRead += uint64(key.Size() + value.Len())
    3015            0 :                         return nil
    3016              :                 },
    3017            0 :                 visitRangeDel: func(start, end []byte, seqNum base.SeqNum) error {
    3018            0 :                         stats.Accumulated.KindsCount[InternalKeyKindRangeDelete]++
    3019            0 :                         stats.BytesRead += uint64(len(start) + len(end))
    3020            0 :                         return nil
    3021            0 :                 },
    3022            0 :                 visitRangeKey: func(start, end []byte, keys []rangekey.Key) error {
    3023            0 :                         stats.BytesRead += uint64(len(start) + len(end))
    3024            0 :                         for _, key := range keys {
    3025            0 :                                 stats.Accumulated.KindsCount[key.Kind()]++
    3026            0 :                                 stats.BytesRead += uint64(len(key.Value) + len(key.Suffix))
    3027            0 :                         }
    3028            0 :                         return nil
    3029              :                 },
    3030              :                 includeObsoleteKeys: true,
    3031              :                 IterOptions: IterOptions{
    3032              :                         KeyTypes:   IterKeyTypePointsAndRanges,
    3033              :                         LowerBound: lower,
    3034              :                         UpperBound: upper,
    3035              :                 },
    3036              :                 rateLimitFunc: rateLimitFunc,
    3037              :         }
    3038            0 :         iter, err := d.newInternalIter(ctx, snapshotIterOpts{}, scanInternalOpts)
    3039            0 :         if err != nil {
    3040            0 :                 return LSMKeyStatistics{}, err
    3041            0 :         }
    3042            0 :         defer iter.close()
    3043            0 : 
    3044            0 :         err = scanInternalImpl(ctx, lower, upper, iter, scanInternalOpts)
    3045            0 : 
    3046            0 :         if err != nil {
    3047            0 :                 return LSMKeyStatistics{}, err
    3048            0 :         }
    3049              : 
    3050            0 :         return stats, nil
    3051              : }
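
A hedged usage sketch; the bounds and the 1 MiB/s limit are assumptions (the limit must be at least 100 KiB/s per the check above):

        package main

        import (
                "context"
                "fmt"
                "log"

                "github.com/cockroachdb/pebble"
        )

        func main() {
                db, err := pebble.Open("demo-db", &pebble.Options{})
                if err != nil {
                        log.Fatal(err)
                }
                defer db.Close()

                stats, err := db.ScanStatistics(context.Background(), []byte("a"), []byte("z"),
                        pebble.ScanStatisticsOptions{
                                LimitBytesPerSecond: 1 << 20, // throttle the scan to ~1 MiB/s
                        })
                if err != nil {
                        log.Fatal(err)
                }
                fmt.Printf("sets=%d dels=%d pinned=%d bytesRead=%d\n",
                        stats.Accumulated.KindsCount[pebble.InternalKeyKindSet],
                        stats.Accumulated.KindsCount[pebble.InternalKeyKindDelete],
                        stats.Accumulated.SnapshotPinnedKeys,
                        stats.BytesRead)
        }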
    3052              : 
    3053              : // ObjProvider returns the objstorage.Provider for this database. Meant to be
    3054              : // used for internal purposes only.
    3055            1 : func (d *DB) ObjProvider() objstorage.Provider {
    3056            1 :         return d.objProvider
    3057            1 : }
    3058              : 
    3059            0 : func (d *DB) checkVirtualBounds(m *manifest.TableMetadata) {
    3060            0 :         if !invariants.Enabled {
    3061            0 :                 return
    3062            0 :         }
    3063              : 
    3064            0 :         objMeta, err := d.objProvider.Lookup(base.FileTypeTable, m.TableBacking.DiskFileNum)
    3065            0 :         if err != nil {
    3066            0 :                 panic(err)
    3067              :         }
    3068            0 :         if objMeta.IsExternal() {
    3069            0 :                 // Nothing to do; bounds are expected to be loose.
    3070            0 :                 return
    3071            0 :         }
    3072              : 
    3073            0 :         iters, err := d.newIters(context.TODO(), m, nil, internalIterOpts{}, iterPointKeys|iterRangeDeletions|iterRangeKeys)
    3074            0 :         if err != nil {
    3075            0 :                 panic(errors.Wrap(err, "pebble: error creating iterators"))
    3076              :         }
    3077            0 :         defer func() { _ = iters.CloseAll() }()
    3078              : 
    3079            0 :         if m.HasPointKeys {
    3080            0 :                 pointIter := iters.Point()
    3081            0 :                 rangeDelIter := iters.RangeDeletion()
    3082            0 : 
    3083            0 :                 // Check that the lower bound is tight.
    3084            0 :                 pointKV := pointIter.First()
    3085            0 :                 rangeDel, err := rangeDelIter.First()
    3086            0 :                 if err != nil {
    3087            0 :                         panic(err)
    3088              :                 }
    3089            0 :                 if (rangeDel == nil || d.cmp(rangeDel.SmallestKey().UserKey, m.PointKeyBounds.Smallest().UserKey) != 0) &&
    3090            0 :                         (pointKV == nil || d.cmp(pointKV.K.UserKey, m.PointKeyBounds.Smallest().UserKey) != 0) {
    3091            0 :                         panic(errors.Newf("pebble: virtual sstable %s lower point key bound is not tight", m.TableNum))
    3092              :                 }
    3093              : 
    3094              :                 // Check that the upper bound is tight.
    3095            0 :                 pointKV = pointIter.Last()
    3096            0 :                 rangeDel, err = rangeDelIter.Last()
    3097            0 :                 if err != nil {
    3098            0 :                         panic(err)
    3099              :                 }
    3100            0 :                 if (rangeDel == nil || d.cmp(rangeDel.LargestKey().UserKey, m.PointKeyBounds.LargestUserKey()) != 0) &&
    3101            0 :                         (pointKV == nil || d.cmp(pointKV.K.UserKey, m.PointKeyBounds.Largest().UserKey) != 0) {
    3102            0 :                         panic(errors.Newf("pebble: virtual sstable %s upper point key bound is not tight", m.TableNum))
    3103              :                 }
    3104              : 
    3105              :                 // Check that iterator keys are within bounds.
    3106            0 :                 for kv := pointIter.First(); kv != nil; kv = pointIter.Next() {
    3107            0 :                         if d.cmp(kv.K.UserKey, m.PointKeyBounds.Smallest().UserKey) < 0 || d.cmp(kv.K.UserKey, m.PointKeyBounds.LargestUserKey()) > 0 {
    3108            0 :                                 panic(errors.Newf("pebble: virtual sstable %s point key %s is not within bounds", m.TableNum, kv.K.UserKey))
    3109              :                         }
    3110              :                 }
    3111            0 :                 s, err := rangeDelIter.First()
    3112            0 :                 for ; s != nil; s, err = rangeDelIter.Next() {
    3113            0 :                         if d.cmp(s.SmallestKey().UserKey, m.PointKeyBounds.Smallest().UserKey) < 0 {
    3114            0 :                                 panic(errors.Newf("pebble: virtual sstable %s point key %s is not within bounds", m.TableNum, s.SmallestKey().UserKey))
    3115              :                         }
    3116            0 :                         if d.cmp(s.LargestKey().UserKey, m.PointKeyBounds.Largest().UserKey) > 0 {
    3117            0 :                                 panic(errors.Newf("pebble: virtual sstable %s point key %s is not within bounds", m.TableNum, s.LargestKey().UserKey))
    3118              :                         }
    3119              :                 }
    3120            0 :                 if err != nil {
    3121            0 :                         panic(err)
    3122              :                 }
    3123              :         }
    3124              : 
    3125            0 :         if !m.HasRangeKeys {
    3126            0 :                 return
    3127            0 :         }
    3128            0 :         rangeKeyIter := iters.RangeKey()
    3129            0 : 
    3130            0 :         // Check that the lower bound is tight.
    3131            0 :         if s, err := rangeKeyIter.First(); err != nil {
    3132            0 :                 panic(err)
    3133            0 :         } else if m.HasRangeKeys && d.cmp(s.SmallestKey().UserKey, m.RangeKeyBounds.SmallestUserKey()) != 0 {
    3134            0 :                 panic(errors.Newf("pebble: virtual sstable %s lower range key bound is not tight", m.TableNum))
    3135              :         }
    3136              : 
    3137              :         // Check that upper bound is tight.
    3138            0 :         if s, err := rangeKeyIter.Last(); err != nil {
    3139            0 :                 panic(err)
    3140            0 :         } else if d.cmp(s.LargestKey().UserKey, m.RangeKeyBounds.LargestUserKey()) != 0 {
    3141            0 :                 panic(errors.Newf("pebble: virtual sstable %s upper range key bound is not tight", m.TableNum))
    3142              :         }
    3143              : 
    3144            0 :         s, err := rangeKeyIter.First()
    3145            0 :         for ; s != nil; s, err = rangeKeyIter.Next() {
    3146            0 :                 if d.cmp(s.SmallestKey().UserKey, m.RangeKeyBounds.SmallestUserKey()) < 0 {
     3147            0 :                         panic(errors.Newf("pebble: virtual sstable %s range key %s is not within bounds", m.TableNum, s.SmallestKey().UserKey))
    3148              :                 }
    3149            0 :                 if d.cmp(s.LargestKey().UserKey, m.RangeKeyBounds.LargestUserKey()) > 0 {
     3150            0 :                         panic(errors.Newf("pebble: virtual sstable %s range key %s is not within bounds", m.TableNum, s.LargestKey().UserKey))
    3151              :                 }
    3152              :         }
    3153            0 :         if err != nil {
    3154            0 :                 panic(err)
    3155              :         }
    3156              : }
    3157              : 
    3158              : // DebugString returns a debugging string describing the LSM.
    3159            0 : func (d *DB) DebugString() string {
    3160            0 :         return d.DebugCurrentVersion().DebugString()
    3161            0 : }
    3162              : 
    3163              : // DebugCurrentVersion returns the current LSM tree metadata. Should only be
    3164              : // used for testing/debugging.
    3165            0 : func (d *DB) DebugCurrentVersion() *manifest.Version {
    3166            0 :         d.mu.Lock()
    3167            0 :         defer d.mu.Unlock()
    3168            0 :         return d.mu.versions.currentVersion()
    3169            0 : }
        

Generated by: LCOV version 2.0-1