// Copyright 2012 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.

package pebble

import (
	"bytes"
	"context"
	"encoding/binary"
	"fmt"
	"io"
	"math"
	"sort"
	"sync"
	"sync/atomic"
	"time"
	"unsafe"

	"github.com/cockroachdb/crlib/crtime"
	"github.com/cockroachdb/errors"
	"github.com/cockroachdb/pebble/batchrepr"
	"github.com/cockroachdb/pebble/internal/base"
	"github.com/cockroachdb/pebble/internal/batchskl"
	"github.com/cockroachdb/pebble/internal/humanize"
	"github.com/cockroachdb/pebble/internal/invariants"
	"github.com/cockroachdb/pebble/internal/keyspan"
	"github.com/cockroachdb/pebble/internal/private"
	"github.com/cockroachdb/pebble/internal/rangedel"
	"github.com/cockroachdb/pebble/internal/rangekey"
	"github.com/cockroachdb/pebble/internal/rawalloc"
	"github.com/cockroachdb/pebble/internal/treeprinter"
)

const (
	invalidBatchCount = 1<<32 - 1
	maxVarintLen32    = 5

	defaultBatchInitialSize     = 1 << 10 // 1 KB
	defaultBatchMaxRetainedSize = 1 << 20 // 1 MB
)

// ErrNotIndexed means that a read operation on a batch failed because the
// batch is not indexed and thus doesn't support reads.
var ErrNotIndexed = errors.New("pebble: batch not indexed")

// ErrInvalidBatch indicates that a batch is invalid or otherwise corrupted.
var ErrInvalidBatch = batchrepr.ErrInvalidBatch

// ErrBatchTooLarge indicates that a batch is too large to be committed; its
// encoded size meets or exceeds maxBatchSize.
var ErrBatchTooLarge = base.MarkCorruptionError(errors.Newf("pebble: batch too large: >= %s", humanize.Bytes.Uint64(maxBatchSize)))

// DeferredBatchOp represents a batch operation (e.g. set, merge, delete) that
// is being inserted into the batch. Indexing is not performed on the specified
// key until Finish is called, hence the name deferred. This struct lets the
// caller copy or encode keys/values directly into the batch representation
// instead of copying into an intermediary buffer and then having pebble.Batch
// copy from it.
type DeferredBatchOp struct {
	index *batchskl.Skiplist

	// Key and Value point to parts of the binary batch representation where
	// keys and values should be encoded/copied into. len(Key) and len(Value)
	// bytes must be copied into these slices respectively before calling
	// Finish(). Changing where these slices point to is not allowed.
	Key, Value []byte
	offset     uint32
}

// Finish completes the addition of this batch operation, and adds it to the
// index if necessary. Must be called once (and exactly once) after the
// keys/values have been filled into Key and Value. Not calling Finish or not
// copying/encoding keys will result in an incomplete index, and calling Finish
// twice may result in a panic.
func (d DeferredBatchOp) Finish() error {
	if d.index != nil {
		if err := d.index.Add(d.offset); err != nil {
			return err
		}
	}
	return nil
}
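
// deferredSetSketch is a minimal usage sketch of the deferred API described
// above; the helper name and values are illustrative, not part of the package.
// Space is reserved with SetDeferred, the key and value are encoded directly
// into the returned slices, and then Finish is called exactly once.
func deferredSetSketch(b *Batch) error {
	op := b.SetDeferred(3, 5) // lengths of "key" and "value"
	copy(op.Key, "key")
	copy(op.Value, "value")
	return op.Finish()
}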

// A Batch is a sequence of Sets, Merges, Deletes, DeleteRanges, RangeKeySets,
// RangeKeyUnsets, and/or RangeKeyDeletes that are applied atomically. Batch
// implements the Reader interface, but only an indexed batch supports reading
// (without error) via Get or NewIter. A non-indexed batch will return
// ErrNotIndexed when read from. A batch is not safe for concurrent use, and
// consumers should use a batch per goroutine or provide their own
// synchronization.
//
// # Indexing
//
// Batches can be optionally indexed (see DB.NewIndexedBatch). An indexed batch
// allows iteration via an Iterator (see Batch.NewIter). The iterator provides
// a merged view of the operations in the batch and the underlying
// database. This is implemented by treating the batch as an additional layer
// in the LSM where every entry in the batch is considered newer than any entry
// in the underlying database (batch entries have the InternalKeySeqNumBatch
// bit set). By treating the batch as an additional layer in the LSM, iteration
// supports all batch operations (i.e. Set, Merge, Delete, DeleteRange,
// RangeKeySet, RangeKeyUnset, RangeKeyDelete) with minimal effort.
//
// The same key can be operated on multiple times in a batch, though only the
// latest operation will be visible. For example, Put("a", "b"), Delete("a")
// will cause the key "a" to not be visible in the batch. Put("a", "b"),
// Put("a", "c") will cause a read of "a" to return the value "c".
//
// The batch index is implemented via a skiplist (internal/batchskl). While
// the skiplist implementation is very fast, inserting into an indexed batch is
// significantly slower than inserting into a non-indexed batch. Only use an
// indexed batch if you require reading from it.
//
// # Atomic commit
//
// The operations in a batch are persisted by calling Batch.Commit which is
// equivalent to calling DB.Apply(batch). A batch is committed atomically by
// writing the internal batch representation to the WAL, adding all of the
// batch operations to the memtable associated with the WAL, and then
// incrementing the visible sequence number so that subsequent reads can see
// the effects of the batch operations. If WriteOptions.Sync is true, a call to
// Batch.Commit will guarantee that the batch is persisted to disk before
// returning. See commitPipeline for more on the implementation details.
//
// # Large batches
//
// The size of a batch is limited only by available memory (be aware that
// indexed batches require considerable additional memory for the skiplist
// structure). A given WAL file has a single memtable associated with it (this
// restriction could be removed, but doing so is onerous and complex). And a
// memtable has a fixed size due to the underlying fixed size arena. Note that
// this differs from RocksDB where a memtable can grow arbitrarily large using
// a list of arena chunks. In RocksDB this is accomplished by storing pointers
// in the arena memory, but that isn't possible in Go.
//
// During Batch.Commit, a batch which is larger than a threshold (>
// MemTableSize/2) is wrapped in a flushableBatch and inserted into the queue
// of memtables. A flushableBatch forces the WAL to be rotated, but that
// happens anyway when the memtable becomes full, so this does not cause
// significant WAL churn. Because the flushableBatch is readable as another
// layer in the LSM, Batch.Commit returns as soon as the flushableBatch has
// been added to the queue of memtables.
//
// Internally, a flushableBatch provides Iterator support by sorting the batch
// contents (the batch is sorted once, when it is added to the memtable
// queue). Sorting the batch contents and insertion of the contents into a
// memtable have the same big-O time, but the constant factor dominates
// here. Sorting is significantly faster and uses significantly less memory.
//
// # Internal representation
//
// The internal batch representation is a contiguous byte buffer with a fixed
// 12-byte header, followed by a series of records.
//
//	+-------------+------------+--- ... ---+
//	| SeqNum (8B) | Count (4B) |  Entries  |
//	+-------------+------------+--- ... ---+
//
// Each record has a 1-byte kind tag prefix, followed by 1 or 2 length prefixed
// strings (varstring):
//
//	+-----------+-----------------+-------------------+
//	| Kind (1B) | Key (varstring) | Value (varstring) |
//	+-----------+-----------------+-------------------+
//
// A varstring is a varint32 followed by N bytes of data. The Kind tags are
// exactly those specified by InternalKeyKind. The following table shows the
// format for records of each kind:
//
//	InternalKeyKindDelete         varstring
//	InternalKeyKindLogData        varstring
//	InternalKeyKindIngestSST      varstring
//	InternalKeyKindSet            varstring varstring
//	InternalKeyKindMerge          varstring varstring
//	InternalKeyKindRangeDelete    varstring varstring
//	InternalKeyKindRangeKeySet    varstring varstring
//	InternalKeyKindRangeKeyUnset  varstring varstring
//	InternalKeyKindRangeKeyDelete varstring varstring
//
// The intuitive understanding here is that the arguments to Delete, Set,
// Merge, DeleteRange and RangeKeyDelete are encoded into the batch. The
// RangeKeySet and RangeKeyUnset operations are slightly more complicated,
// encoding their end key, suffix and value (in the case of RangeKeySet) within
// the Value varstring. For more information on the value encoding for
// RangeKeySet and RangeKeyUnset, see the internal/rangekey package. A sketch
// decoding the header follows the Batch type below.
//
// The internal batch representation is the on-disk format for a batch in the
// WAL, and thus stable. New record kinds may be added, but the existing ones
// will not be modified.
type Batch struct {
	batchInternal
	applied atomic.Bool
	// lifecycle is used to negotiate the lifecycle of a Batch. A Batch and its
	// underlying batchInternal.data byte slice may be reused. There are two
	// mechanisms for reuse:
	//
	// 1. The caller may explicitly call [Batch.Reset] to reset the batch to be
	//    empty (while retaining the underlying repr's buffer).
	// 2. The caller may call [Batch.Close], passing ownership off to Pebble,
	//    which may reuse the batch's memory to service new callers to
	//    [DB.NewBatch].
	//
	// There's a complication to reuse: When WAL failover is configured, the
	// Pebble commit pipeline may retain a pointer to the batch.data beyond the
	// return of [Batch.Commit]. The user of the Batch may commit their batch
	// and call Close or Reset before the commit pipeline is finished reading
	// the data slice. Recycling immediately would cause a data race.
	//
	// To resolve this data race, this [lifecycle] atomic is used to determine
	// safety and responsibility of reusing a batch. The low bits of the atomic
	// are used as a reference count (really just the lowest bit; in practice
	// there's only 1 code path that takes a reference). The [Batch] is passed
	// into [wal.Writer]'s WriteRecord method as a [RefCount] implementation.
	// The wal.Writer guarantees that if it will read [Batch.data] after the
	// call to WriteRecord returns, it will increment the reference count. When
	// it's complete, it'll unreference through invoking [Batch.Unref].
	//
	// When the committer of a batch indicates intent to recycle a Batch through
	// calling [Batch.Reset] or [Batch.Close], the lifecycle atomic is read. If
	// an outstanding reference remains, it's unsafe to reuse Batch.data yet. In
	// [Batch.Reset] the caller wants to reuse the [Batch] immediately, so we
	// discard b.data to recycle the struct but not the underlying byte slice.
	// In [Batch.Close], we set a special high bit [batchClosedBit] on lifecycle
	// that indicates that the user will not use [Batch] again and we're free to
	// recycle it when safe. When the commit pipeline eventually calls
	// [Batch.Unref], the [batchClosedBit] is noticed and the batch is recycled.
	// A sketch of the close-side protocol follows Unref below.
	lifecycle atomic.Int32
}
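
// readHeaderSketch is a minimal sketch of decoding the fixed 12-byte header
// documented above, assuming repr holds a complete batch representation; the
// function name is illustrative and the batchrepr package provides the real
// decoding helpers.
func readHeaderSketch(repr []byte) (seqNum uint64, count uint32) {
	seqNum = binary.LittleEndian.Uint64(repr[:8])  // seqnum of the first element
	count = binary.LittleEndian.Uint32(repr[8:12]) // number of records
	return seqNum, count
}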

// batchClosedBit is a bit stored on Batch.lifecycle to indicate that the user
// called [Batch.Close] to release a Batch, but an open reference count
// prevented immediate recycling.
const batchClosedBit = 1 << 30

// TODO(jackson): Hide the wal.RefCount implementation from the public Batch interface.

// Ref implements wal.RefCount. If the WAL writer may need to read b.data after
// it returns, it invokes Ref to increment the lifecycle's reference count. When
// it's finished, it invokes Unref.
func (b *Batch) Ref() {
	b.lifecycle.Add(+1)
}

// Unref implements wal.RefCount.
func (b *Batch) Unref() {
	if v := b.lifecycle.Add(-1); (v ^ batchClosedBit) == 0 {
		// The [batchClosedBit] high bit is set, and there are no outstanding
		// references. The user of the Batch called [Batch.Close], expecting the
		// batch to be recycled. However, our outstanding reference count
		// prevented recycling. As the last to dereference, we're now
		// responsible for releasing the batch.
		b.lifecycle.Store(0)
		b.release()
	}
}
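
// closeSketch is a minimal sketch of the close-side protocol documented on
// lifecycle above, assuming only the semantics described there: if references
// are outstanding, set batchClosedBit so the final Unref performs the release;
// otherwise release immediately. The name is illustrative; the real logic
// lives in [Batch.Close].
func (b *Batch) closeSketch() {
	for {
		v := b.lifecycle.Load()
		if v > 0 {
			// Outstanding references: mark closed so the final Unref observes
			// batchClosedBit and releases the batch.
			if b.lifecycle.CompareAndSwap(v, v|batchClosedBit) {
				return
			}
			// CAS failed; retry.
		} else {
			// No outstanding references: release immediately.
			b.release()
			return
		}
	}
}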

// batchInternal contains the set of fields within Batch that are non-atomic and
// capable of being reset using a *b = batchInternal{} struct copy.
type batchInternal struct {
	// Data is the wire format of a batch's log entry:
	//   - 8 bytes for a sequence number of the first batch element,
	//     or zeroes if the batch has not yet been applied,
	//   - 4 bytes for the count: the number of elements in the batch,
	//     or "\xff\xff\xff\xff" if the batch is invalid,
	//   - count elements, being:
	//     - one byte for the kind
	//     - the varint-string user key,
	//     - the varint-string value (if kind != delete).
	// The sequence number and count are stored in little-endian order.
	//
	// The data field can be (but is not guaranteed to be) nil for new
	// batches. Large batches will set the data field to nil when committed as
	// the data has been moved to a flushableBatch and inserted into the queue of
	// memtables.
	data     []byte
	comparer *base.Comparer
	opts     batchOptions

	// An upper bound on required space to add this batch to a memtable.
	// Note that although batches are limited to 4 GiB in size, that limit
	// applies to len(data), not the memtable size. The upper bound on the
	// size of a memtable node is larger than the overhead of the batch's log
	// encoding, so memTableSize is larger than len(data) and may overflow a
	// uint32.
	memTableSize uint64

	// The db to which the batch will be committed. Do not change this field
	// after the batch has been created as it might invalidate internal state.
	// Batch.memTableSize is only refreshed if Batch.db is set. Setting db to
	// nil once it has been set implies that the Batch has encountered an error.
	db *DB

	// The count of records in the batch. This count will be stored in the batch
	// data whenever Repr() is called.
	count uint64

	// The count of range deletions in the batch. Updated every time a range
	// deletion is added.
	countRangeDels uint64

	// The count of range key sets, unsets and deletes in the batch. Updated
	// every time a RANGEKEYSET, RANGEKEYUNSET or RANGEKEYDEL key is added.
	countRangeKeys uint64

	// A deferredOp struct, stored in the Batch so that a pointer can be returned
	// from the *Deferred() methods rather than a value.
	deferredOp DeferredBatchOp

	// An optional skiplist keyed by offset into data of the entry.
	index         *batchskl.Skiplist
	rangeDelIndex *batchskl.Skiplist
	rangeKeyIndex *batchskl.Skiplist

	// Fragmented range deletion tombstones. Cached the first time a range
	// deletion iterator is requested. The cache is invalidated whenever a new
	// range deletion is added to the batch. This cache can only be used when
	// opening an iterator to read at a batch sequence number >=
	// tombstonesSeqNum. This is the case for all new iterators created over a
	// batch but it's not the case for all cloned iterators.
	tombstones       []keyspan.Span
	tombstonesSeqNum base.SeqNum

	// Fragmented range key spans. Cached the first time a range key iterator is
	// requested. The cache is invalidated whenever a new range key
	// (RangeKey{Set,Unset,Del}) is added to the batch. This cache can only be
	// used when opening an iterator to read at a batch sequence number >=
	// rangeKeysSeqNum. This is the case for all new iterators created over a
	// batch but it's not the case for all cloned iterators.
	rangeKeys       []keyspan.Span
	rangeKeysSeqNum base.SeqNum

	// The flushableBatch wrapper if the batch is too large to fit in the
	// memtable.
	flushable *flushableBatch

	// minimumFormatMajorVersion indicates the format major version required in
	// order to commit this batch. If an operation requires a particular format
	// major version, it ratchets the batch's minimumFormatMajorVersion. When
	// the batch is committed, this is validated against the database's current
	// format major version.
	minimumFormatMajorVersion FormatMajorVersion

	// Synchronous Apply uses the commit WaitGroup for both publishing the
	// seqnum and waiting for the WAL fsync (if needed). Asynchronous
	// ApplyNoSyncWait, which implies WriteOptions.Sync is true, uses the commit
	// WaitGroup for publishing the seqnum and the fsyncWait WaitGroup for
	// waiting for the WAL fsync.
	//
	// TODO(sumeer): if we find that ApplyNoSyncWait in conjunction with
	// SyncWait is causing higher memory usage because of the time duration
	// between when the sync is already done, and a goroutine calls SyncWait
	// (followed by Batch.Close), we could separate out {fsyncWait, commitErr}
	// into a separate struct that is allocated separately (using another
	// sync.Pool), and only that struct needs to outlive Batch.Close (which
	// could then be called immediately after ApplyNoSyncWait). commitStats
	// will also need to be in this separate struct.
	commit    sync.WaitGroup
	fsyncWait sync.WaitGroup

	commitStats BatchCommitStats

	commitErr error

	// Position bools together to reduce the size of the struct.

	// ingestedSSTBatch indicates that the batch contains one or more key kinds
	// of InternalKeyKindIngestSST. If the batch contains key kinds of IngestSST
	// then it will only contain key kinds of IngestSST.
	ingestedSSTBatch bool

	// committing is set to true when a batch begins to commit. It's used to
	// ensure the batch is not mutated concurrently. It is deliberately not an
	// atomic, so as to avoid the overhead on batch mutations. This is okay,
	// because under correct usage this field will never be accessed
	// concurrently. It's only under incorrect usage that the memory accesses of
	// this variable may violate memory safety. Since we don't use atomics here,
	// false negatives are possible.
	committing bool
}

// BatchCommitStats exposes stats related to committing a batch.
//
// NB: there is no Pebble internal tracing (using LoggerAndTracer) of slow
// batch commits. The caller can use these stats to do their own tracing as
// needed.
type BatchCommitStats struct {
	// TotalDuration is the time spent in DB.{Apply,ApplyNoSyncWait} or
	// Batch.Commit, plus the time waiting in Batch.SyncWait. If there is a gap
	// between calling ApplyNoSyncWait and calling SyncWait, that gap could
	// include some duration in which real work was being done for the commit
	// and will not be included here. This missing time is considered acceptable
	// since the goal of these stats is to understand user-facing latency.
	//
	// TotalDuration includes time spent in various queues both inside Pebble
	// and outside Pebble (I/O queues, goroutine scheduler queue, mutex wait
	// etc.). For some of these queues (which we consider important) the wait
	// times are included below -- these expose low-level implementation detail
	// and are meant for expert diagnosis and subject to change. There may be
	// unaccounted time after subtracting those values from TotalDuration; a
	// sketch of that computation follows this type.
	TotalDuration time.Duration
	// SemaphoreWaitDuration is the wait time for semaphores in
	// commitPipeline.Commit.
	SemaphoreWaitDuration time.Duration
	// WALQueueWaitDuration is the wait time for allocating memory blocks in the
	// LogWriter (due to the LogWriter not writing fast enough). At the moment
	// this duration is always zero because a single WAL will allow allocating
	// memory blocks up to the entire memtable size. In the future, we may
	// pipeline WALs and bound the WAL queued blocks separately, so this field
	// is preserved for that possibility.
	WALQueueWaitDuration time.Duration
	// MemTableWriteStallDuration is the wait caused by a write stall due to too
	// many memtables (due to not flushing fast enough).
	MemTableWriteStallDuration time.Duration
	// L0ReadAmpWriteStallDuration is the wait caused by a write stall due to
	// high read amplification in L0 (due to not compacting fast enough out of
	// L0).
	L0ReadAmpWriteStallDuration time.Duration
	// WALRotationDuration is the wait time for WAL rotation, which includes
	// syncing and closing the old WAL and creating (or reusing) a new one.
	WALRotationDuration time.Duration
	// CommitWaitDuration is the wait for publishing the seqnum plus the
	// duration for the WAL sync (if requested). The former should be tiny and
	// one can assume that this is all due to the WAL sync.
	CommitWaitDuration time.Duration
}
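
// unaccountedCommitTime is the sketch referenced above: a minimal derivation
// of the unaccounted remainder of TotalDuration. The helper name is
// illustrative and not part of the API.
func unaccountedCommitTime(s BatchCommitStats) time.Duration {
	accounted := s.SemaphoreWaitDuration +
		s.WALQueueWaitDuration +
		s.MemTableWriteStallDuration +
		s.L0ReadAmpWriteStallDuration +
		s.WALRotationDuration +
		s.CommitWaitDuration
	return s.TotalDuration - accounted
}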

var _ Reader = (*Batch)(nil)
var _ Writer = (*Batch)(nil)

var batchPool = sync.Pool{
	New: func() interface{} {
		return &Batch{}
	},
}

type indexedBatch struct {
	batch Batch
	index batchskl.Skiplist
}

var indexedBatchPool = sync.Pool{
	New: func() interface{} {
		return &indexedBatch{}
	},
}

func newBatch(db *DB, opts ...BatchOption) *Batch {
	b := batchPool.Get().(*Batch)
	b.db = db
	b.opts.ensureDefaults()
	for _, opt := range opts {
		opt(&b.opts)
	}
	return b
}

func newBatchWithSize(db *DB, size int, opts ...BatchOption) *Batch {
	b := newBatch(db, opts...)
	if cap(b.data) < size {
		b.data = rawalloc.New(0, size)
	}
	return b
}

func newIndexedBatch(db *DB, comparer *Comparer) *Batch {
	i := indexedBatchPool.Get().(*indexedBatch)
	i.batch.comparer = comparer
	i.batch.db = db
	i.batch.index = &i.index
	i.batch.index.Init(&i.batch.data, comparer.Compare, comparer.AbbreviatedKey)
	i.batch.opts.ensureDefaults()
	return &i.batch
}

func newIndexedBatchWithSize(db *DB, comparer *Comparer, size int) *Batch {
	b := newIndexedBatch(db, comparer)
	if cap(b.data) < size {
		b.data = rawalloc.New(0, size)
	}
	return b
}

// nextSeqNum returns the batch "sequence number" that will be given to the next
// key written to the batch. During iteration keys within an indexed batch are
// given a sequence number consisting of their offset within the batch combined
// with the base.SeqNumBatchBit bit. These sequence numbers are only
// used during iteration, and the keys are assigned ordinary sequence numbers
// when the batch is committed.
func (b *Batch) nextSeqNum() base.SeqNum {
	return base.SeqNum(len(b.data)) | base.SeqNumBatchBit
}

func (b *Batch) release() {
	if b.db == nil {
		// The batch was not created using newBatch or newIndexedBatch, or an error
		// was encountered. We don't try to reuse batches that encountered an error
		// because they might be stuck somewhere in the system and attempting to
		// reuse such batches is a recipe for onerous debugging sessions. Instead,
		// let the GC do its job.
		return
	}
	b.db = nil

	// NB: This is ugly (it would be cleaner if we could just assign a Batch{}),
	// but necessary so that we can use atomic.StoreUint32 for the Batch.applied
	// field. Without using an atomic to clear that field the Go race detector
	// complains.
	b.reset()
	b.comparer = nil

	if b.index == nil {
		batchPool.Put(b)
	} else {
		b.index, b.rangeDelIndex, b.rangeKeyIndex = nil, nil, nil
		indexedBatchPool.Put((*indexedBatch)(unsafe.Pointer(b)))
	}
}

func (b *Batch) refreshMemTableSize() error {
	b.memTableSize = 0
	if len(b.data) < batchrepr.HeaderLen {
		return nil
	}

	b.countRangeDels = 0
	b.countRangeKeys = 0
	b.minimumFormatMajorVersion = 0
	for r := b.Reader(); ; {
		kind, key, value, ok, err := r.Next()
		if !ok {
			if err != nil {
				return err
			}
			break
		}
		switch kind {
		case InternalKeyKindRangeDelete:
			b.countRangeDels++
		case InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete:
			b.countRangeKeys++
		case InternalKeyKindSet, InternalKeyKindDelete, InternalKeyKindMerge, InternalKeyKindSingleDelete, InternalKeyKindSetWithDelete:
			// fallthrough
		case InternalKeyKindDeleteSized:
			if b.minimumFormatMajorVersion < FormatDeleteSizedAndObsolete {
				b.minimumFormatMajorVersion = FormatDeleteSizedAndObsolete
			}
		case InternalKeyKindLogData:
			// LogData does not contribute to memtable size.
			continue
		case InternalKeyKindIngestSST:
			if b.minimumFormatMajorVersion < FormatFlushableIngest {
				b.minimumFormatMajorVersion = FormatFlushableIngest
			}
			// This key kind doesn't contribute to the memtable size.
			continue
		case InternalKeyKindExcise:
			if b.minimumFormatMajorVersion < FormatFlushableIngestExcises {
				b.minimumFormatMajorVersion = FormatFlushableIngestExcises
			}
			// This key kind doesn't contribute to the memtable size.
			continue
		default:
			// Note: in some circumstances this might be temporary memory
			// corruption that can be recovered by discarding the batch and
			// trying again. In other cases, the batch repr might've been
			// already persisted elsewhere, and we'll loop continuously trying
			// to commit the same corrupted batch. The caller is responsible for
			// distinguishing.
			return errors.Wrapf(ErrInvalidBatch, "unrecognized kind %v", kind)
		}
		b.memTableSize += memTableEntrySize(len(key), len(value))
	}
	return nil
}

// Apply the operations contained in the batch to the receiver batch.
//
// It is safe to modify the contents of the arguments after Apply returns.
//
// Apply returns ErrInvalidBatch if the provided batch is invalid in any way.
func (b *Batch) Apply(batch *Batch, _ *WriteOptions) error {
	if b.ingestedSSTBatch {
		panic("pebble: invalid batch application")
	}
	if len(batch.data) == 0 {
		return nil
	}
	if len(batch.data) < batchrepr.HeaderLen {
		return ErrInvalidBatch
	}

	offset := len(b.data)
	if offset == 0 {
		b.init(offset)
		offset = batchrepr.HeaderLen
	}
	b.data = append(b.data, batch.data[batchrepr.HeaderLen:]...)

	b.setCount(b.Count() + batch.Count())

	if b.db != nil || b.index != nil {
		// Only iterate over the new entries if we need to track memTableSize or in
		// order to update the index.
		for iter := batchrepr.Reader(b.data[offset:]); len(iter) > 0; {
			offset := uintptr(unsafe.Pointer(&iter[0])) - uintptr(unsafe.Pointer(&b.data[0]))
			kind, key, value, ok, err := iter.Next()
			if !ok {
				if err != nil {
					return err
				}
				break
			}
			switch kind {
			case InternalKeyKindRangeDelete:
				b.countRangeDels++
			case InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete:
				b.countRangeKeys++
			case InternalKeyKindIngestSST, InternalKeyKindExcise:
				panic("pebble: invalid key kind for batch")
			case InternalKeyKindLogData:
				// LogData does not contribute to memtable size.
				continue
			case InternalKeyKindSet, InternalKeyKindDelete, InternalKeyKindMerge,
				InternalKeyKindSingleDelete, InternalKeyKindSetWithDelete, InternalKeyKindDeleteSized:
				// fallthrough
			default:
				// Note: in some circumstances this might be temporary memory
				// corruption that can be recovered by discarding the batch and
				// trying again. In other cases, the batch repr might've been
				// already persisted elsewhere, and we'll loop continuously
				// trying to commit the same corrupted batch. The caller is
				// responsible for distinguishing.
				return errors.Wrapf(ErrInvalidBatch, "unrecognized kind %v", kind)
			}
			if b.index != nil {
				var err error
				switch kind {
				case InternalKeyKindRangeDelete:
					b.tombstones = nil
					b.tombstonesSeqNum = 0
					if b.rangeDelIndex == nil {
						b.rangeDelIndex = batchskl.NewSkiplist(&b.data, b.comparer.Compare, b.comparer.AbbreviatedKey)
					}
					err = b.rangeDelIndex.Add(uint32(offset))
				case InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete:
					b.rangeKeys = nil
					b.rangeKeysSeqNum = 0
					if b.rangeKeyIndex == nil {
						b.rangeKeyIndex = batchskl.NewSkiplist(&b.data, b.comparer.Compare, b.comparer.AbbreviatedKey)
					}
					err = b.rangeKeyIndex.Add(uint32(offset))
				default:
					err = b.index.Add(uint32(offset))
				}
				if err != nil {
					return err
				}
			}
			b.memTableSize += memTableEntrySize(len(key), len(value))
		}
	}
	return nil
}
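
// applySketch is a minimal usage sketch for Apply, assuming an open *DB; the
// function and variable names are illustrative, not part of the API. Records
// staged in one batch are merged into another, after which the source batch
// may be closed independently.
func applySketch(db *DB) error {
	src := db.NewBatch()
	if err := src.Set([]byte("a"), []byte("1"), nil); err != nil {
		return err
	}
	dst := db.NewBatch()
	// Apply copies src's records into dst; it is safe to modify or close src
	// once Apply returns.
	if err := dst.Apply(src, nil); err != nil {
		return err
	}
	_ = src.Close()
	return dst.Commit(Sync)
}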

// Get gets the value for the given key. It returns ErrNotFound if the Batch
// does not contain the key.
//
// The caller should not modify the contents of the returned slice, but it is
// safe to modify the contents of the argument after Get returns. The returned
// slice will remain valid until the returned Closer is closed. On success, the
// caller MUST call closer.Close() or a memory leak will occur.
func (b *Batch) Get(key []byte) ([]byte, io.Closer, error) {
	if b.index == nil {
		return nil, nil, ErrNotIndexed
	}
	return b.db.getInternal(key, b, nil /* snapshot */)
}
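
// getSketch is a minimal sketch of the Get contract described above: the
// returned slice is only valid until the Closer is closed, so the value is
// copied out before closing. The helper name is illustrative.
func getSketch(b *Batch, key []byte) ([]byte, error) {
	value, closer, err := b.Get(key) // requires an indexed batch
	if err != nil {
		return nil, err // e.g. ErrNotFound or ErrNotIndexed
	}
	// Copy the value; its backing memory may be reused after Close.
	result := append([]byte(nil), value...)
	if err := closer.Close(); err != nil {
		return nil, err
	}
	return result, nil
}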

func (b *Batch) prepareDeferredKeyValueRecord(keyLen, valueLen int, kind InternalKeyKind) {
	if b.committing {
		panic("pebble: batch already committing")
	}
	if len(b.data) == 0 {
		b.init(keyLen + valueLen + 2*binary.MaxVarintLen64 + batchrepr.HeaderLen)
	}
	b.count++
	b.memTableSize += memTableEntrySize(keyLen, valueLen)

	pos := len(b.data)
	b.deferredOp.offset = uint32(pos)
	b.grow(1 + 2*maxVarintLen32 + keyLen + valueLen)
	b.data[pos] = byte(kind)
	pos++

	{
		// TODO(peter): Manually inlined version of binary.PutUvarint(). This is
		// 20% faster on BenchmarkBatchSet on go1.13. Remove if go1.14 or future
		// versions show this to not be a performance win.
		x := uint32(keyLen)
		for x >= 0x80 {
			b.data[pos] = byte(x) | 0x80
			x >>= 7
			pos++
		}
		b.data[pos] = byte(x)
		pos++
	}

	b.deferredOp.Key = b.data[pos : pos+keyLen]
	pos += keyLen

	{
		// TODO(peter): Manually inlined version of binary.PutUvarint(). This is
		// 20% faster on BenchmarkBatchSet on go1.13. Remove if go1.14 or future
		// versions show this to not be a performance win.
		x := uint32(valueLen)
		for x >= 0x80 {
			b.data[pos] = byte(x) | 0x80
			x >>= 7
			pos++
		}
		b.data[pos] = byte(x)
		pos++
	}

	b.deferredOp.Value = b.data[pos : pos+valueLen]
	// Shrink data since varints may be shorter than the upper bound.
	b.data = b.data[:pos+valueLen]
}

func (b *Batch) prepareDeferredKeyRecord(keyLen int, kind InternalKeyKind) {
	if b.committing {
		panic("pebble: batch already committing")
	}
	if len(b.data) == 0 {
		b.init(keyLen + binary.MaxVarintLen64 + batchrepr.HeaderLen)
	}
	b.count++
	b.memTableSize += memTableEntrySize(keyLen, 0)

	pos := len(b.data)
	b.deferredOp.offset = uint32(pos)
	b.grow(1 + maxVarintLen32 + keyLen)
	b.data[pos] = byte(kind)
	pos++

	{
		// TODO(peter): Manually inlined version of binary.PutUvarint(). Remove
		// if go1.13 or future versions show this to not be a performance win.
		// See BenchmarkBatchSet.
		x := uint32(keyLen)
		for x >= 0x80 {
			b.data[pos] = byte(x) | 0x80
			x >>= 7
			pos++
		}
		b.data[pos] = byte(x)
		pos++
	}

	b.deferredOp.Key = b.data[pos : pos+keyLen]
	b.deferredOp.Value = nil

	// Shrink data since varint may be shorter than the upper bound.
	b.data = b.data[:pos+keyLen]
}

// AddInternalKey allows the caller to add an internal key of point key or range
// key kinds (but not RangeDelete) to a batch. Passing in an internal key of
// kind RangeDelete will result in a panic. Note that the seqnum in the internal
// key is effectively ignored, even though the Kind is preserved. This is
// because the batch format does not allow for a per-key seqnum to be specified,
// only a batch-wide one.
//
// Note that non-indexed keys (InternalKeyKind{LogData,IngestSST}) are not
// supported with this method as they require specialized logic.
     777           2 : func (b *Batch) AddInternalKey(key *base.InternalKey, value []byte, _ *WriteOptions) error {
     778           2 :         keyLen := len(key.UserKey)
     779           2 :         hasValue := false
     780           2 :         switch kind := key.Kind(); kind {
     781           0 :         case InternalKeyKindRangeDelete:
     782           0 :                 panic("unexpected range delete in AddInternalKey")
     783           1 :         case InternalKeyKindSingleDelete, InternalKeyKindDelete:
     784           1 :                 b.prepareDeferredKeyRecord(keyLen, kind)
     785           1 :                 b.deferredOp.index = b.index
     786           2 :         case InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete:
     787           2 :                 b.prepareDeferredKeyValueRecord(keyLen, len(value), kind)
     788           2 :                 hasValue = true
     789           2 :                 b.incrementRangeKeysCount()
     790           1 :         default:
     791           1 :                 b.prepareDeferredKeyValueRecord(keyLen, len(value), kind)
     792           1 :                 hasValue = true
     793           1 :                 b.deferredOp.index = b.index
     794             :         }
     795           2 :         copy(b.deferredOp.Key, key.UserKey)
     796           2 :         if hasValue {
     797           2 :                 copy(b.deferredOp.Value, value)
     798           2 :         }
     799             : 
     800             :         // TODO(peter): Manually inline DeferredBatchOp.Finish(). Mid-stack inlining
     801             :         // in go1.13 will remove the need for this.
     802           2 :         if b.index != nil {
     803           0 :                 if err := b.index.Add(b.deferredOp.offset); err != nil {
     804           0 :                         return err
     805           0 :                 }
     806             :         }
     807           2 :         return nil
     808             : }
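
// Illustrative sketch (not part of the original batch.go): constructing an
// internal key and adding it via AddInternalKey. As the doc comment above
// notes, the seqnum is ignored and only the kind is preserved. The function
// name is hypothetical; it assumes in-package access to internal/base.
func exampleAddInternalKey(b *Batch, userKey, value []byte) error {
        ik := base.MakeInternalKey(userKey, 0 /* seqnum: ignored */, InternalKeyKindSet)
        return b.AddInternalKey(&ik, value, nil)
}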
     809             : 
     810             : // Set adds an action to the batch that sets the key to map to the value.
     811             : //
     812             : // It is safe to modify the contents of the arguments after Set returns.
     813           2 : func (b *Batch) Set(key, value []byte, _ *WriteOptions) error {
     814           2 :         deferredOp := b.SetDeferred(len(key), len(value))
     815           2 :         copy(deferredOp.Key, key)
     816           2 :         copy(deferredOp.Value, value)
     817           2 :         // TODO(peter): Manually inline DeferredBatchOp.Finish(). Mid-stack inlining
     818           2 :         // in go1.13 will remove the need for this.
     819           2 :         if b.index != nil {
     820           2 :                 if err := b.index.Add(deferredOp.offset); err != nil {
     821           0 :                         return err
     822           0 :                 }
     823             :         }
     824           2 :         return nil
     825             : }
     826             : 
     827             : // SetDeferred is similar to Set in that it adds a set operation to the batch,
     828             : // except it only takes in key/value lengths instead of complete slices,
     829             : // letting the caller encode into those objects and then call Finish() on the
     830             : // returned object.
     831           2 : func (b *Batch) SetDeferred(keyLen, valueLen int) *DeferredBatchOp {
     832           2 :         b.prepareDeferredKeyValueRecord(keyLen, valueLen, InternalKeyKindSet)
     833           2 :         b.deferredOp.index = b.index
     834           2 :         return &b.deferredOp
     835           2 : }
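
// Illustrative sketch (not part of the original batch.go): the deferred API
// lets a caller serialize the key and value directly into the batch's buffer,
// avoiding an intermediate copy. Set above is equivalent to this sequence;
// the function name is hypothetical.
func exampleSetDeferred(b *Batch, k, v []byte) error {
        op := b.SetDeferred(len(k), len(v))
        copy(op.Key, k)    // encode the key in place
        copy(op.Value, v)  // encode the value in place
        return op.Finish() // add the key to the index, if the batch is indexed
}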
     836             : 
     837             : // Merge adds an action to the batch that merges the value at key with the new
     838             : // value. The details of the merge are dependent upon the configured merge
     839             : // operator.
     840             : //
     841             : // It is safe to modify the contents of the arguments after Merge returns.
     842           2 : func (b *Batch) Merge(key, value []byte, _ *WriteOptions) error {
     843           2 :         deferredOp := b.MergeDeferred(len(key), len(value))
     844           2 :         copy(deferredOp.Key, key)
     845           2 :         copy(deferredOp.Value, value)
     846           2 :         // TODO(peter): Manually inline DeferredBatchOp.Finish(). Mid-stack inlining
     847           2 :         // in go1.13 will remove the need for this.
     848           2 :         if b.index != nil {
     849           2 :                 if err := b.index.Add(deferredOp.offset); err != nil {
     850           0 :                         return err
     851           0 :                 }
     852             :         }
     853           2 :         return nil
     854             : }
     855             : 
     856             : // MergeDeferred is similar to Merge in that it adds a merge operation to the
     857             : // batch, except it only takes in key/value lengths instead of complete slices,
     858             : // letting the caller encode into those objects and then call Finish() on the
     859             : // returned object.
     860           2 : func (b *Batch) MergeDeferred(keyLen, valueLen int) *DeferredBatchOp {
     861           2 :         b.prepareDeferredKeyValueRecord(keyLen, valueLen, InternalKeyKindMerge)
     862           2 :         b.deferredOp.index = b.index
     863           2 :         return &b.deferredOp
     864           2 : }
     865             : 
     866             : // Delete adds an action to the batch that deletes the entry for key.
     867             : //
     868             : // It is safe to modify the contents of the arguments after Delete returns.
     869           2 : func (b *Batch) Delete(key []byte, _ *WriteOptions) error {
     870           2 :         deferredOp := b.DeleteDeferred(len(key))
     871           2 :         copy(deferredOp.Key, key)
     872           2 :         // TODO(peter): Manually inline DeferredBatchOp.Finish(). Mid-stack inlining
     873           2 :         // in go1.13 will remove the need for this.
     874           2 :         if b.index != nil {
     875           2 :                 if err := b.index.Add(deferredOp.offset); err != nil {
     876           0 :                         return err
     877           0 :                 }
     878             :         }
     879           2 :         return nil
     880             : }
     881             : 
     882             : // DeleteDeferred is similar to Delete in that it adds a delete operation to
      883             : // the batch, except it only takes in a key length instead of a complete
      884             : // slice, letting the caller encode into that object and then call Finish()
     885             : // on the returned object.
     886           2 : func (b *Batch) DeleteDeferred(keyLen int) *DeferredBatchOp {
     887           2 :         b.prepareDeferredKeyRecord(keyLen, InternalKeyKindDelete)
     888           2 :         b.deferredOp.index = b.index
     889           2 :         return &b.deferredOp
     890           2 : }
     891             : 
     892             : // DeleteSized behaves identically to Delete, but takes an additional
     893             : // argument indicating the size of the value being deleted. DeleteSized
     894             : // should be preferred when the caller has the expectation that there exists
     895             : // a single internal KV pair for the key (eg, the key has not been
     896             : // overwritten recently), and the caller knows the size of its value.
     897             : //
     898             : // DeleteSized will record the value size within the tombstone and use it to
     899             : // inform compaction-picking heuristics which strive to reduce space
     900             : // amplification in the LSM. This "calling your shot" mechanic allows the
     901             : // storage engine to more accurately estimate and reduce space amplification.
     902             : //
     903             : // It is safe to modify the contents of the arguments after DeleteSized
     904             : // returns.
     905           2 : func (b *Batch) DeleteSized(key []byte, deletedValueSize uint32, _ *WriteOptions) error {
     906           2 :         deferredOp := b.DeleteSizedDeferred(len(key), deletedValueSize)
     907           2 :         copy(b.deferredOp.Key, key)
     908           2 :         // TODO(peter): Manually inline DeferredBatchOp.Finish(). Check if in a
     909           2 :         // later Go release this is unnecessary.
     910           2 :         if b.index != nil {
     911           2 :                 if err := b.index.Add(deferredOp.offset); err != nil {
     912           0 :                         return err
     913           0 :                 }
     914             :         }
     915           2 :         return nil
     916             : }
     917             : 
     918             : // DeleteSizedDeferred is similar to DeleteSized in that it adds a sized delete
     919             : // operation to the batch, except it only takes in key length instead of a
     920             : // complete key slice, letting the caller encode into the DeferredBatchOp.Key
     921             : // slice and then call Finish() on the returned object.
     922           2 : func (b *Batch) DeleteSizedDeferred(keyLen int, deletedValueSize uint32) *DeferredBatchOp {
     923           2 :         if b.minimumFormatMajorVersion < FormatDeleteSizedAndObsolete {
     924           2 :                 b.minimumFormatMajorVersion = FormatDeleteSizedAndObsolete
     925           2 :         }
     926             : 
     927             :         // Encode the sum of the key length and the value in the value.
     928           2 :         v := uint64(deletedValueSize) + uint64(keyLen)
     929           2 : 
     930           2 :         // Encode `v` as a varint.
     931           2 :         var buf [binary.MaxVarintLen64]byte
     932           2 :         n := 0
     933           2 :         {
     934           2 :                 x := v
     935           2 :                 for x >= 0x80 {
     936           1 :                         buf[n] = byte(x) | 0x80
     937           1 :                         x >>= 7
     938           1 :                         n++
     939           1 :                 }
     940           2 :                 buf[n] = byte(x)
     941           2 :                 n++
     942             :         }
     943             : 
     944             :         // NB: In batch entries and sstable entries, values are stored as
     945             :         // varstrings. Here, the value is itself a simple varint. This results in an
     946             :         // unnecessary double layer of encoding:
     947             :         //     varint(n) varint(deletedValueSize)
      948             : // The first varint will always be 1 byte, since a varint-encoded uint64 is at
      949             : // most 10 bytes, well below 128. This unnecessary extra byte and wrapping is
     950             :         // preserved to avoid special casing across the database, and in particular
     951             :         // in sstable block decoding which is performance sensitive.
     952           2 :         b.prepareDeferredKeyValueRecord(keyLen, n, InternalKeyKindDeleteSized)
     953           2 :         b.deferredOp.index = b.index
     954           2 :         copy(b.deferredOp.Value, buf[:n])
     955           2 :         return &b.deferredOp
     956             : }
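
// Illustrative sketch (not part of the original batch.go) of the inner value
// encoding built above: the tombstone's value is the varint encoding of
// keyLen + deletedValueSize, which the varstring layer then wraps with a
// 1-byte length prefix. The function name is hypothetical.
func exampleSizedTombstoneValue(key []byte, deletedValueSize uint32) []byte {
        var buf [binary.MaxVarintLen64]byte
        n := binary.PutUvarint(buf[:], uint64(deletedValueSize)+uint64(len(key)))
        return buf[:n]
}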
     957             : 
     958             : // SingleDelete adds an action to the batch that single deletes the entry for key.
     959             : // See Writer.SingleDelete for more details on the semantics of SingleDelete.
     960             : //
     961             : // It is safe to modify the contents of the arguments after SingleDelete returns.
     962           2 : func (b *Batch) SingleDelete(key []byte, _ *WriteOptions) error {
     963           2 :         deferredOp := b.SingleDeleteDeferred(len(key))
     964           2 :         copy(deferredOp.Key, key)
     965           2 :         // TODO(peter): Manually inline DeferredBatchOp.Finish(). Mid-stack inlining
     966           2 :         // in go1.13 will remove the need for this.
     967           2 :         if b.index != nil {
     968           2 :                 if err := b.index.Add(deferredOp.offset); err != nil {
     969           0 :                         return err
     970           0 :                 }
     971             :         }
     972           2 :         return nil
     973             : }
     974             : 
     975             : // SingleDeleteDeferred is similar to SingleDelete in that it adds a single delete
      976             : // operation to the batch, except it only takes in a key length instead of a
      977             : // complete slice, letting the caller encode into that object and then call
     978             : // Finish() on the returned object.
     979           2 : func (b *Batch) SingleDeleteDeferred(keyLen int) *DeferredBatchOp {
     980           2 :         b.prepareDeferredKeyRecord(keyLen, InternalKeyKindSingleDelete)
     981           2 :         b.deferredOp.index = b.index
     982           2 :         return &b.deferredOp
     983           2 : }
     984             : 
     985             : // DeleteRange deletes all of the point keys (and values) in the range
     986             : // [start,end) (inclusive on start, exclusive on end). DeleteRange does NOT
     987             : // delete overlapping range keys (eg, keys set via RangeKeySet).
     988             : //
     989             : // It is safe to modify the contents of the arguments after DeleteRange
     990             : // returns.
     991           2 : func (b *Batch) DeleteRange(start, end []byte, _ *WriteOptions) error {
     992           2 :         deferredOp := b.DeleteRangeDeferred(len(start), len(end))
     993           2 :         copy(deferredOp.Key, start)
     994           2 :         copy(deferredOp.Value, end)
     995           2 :         // TODO(peter): Manually inline DeferredBatchOp.Finish(). Mid-stack inlining
     996           2 :         // in go1.13 will remove the need for this.
     997           2 :         if deferredOp.index != nil {
     998           2 :                 if err := deferredOp.index.Add(deferredOp.offset); err != nil {
     999           0 :                         return err
    1000           0 :                 }
    1001             :         }
    1002           2 :         return nil
    1003             : }
    1004             : 
    1005             : // DeleteRangeDeferred is similar to DeleteRange in that it adds a delete range
    1006             : // operation to the batch, except it only takes in key lengths instead of
    1007             : // complete slices, letting the caller encode into those objects and then call
    1008             : // Finish() on the returned object. Note that DeferredBatchOp.Key should be
    1009             : // populated with the start key, and DeferredBatchOp.Value should be populated
    1010             : // with the end key.
    1011           2 : func (b *Batch) DeleteRangeDeferred(startLen, endLen int) *DeferredBatchOp {
    1012           2 :         b.prepareDeferredKeyValueRecord(startLen, endLen, InternalKeyKindRangeDelete)
    1013           2 :         b.countRangeDels++
    1014           2 :         if b.index != nil {
    1015           2 :                 b.tombstones = nil
    1016           2 :                 b.tombstonesSeqNum = 0
    1017           2 :                 // Range deletions are rare, so we lazily allocate the index for them.
    1018           2 :                 if b.rangeDelIndex == nil {
    1019           2 :                         b.rangeDelIndex = batchskl.NewSkiplist(&b.data, b.comparer.Compare, b.comparer.AbbreviatedKey)
    1020           2 :                 }
    1021           2 :                 b.deferredOp.index = b.rangeDelIndex
    1022             :         }
    1023           2 :         return &b.deferredOp
    1024             : }
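
// Illustrative sketch (not part of the original batch.go): DeleteRange only
// affects point keys, so clearing a span of both point and range keys takes
// two operations. The function name is hypothetical.
func exampleClearSpan(b *Batch, start, end []byte) error {
        if err := b.DeleteRange(start, end, nil); err != nil {
                return err
        }
        return b.RangeKeyDelete(start, end, nil)
}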
    1025             : 
    1026             : // RangeKeySet sets a range key mapping the key range [start, end) at the MVCC
    1027             : // timestamp suffix to value. The suffix is optional. If any portion of the key
    1028             : // range [start, end) is already set by a range key with the same suffix value,
    1029             : // RangeKeySet overrides it.
    1030             : //
    1031             : // It is safe to modify the contents of the arguments after RangeKeySet returns.
    1032           2 : func (b *Batch) RangeKeySet(start, end, suffix, value []byte, _ *WriteOptions) error {
    1033           2 :         if invariants.Enabled && b.db != nil {
    1034           2 :                 // RangeKeySet is only supported on prefix keys.
    1035           2 :                 if b.db.opts.Comparer.Split(start) != len(start) {
    1036           0 :                         panic("RangeKeySet called with suffixed start key")
    1037             :                 }
    1038           2 :                 if b.db.opts.Comparer.Split(end) != len(end) {
    1039           0 :                         panic("RangeKeySet called with suffixed end key")
    1040             :                 }
    1041             :         }
    1042           2 :         suffixValues := [1]rangekey.SuffixValue{{Suffix: suffix, Value: value}}
    1043           2 :         internalValueLen := rangekey.EncodedSetValueLen(end, suffixValues[:])
    1044           2 : 
    1045           2 :         deferredOp := b.rangeKeySetDeferred(len(start), internalValueLen)
    1046           2 :         copy(deferredOp.Key, start)
    1047           2 :         n := rangekey.EncodeSetValue(deferredOp.Value, end, suffixValues[:])
    1048           2 :         if n != internalValueLen {
    1049           0 :                 panic("unexpected internal value length mismatch")
    1050             :         }
    1051             : 
    1052             :         // Manually inline DeferredBatchOp.Finish().
    1053           2 :         if deferredOp.index != nil {
    1054           2 :                 if err := deferredOp.index.Add(deferredOp.offset); err != nil {
    1055           0 :                         return err
    1056           0 :                 }
    1057             :         }
    1058           2 :         return nil
    1059             : }
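
// Illustrative sketch (not part of the original batch.go): an MVCC-style
// range key over [a, z) at a hypothetical timestamp suffix "@5". Whether
// "@5" is a valid suffix depends on the configured Comparer's Split.
func exampleRangeKeySet(b *Batch) error {
        return b.RangeKeySet([]byte("a"), []byte("z"), []byte("@5"), []byte("v"), nil)
}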
    1060             : 
    1061           2 : func (b *Batch) rangeKeySetDeferred(startLen, internalValueLen int) *DeferredBatchOp {
    1062           2 :         b.prepareDeferredKeyValueRecord(startLen, internalValueLen, InternalKeyKindRangeKeySet)
    1063           2 :         b.incrementRangeKeysCount()
    1064           2 :         return &b.deferredOp
    1065           2 : }
    1066             : 
    1067           2 : func (b *Batch) incrementRangeKeysCount() {
    1068           2 :         b.countRangeKeys++
    1069           2 :         if b.index != nil {
    1070           2 :                 b.rangeKeys = nil
    1071           2 :                 b.rangeKeysSeqNum = 0
    1072           2 :                 // Range keys are rare, so we lazily allocate the index for them.
    1073           2 :                 if b.rangeKeyIndex == nil {
    1074           2 :                         b.rangeKeyIndex = batchskl.NewSkiplist(&b.data, b.comparer.Compare, b.comparer.AbbreviatedKey)
    1075           2 :                 }
    1076           2 :                 b.deferredOp.index = b.rangeKeyIndex
    1077             :         }
    1078             : }
    1079             : 
    1080             : // RangeKeyUnset removes a range key mapping the key range [start, end) at the
    1081             : // MVCC timestamp suffix. The suffix may be omitted to remove an unsuffixed
    1082             : // range key. RangeKeyUnset only removes portions of range keys that fall within
    1083             : // the [start, end) key span, and only range keys with suffixes that exactly
    1084             : // match the unset suffix.
    1085             : //
    1086             : // It is safe to modify the contents of the arguments after RangeKeyUnset
    1087             : // returns.
    1088           2 : func (b *Batch) RangeKeyUnset(start, end, suffix []byte, _ *WriteOptions) error {
    1089           2 :         if invariants.Enabled && b.db != nil {
    1090           2 :                 // RangeKeyUnset is only supported on prefix keys.
    1091           2 :                 if b.db.opts.Comparer.Split(start) != len(start) {
    1092           0 :                         panic("RangeKeyUnset called with suffixed start key")
    1093             :                 }
    1094           2 :                 if b.db.opts.Comparer.Split(end) != len(end) {
    1095           0 :                         panic("RangeKeyUnset called with suffixed end key")
    1096             :                 }
    1097             :         }
    1098           2 :         suffixes := [1][]byte{suffix}
    1099           2 :         internalValueLen := rangekey.EncodedUnsetValueLen(end, suffixes[:])
    1100           2 : 
    1101           2 :         deferredOp := b.rangeKeyUnsetDeferred(len(start), internalValueLen)
    1102           2 :         copy(deferredOp.Key, start)
    1103           2 :         n := rangekey.EncodeUnsetValue(deferredOp.Value, end, suffixes[:])
    1104           2 :         if n != internalValueLen {
    1105           0 :                 panic("unexpected internal value length mismatch")
    1106             :         }
    1107             : 
    1108             :         // Manually inline DeferredBatchOp.Finish()
    1109           2 :         if deferredOp.index != nil {
    1110           2 :                 if err := deferredOp.index.Add(deferredOp.offset); err != nil {
    1111           0 :                         return err
    1112           0 :                 }
    1113             :         }
    1114           2 :         return nil
    1115             : }
    1116             : 
    1117           2 : func (b *Batch) rangeKeyUnsetDeferred(startLen, internalValueLen int) *DeferredBatchOp {
    1118           2 :         b.prepareDeferredKeyValueRecord(startLen, internalValueLen, InternalKeyKindRangeKeyUnset)
    1119           2 :         b.incrementRangeKeysCount()
    1120           2 :         return &b.deferredOp
    1121           2 : }
    1122             : 
    1123             : // RangeKeyDelete deletes all of the range keys in the range [start,end)
    1124             : // (inclusive on start, exclusive on end). It does not delete point keys (for
    1125             : // that use DeleteRange). RangeKeyDelete removes all range keys within the
    1126             : // bounds, including those with or without suffixes.
    1127             : //
    1128             : // It is safe to modify the contents of the arguments after RangeKeyDelete
    1129             : // returns.
    1130           2 : func (b *Batch) RangeKeyDelete(start, end []byte, _ *WriteOptions) error {
    1131           2 :         if invariants.Enabled && b.db != nil {
    1132           2 :                 // RangeKeyDelete is only supported on prefix keys.
    1133           2 :                 if b.db.opts.Comparer.Split(start) != len(start) {
    1134           0 :                         panic("RangeKeyDelete called with suffixed start key")
    1135             :                 }
    1136           2 :                 if b.db.opts.Comparer.Split(end) != len(end) {
    1137           0 :                         panic("RangeKeyDelete called with suffixed end key")
    1138             :                 }
    1139             :         }
    1140           2 :         deferredOp := b.RangeKeyDeleteDeferred(len(start), len(end))
    1141           2 :         copy(deferredOp.Key, start)
    1142           2 :         copy(deferredOp.Value, end)
    1143           2 :         // Manually inline DeferredBatchOp.Finish().
    1144           2 :         if deferredOp.index != nil {
    1145           2 :                 if err := deferredOp.index.Add(deferredOp.offset); err != nil {
    1146           0 :                         return err
    1147           0 :                 }
    1148             :         }
    1149           2 :         return nil
    1150             : }
    1151             : 
    1152             : // RangeKeyDeleteDeferred is similar to RangeKeyDelete in that it adds an
    1153             : // operation to delete range keys to the batch, except it only takes in key
    1154             : // lengths instead of complete slices, letting the caller encode into those
    1155             : // objects and then call Finish() on the returned object. Note that
    1156             : // DeferredBatchOp.Key should be populated with the start key, and
    1157             : // DeferredBatchOp.Value should be populated with the end key.
    1158           2 : func (b *Batch) RangeKeyDeleteDeferred(startLen, endLen int) *DeferredBatchOp {
    1159           2 :         b.prepareDeferredKeyValueRecord(startLen, endLen, InternalKeyKindRangeKeyDelete)
    1160           2 :         b.incrementRangeKeysCount()
    1161           2 :         return &b.deferredOp
    1162           2 : }
    1163             : 
     1164             : // LogData adds the specified data to the batch. The data will be written to the
    1165             : // WAL, but not added to memtables or sstables. Log data is never indexed,
    1166             : // which makes it useful for testing WAL performance.
    1167             : //
    1168             : // It is safe to modify the contents of the argument after LogData returns.
    1169           2 : func (b *Batch) LogData(data []byte, _ *WriteOptions) error {
    1170           2 :         origCount, origMemTableSize := b.count, b.memTableSize
    1171           2 :         b.prepareDeferredKeyRecord(len(data), InternalKeyKindLogData)
    1172           2 :         copy(b.deferredOp.Key, data)
    1173           2 :         // Since LogData only writes to the WAL and does not affect the memtable, we
    1174           2 :         // restore b.count and b.memTableSize to their origin values. Note that
    1175           2 :         // Batch.count only refers to records that are added to the memtable.
    1176           2 :         b.count, b.memTableSize = origCount, origMemTableSize
    1177           2 :         return nil
    1178           2 : }
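
// Illustrative sketch (not part of the original batch.go): a WAL-only marker
// record. The payload is made durable in the log but never surfaces as a key
// in reads. The function name is hypothetical.
func exampleLogData(db *DB) error {
        b := db.NewBatch()
        defer b.Close()
        if err := b.LogData([]byte("marker"), nil); err != nil {
                return err
        }
        return db.Apply(b, Sync)
}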
    1179             : 
     1180             : // ingestSST adds the FileNum for an sstable to the batch. The data will only be
    1181             : // written to the WAL (not added to memtables or sstables).
    1182           2 : func (b *Batch) ingestSST(fileNum base.FileNum) {
    1183           2 :         if b.Empty() {
    1184           2 :                 b.ingestedSSTBatch = true
    1185           2 :         } else if !b.ingestedSSTBatch {
    1186           0 :                 // Batch contains other key kinds.
    1187           0 :                 panic("pebble: invalid call to ingestSST")
    1188             :         }
    1189             : 
    1190           2 :         origMemTableSize := b.memTableSize
    1191           2 :         var buf [binary.MaxVarintLen64]byte
    1192           2 :         length := binary.PutUvarint(buf[:], uint64(fileNum))
    1193           2 :         b.prepareDeferredKeyRecord(length, InternalKeyKindIngestSST)
    1194           2 :         copy(b.deferredOp.Key, buf[:length])
    1195           2 :         // Since IngestSST writes only to the WAL and does not affect the memtable,
    1196           2 :         // we restore b.memTableSize to its original value. Note that Batch.count
    1197           2 :         // is not reset because for the InternalKeyKindIngestSST the count is the
    1198           2 :         // number of sstable paths which have been added to the batch.
    1199           2 :         b.memTableSize = origMemTableSize
    1200           2 :         b.minimumFormatMajorVersion = FormatFlushableIngest
    1201             : }
    1202             : 
     1203             : // excise adds an excise span for a flushable ingest. The data will only be
     1204             : // written to the WAL (not added to memtables or sstables).
    1205           2 : func (b *Batch) excise(start, end []byte) {
    1206           2 :         if b.Empty() {
    1207           2 :                 b.ingestedSSTBatch = true
    1208           2 :         } else if !b.ingestedSSTBatch {
    1209           0 :                 // Batch contains other key kinds.
    1210           0 :                 panic("pebble: invalid call to excise")
    1211             :         }
    1212             : 
    1213           2 :         origMemTableSize := b.memTableSize
    1214           2 :         b.prepareDeferredKeyValueRecord(len(start), len(end), InternalKeyKindExcise)
    1215           2 :         copy(b.deferredOp.Key, start)
    1216           2 :         copy(b.deferredOp.Value, end)
    1217           2 :         // Since excise writes only to the WAL and does not affect the memtable,
    1218           2 :         // we restore b.memTableSize to its original value. Note that Batch.count
    1219           2 :         // is not reset because for the InternalKeyKindIngestSST/Excise the count
    1220           2 :         // is the number of sstable paths which have been added to the batch.
    1221           2 :         b.memTableSize = origMemTableSize
    1222           2 :         b.minimumFormatMajorVersion = FormatFlushableIngestExcises
    1223             : }
    1224             : 
    1225             : // Empty returns true if the batch is empty, and false otherwise.
    1226           2 : func (b *Batch) Empty() bool {
    1227           2 :         return batchrepr.IsEmpty(b.data)
    1228           2 : }
    1229             : 
    1230             : // Len returns the current size of the batch in bytes.
    1231           1 : func (b *Batch) Len() int {
    1232           1 :         return max(batchrepr.HeaderLen, len(b.data))
    1233           1 : }
    1234             : 
    1235             : // Repr returns the underlying batch representation. It is not safe to modify
    1236             : // the contents. Reset() will not change the contents of the returned value,
    1237             : // though any other mutation operation may do so.
    1238           2 : func (b *Batch) Repr() []byte {
    1239           2 :         if len(b.data) == 0 {
    1240           1 :                 b.init(batchrepr.HeaderLen)
    1241           1 :         }
    1242           2 :         batchrepr.SetCount(b.data, b.Count())
    1243           2 :         return b.data
    1244             : }
    1245             : 
    1246             : // SetRepr sets the underlying batch representation. The batch takes ownership
    1247             : // of the supplied slice. It is not safe to modify it afterwards until the
    1248             : // Batch is no longer in use.
    1249             : //
    1250             : // SetRepr may return ErrInvalidBatch if the supplied slice fails to decode in
    1251             : // any way. It will not return an error in any other circumstance.
    1252           2 : func (b *Batch) SetRepr(data []byte) error {
    1253           2 :         h, ok := batchrepr.ReadHeader(data)
    1254           2 :         if !ok {
    1255           0 :                 return ErrInvalidBatch
    1256           0 :         }
    1257           2 :         b.data = data
    1258           2 :         b.count = uint64(h.Count)
    1259           2 :         var err error
    1260           2 :         if b.db != nil {
    1261           2 :                 // Only track memTableSize for batches that will be committed to the DB.
    1262           2 :                 err = b.refreshMemTableSize()
    1263           2 :         }
    1264           2 :         return err
    1265             : }
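
// Illustrative sketch (not part of the original batch.go): a batch's wire
// representation can be captured with Repr and replayed into a fresh batch
// with SetRepr, e.g. to ship writes elsewhere. The copy matters because
// SetRepr takes ownership of the supplied slice. The function name is
// hypothetical.
func exampleReplay(db *DB, src *Batch) error {
        repr := append([]byte(nil), src.Repr()...)
        dst := db.NewBatch()
        defer dst.Close()
        if err := dst.SetRepr(repr); err != nil {
                return err
        }
        return db.Apply(dst, Sync)
}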
    1266             : 
    1267             : // NewIter returns an iterator that is unpositioned (Iterator.Valid() will
    1268             : // return false). The iterator can be positioned via a call to SeekGE,
    1269             : // SeekPrefixGE, SeekLT, First or Last. Only indexed batches support iterators.
    1270             : //
    1271             : // The returned Iterator observes all of the Batch's existing mutations, but no
    1272             : // later mutations. Its view can be refreshed via RefreshBatchSnapshot or
    1273             : // SetOptions().
    1274           2 : func (b *Batch) NewIter(o *IterOptions) (*Iterator, error) {
    1275           2 :         return b.NewIterWithContext(context.Background(), o)
    1276           2 : }
    1277             : 
    1278             : // NewIterWithContext is like NewIter, and additionally accepts a context for
    1279             : // tracing.
    1280           2 : func (b *Batch) NewIterWithContext(ctx context.Context, o *IterOptions) (*Iterator, error) {
    1281           2 :         if b.index == nil {
    1282           0 :                 return nil, ErrNotIndexed
    1283           0 :         }
    1284           2 :         return b.db.newIter(ctx, b, newIterOpts{}, o), nil
    1285             : }
    1286             : 
    1287             : // NewBatchOnlyIter constructs an iterator that only reads the contents of the
    1288             : // batch, and does not overlay the batch mutations on top of the DB state.
    1289             : //
    1290             : // The returned Iterator observes all of the Batch's existing mutations, but
    1291             : // no later mutations. Its view can be refreshed via RefreshBatchSnapshot or
    1292             : // SetOptions().
    1293           1 : func (b *Batch) NewBatchOnlyIter(ctx context.Context, o *IterOptions) (*Iterator, error) {
    1294           1 :         if b.index == nil {
    1295           0 :                 return nil, ErrNotIndexed
    1296           0 :         }
    1297           1 :         return b.db.newIter(ctx, b, newIterOpts{batch: batchIterOpts{batchOnly: true}}, o), nil
    1298             : }
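
// Illustrative sketch (not part of the original batch.go): reading back an
// indexed batch's own mutations. Only batches created with
// DB.NewIndexedBatch support iteration. The function name is hypothetical.
func exampleBatchIter(db *DB) error {
        b := db.NewIndexedBatch()
        defer b.Close()
        if err := b.Set([]byte("a"), []byte("1"), nil); err != nil {
                return err
        }
        it, err := b.NewIter(nil)
        if err != nil {
                return err
        }
        defer it.Close()
        for valid := it.First(); valid; valid = it.Next() {
                fmt.Printf("%s => %s\n", it.Key(), it.Value())
        }
        return nil
}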
    1299             : 
    1300             : // newInternalIter creates a new internalIterator that iterates over the
    1301             : // contents of the batch.
    1302           2 : func (b *Batch) newInternalIter(o *IterOptions) *batchIter {
    1303           2 :         iter := &batchIter{}
    1304           2 :         b.initInternalIter(o, iter)
    1305           2 :         return iter
    1306           2 : }
    1307             : 
    1308           2 : func (b *Batch) initInternalIter(o *IterOptions, iter *batchIter) {
    1309           2 :         *iter = batchIter{
    1310           2 :                 batch: b,
    1311           2 :                 iter:  b.index.NewIter(o.GetLowerBound(), o.GetUpperBound()),
    1312           2 :                 // NB: We explicitly do not propagate the batch snapshot to the point
    1313           2 :                 // key iterator. Filtering point keys within the batch iterator can
    1314           2 :                 // cause pathological behavior where a batch iterator advances
    1315           2 :                 // significantly farther than necessary filtering many batch keys that
    1316           2 :                 // are not visible at the batch sequence number. Instead, the merging
    1317           2 :                 // iterator enforces bounds.
    1318           2 :                 //
    1319           2 :                 // For example, consider an engine that contains the committed keys
    1320           2 :                 // 'bar' and 'bax', with no keys between them. Consider a batch
     1321           2 :                 // containing 1,000 keys within the range [a,z]. All of the
    1322           2 :                 // batch keys were added to the batch after the iterator was
    1323           2 :                 // constructed, so they are not visible to the iterator. A call to
    1324           2 :                 // SeekGE('bax') would seek the LSM iterators and discover the key
    1325           2 :                 // 'bax'. It would also seek the batch iterator, landing on the key
     1326           2 :                 // 'baz' but discovering that it's not visible. The batch iterator would
    1327           2 :                 // next through the rest of the batch's keys, only to discover there are
    1328           2 :                 // no visible keys greater than or equal to 'bax'.
    1329           2 :                 //
    1330           2 :                 // Filtering these batch points within the merging iterator ensures that
    1331           2 :                 // the batch iterator never needs to iterate beyond 'baz', because it
    1332           2 :                 // already found a smaller, visible key 'bax'.
    1333           2 :                 snapshot: base.SeqNumMax,
    1334           2 :         }
    1335           2 : }
    1336             : 
    1337           2 : func (b *Batch) newRangeDelIter(o *IterOptions, batchSnapshot base.SeqNum) *keyspan.Iter {
    1338           2 :         // Construct an iterator even if rangeDelIndex is nil, because it is allowed
    1339           2 :         // to refresh later, so we need the container to exist.
    1340           2 :         iter := new(keyspan.Iter)
    1341           2 :         b.initRangeDelIter(o, iter, batchSnapshot)
    1342           2 :         return iter
    1343           2 : }
    1344             : 
    1345           2 : func (b *Batch) initRangeDelIter(_ *IterOptions, iter *keyspan.Iter, batchSnapshot base.SeqNum) {
    1346           2 :         if b.rangeDelIndex == nil {
    1347           2 :                 iter.Init(b.comparer.Compare, nil)
    1348           2 :                 return
    1349           2 :         }
    1350             : 
    1351             :         // Fragment the range tombstones the first time a range deletion iterator is
    1352             :         // requested. The cached tombstones are invalidated if another range
    1353             :         // deletion tombstone is added to the batch. This cache is only guaranteed
    1354             :         // to be correct if we're opening an iterator to read at a batch sequence
    1355             :         // number at least as high as tombstonesSeqNum. The cache is guaranteed to
    1356             :         // include all tombstones up to tombstonesSeqNum, and if any additional
    1357             :         // tombstones were added after that sequence number the cache would've been
    1358             :         // cleared.
    1359           2 :         nextSeqNum := b.nextSeqNum()
    1360           2 :         if b.tombstones != nil && b.tombstonesSeqNum <= batchSnapshot {
    1361           1 :                 iter.Init(b.comparer.Compare, b.tombstones)
    1362           1 :                 return
    1363           1 :         }
    1364             : 
    1365           2 :         tombstones := make([]keyspan.Span, 0, b.countRangeDels)
    1366           2 :         frag := &keyspan.Fragmenter{
    1367           2 :                 Cmp:    b.comparer.Compare,
    1368           2 :                 Format: b.comparer.FormatKey,
    1369           2 :                 Emit: func(s keyspan.Span) {
    1370           2 :                         tombstones = append(tombstones, s)
    1371           2 :                 },
    1372             :         }
    1373           2 :         it := &batchIter{
    1374           2 :                 batch:    b,
    1375           2 :                 iter:     b.rangeDelIndex.NewIter(nil, nil),
    1376           2 :                 snapshot: batchSnapshot,
    1377           2 :         }
    1378           2 :         fragmentRangeDels(frag, it, int(b.countRangeDels))
    1379           2 :         iter.Init(b.comparer.Compare, tombstones)
    1380           2 : 
    1381           2 :         // If we just read all the tombstones in the batch (eg, batchSnapshot was
    1382           2 :         // set to b.nextSeqNum()), then cache the tombstones so that a subsequent
    1383           2 :         // call to initRangeDelIter may use them without refragmenting.
    1384           2 :         if nextSeqNum == batchSnapshot {
    1385           2 :                 b.tombstones = tombstones
    1386           2 :                 b.tombstonesSeqNum = nextSeqNum
    1387           2 :         }
    1388             : }
    1389             : 
    1390           2 : func fragmentRangeDels(frag *keyspan.Fragmenter, it internalIterator, count int) {
    1391           2 :         // The memory management here is a bit subtle. The keys and values returned
    1392           2 :         // by the iterator are slices in Batch.data. Thus the fragmented tombstones
    1393           2 :         // are slices within Batch.data. If additional entries are added to the
    1394           2 :         // Batch, Batch.data may be reallocated. The references in the fragmented
    1395           2 :         // tombstones will remain valid, pointing into the old Batch.data. GC for
    1396           2 :         // the win.
    1397           2 : 
    1398           2 :         // Use a single []keyspan.Key buffer to avoid allocating many
    1399           2 :         // individual []keyspan.Key slices with a single element each.
    1400           2 :         keyBuf := make([]keyspan.Key, 0, count)
    1401           2 :         for kv := it.First(); kv != nil; kv = it.Next() {
    1402           2 :                 s := rangedel.Decode(kv.K, kv.InPlaceValue(), keyBuf)
    1403           2 :                 keyBuf = s.Keys[len(s.Keys):]
    1404           2 : 
    1405           2 :                 // Set a fixed capacity to avoid accidental overwriting.
    1406           2 :                 s.Keys = s.Keys[:len(s.Keys):len(s.Keys)]
    1407           2 :                 frag.Add(s)
    1408           2 :         }
    1409           2 :         frag.Finish()
    1410             : }
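
// Illustrative sketch (not part of the original batch.go) of the full slice
// expression s.Keys[:len:len] used above: pinning a slice's capacity forces
// later appends to allocate a new array rather than overwrite the shared
// backing storage. The function name is hypothetical.
func exampleFullSliceExpression() {
        backing := make([]byte, 0, 8)
        a := append(backing, 1, 2) // a aliases backing's array
        a = a[:2:2]                // cap(a) == 2: appends to a must now copy
        _ = append(a, 9)           // allocates; backing's array is untouched
}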
    1411             : 
    1412           2 : func (b *Batch) newRangeKeyIter(o *IterOptions, batchSnapshot base.SeqNum) *keyspan.Iter {
    1413           2 :         // Construct an iterator even if rangeKeyIndex is nil, because it is allowed
    1414           2 :         // to refresh later, so we need the container to exist.
    1415           2 :         iter := new(keyspan.Iter)
    1416           2 :         b.initRangeKeyIter(o, iter, batchSnapshot)
    1417           2 :         return iter
    1418           2 : }
    1419             : 
    1420           2 : func (b *Batch) initRangeKeyIter(_ *IterOptions, iter *keyspan.Iter, batchSnapshot base.SeqNum) {
    1421           2 :         if b.rangeKeyIndex == nil {
    1422           2 :                 iter.Init(b.comparer.Compare, nil)
    1423           2 :                 return
    1424           2 :         }
    1425             : 
    1426             :         // Fragment the range keys the first time a range key iterator is requested.
    1427             :         // The cached spans are invalidated if another range key is added to the
    1428             :         // batch. This cache is only guaranteed to be correct if we're opening an
    1429             :         // iterator to read at a batch sequence number at least as high as
    1430             :         // rangeKeysSeqNum. The cache is guaranteed to include all range keys up to
    1431             :         // rangeKeysSeqNum, and if any additional range keys were added after that
    1432             :         // sequence number the cache would've been cleared.
    1433           2 :         nextSeqNum := b.nextSeqNum()
    1434           2 :         if b.rangeKeys != nil && b.rangeKeysSeqNum <= batchSnapshot {
    1435           2 :                 iter.Init(b.comparer.Compare, b.rangeKeys)
    1436           2 :                 return
    1437           2 :         }
    1438             : 
    1439           2 :         rangeKeys := make([]keyspan.Span, 0, b.countRangeKeys)
    1440           2 :         frag := &keyspan.Fragmenter{
    1441           2 :                 Cmp:    b.comparer.Compare,
    1442           2 :                 Format: b.comparer.FormatKey,
    1443           2 :                 Emit: func(s keyspan.Span) {
    1444           2 :                         rangeKeys = append(rangeKeys, s)
    1445           2 :                 },
    1446             :         }
    1447           2 :         it := &batchIter{
    1448           2 :                 batch:    b,
    1449           2 :                 iter:     b.rangeKeyIndex.NewIter(nil, nil),
    1450           2 :                 snapshot: batchSnapshot,
    1451           2 :         }
    1452           2 :         fragmentRangeKeys(frag, it, int(b.countRangeKeys))
    1453           2 :         iter.Init(b.comparer.Compare, rangeKeys)
    1454           2 : 
    1455           2 :         // If we just read all the range keys in the batch (eg, batchSnapshot was
    1456           2 :         // set to b.nextSeqNum()), then cache the range keys so that a subsequent
    1457           2 :         // call to initRangeKeyIter may use them without refragmenting.
    1458           2 :         if nextSeqNum == batchSnapshot {
    1459           2 :                 b.rangeKeys = rangeKeys
    1460           2 :                 b.rangeKeysSeqNum = nextSeqNum
    1461           2 :         }
    1462             : }
    1463             : 
    1464           2 : func fragmentRangeKeys(frag *keyspan.Fragmenter, it internalIterator, count int) error {
    1465           2 :         // The memory management here is a bit subtle. The keys and values
    1466           2 :         // returned by the iterator are slices in Batch.data. Thus the
    1467           2 :         // fragmented key spans are slices within Batch.data. If additional
    1468           2 :         // entries are added to the Batch, Batch.data may be reallocated. The
    1469           2 :         // references in the fragmented keys will remain valid, pointing into
    1470           2 :         // the old Batch.data. GC for the win.
    1471           2 : 
    1472           2 :         // Use a single []keyspan.Key buffer to avoid allocating many
    1473           2 :         // individual []keyspan.Key slices with a single element each.
    1474           2 :         keyBuf := make([]keyspan.Key, 0, count)
    1475           2 :         for kv := it.First(); kv != nil; kv = it.Next() {
    1476           2 :                 s, err := rangekey.Decode(kv.K, kv.InPlaceValue(), keyBuf)
    1477           2 :                 if err != nil {
    1478           0 :                         return err
    1479           0 :                 }
    1480           2 :                 keyBuf = s.Keys[len(s.Keys):]
    1481           2 : 
    1482           2 :                 // Set a fixed capacity to avoid accidental overwriting.
    1483           2 :                 s.Keys = s.Keys[:len(s.Keys):len(s.Keys)]
    1484           2 :                 frag.Add(s)
    1485             :         }
    1486           2 :         frag.Finish()
    1487           2 :         return nil
    1488             : }
    1489             : 
    1490             : // Commit applies the batch to its parent writer.
    1491           2 : func (b *Batch) Commit(o *WriteOptions) error {
    1492           2 :         return b.db.Apply(b, o)
    1493           2 : }
    1494             : 
    1495             : // Close closes the batch without committing it.
    1496           2 : func (b *Batch) Close() error {
    1497           2 :         // The storage engine commit pipeline may retain a pointer to b.data beyond
    1498           2 :         // when Commit() returns. This is possible when configured for WAL failover;
    1499           2 :         // we don't know if we might need to read the batch data again until the
    1500           2 :         // batch has been durably synced [even if the committer doesn't care to wait
    1501           2 :         // for the sync and Sync()=false].
    1502           2 :         //
    1503           2 :         // We still want to recycle these batches. The b.lifecycle atomic negotiates
    1504           2 :         // the batch's lifecycle. If the commit pipeline still might read b.data,
     1505           2 :         // b.lifecycle will be nonzero [the low bits hold a ref count].
    1506           2 :         for {
    1507           2 :                 v := b.lifecycle.Load()
    1508           2 :                 switch {
    1509           2 :                 case v == 0:
    1510           2 :                         // A zero value indicates that the commit pipeline has no
    1511           2 :                         // outstanding references to the batch. The commit pipeline is
    1512           2 :                         // required to acquire a ref synchronously, so there is no risk that
    1513           2 :                         // the commit pipeline will grab a ref after the call to release. We
    1514           2 :                         // can simply release the batch.
    1515           2 :                         b.release()
    1516           2 :                         return nil
    1517           1 :                 case (v & batchClosedBit) != 0:
     1518           1 :                         // batchClosedBit is set: this batch has already been closed.
    1519           1 :                         return ErrClosed
    1520           2 :                 default:
    1521           2 :                         // There's an outstanding reference. Set the batch released bit so
    1522           2 :                         // that the commit pipeline knows it should release the batch when
    1523           2 :                         // it unrefs.
    1524           2 :                         if b.lifecycle.CompareAndSwap(v, v|batchClosedBit) {
    1525           2 :                                 return nil
    1526           2 :                         }
     1527             :                         // The CAS failed: the outstanding reference count just
     1528             :                         // changed (or the caller illegally closed the batch twice).
     1529             :                         // Loop to reload.
    1530             :                 }
    1531             :         }
    1532             : }
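
// Illustrative sketch (not part of the original batch.go) of the commit
// pipeline's half of the protocol: when it drops the last reference after
// Close has set batchClosedBit, it releases the batch on the closer's
// behalf. This is a simplified, hypothetical unref.
func exampleUnref(b *Batch) {
        if b.lifecycle.Add(-1) == batchClosedBit {
                // Close ran while we held the last ref; release for it.
                b.lifecycle.Store(0)
                b.release()
        }
}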
    1533             : 
    1534             : // Indexed returns true if the batch is indexed (i.e. supports read
    1535             : // operations).
    1536           2 : func (b *Batch) Indexed() bool {
    1537           2 :         return b.index != nil
    1538           2 : }
    1539             : 
    1540             : // init ensures that the batch data slice is initialized to meet the
    1541             : // minimum required size and allocates space for the batch header.
    1542           2 : func (b *Batch) init(size int) {
    1543           2 :         b.opts.ensureDefaults()
    1544           2 :         n := b.opts.initialSizeBytes
    1545           2 :         for n < size {
    1546           1 :                 n *= 2
    1547           1 :         }
    1548           2 :         if cap(b.data) < n {
    1549           2 :                 b.data = rawalloc.New(batchrepr.HeaderLen, n)
    1550           2 :         }
    1551           2 :         b.data = b.data[:batchrepr.HeaderLen]
    1552           2 :         clear(b.data) // Zero the sequence number in the header
    1553             : }
    1554             : 
    1555             : // Reset resets the batch for reuse. The underlying byte slice (that is
    1556             : // returned by Repr()) may not be modified. It is only necessary to call this
     1557             : // method if a batch is explicitly being reused. Close automatically takes care
    1558             : // of releasing resources when appropriate for batches that are internally
    1559             : // being reused.
    1560           1 : func (b *Batch) Reset() {
    1561           1 :         // In some configurations (WAL failover) the commit pipeline may retain
    1562           1 :         // b.data beyond a call to commit the batch. When this happens, b.lifecycle
    1563           1 :         // is nonzero (see the comment above b.lifecycle). In this case it's unsafe
    1564           1 :         // to mutate b.data, so we discard it. Note that Reset must not be called on
    1565           1 :         // a closed batch, so v > 0 implies a non-zero ref count and not
    1566           1 :         // batchClosedBit being set.
    1567           1 :         if v := b.lifecycle.Load(); v > 0 {
    1568           1 :                 b.data = nil
    1569           1 :         }
    1570           1 :         b.reset()
    1571             : }
    1572             : 
    1573           2 : func (b *Batch) reset() {
    1574           2 :         // Zero out the struct, retaining only the fields necessary for manual
    1575           2 :         // reuse.
    1576           2 :         b.batchInternal = batchInternal{
    1577           2 :                 data:     b.data,
    1578           2 :                 comparer: b.comparer,
    1579           2 :                 opts:     b.opts,
    1580           2 :                 index:    b.index,
    1581           2 :                 db:       b.db,
    1582           2 :         }
    1583           2 :         b.applied.Store(false)
    1584           2 :         if b.data != nil {
    1585           2 :                 if cap(b.data) > b.opts.maxRetainedSizeBytes {
    1586           1 :                         // If the capacity of the buffer is larger than our maximum
    1587           1 :                         // retention size, don't re-use it. Let it be GC-ed instead.
    1588           1 :                         // This prevents the memory from an unusually large batch from
    1589           1 :                         // being held on to indefinitely.
    1590           1 :                         b.data = nil
    1591           2 :                 } else {
    1592           2 :                         // Otherwise, reset the buffer for re-use.
    1593           2 :                         b.data = b.data[:batchrepr.HeaderLen]
    1594           2 :                         clear(b.data)
    1595           2 :                 }
    1596             :         }
    1597           2 :         if b.index != nil {
    1598           2 :                 b.index.Init(&b.data, b.comparer.Compare, b.comparer.AbbreviatedKey)
    1599           2 :         }
    1600             : }
    1601             : 
    1602           2 : func (b *Batch) grow(n int) {
    1603           2 :         newSize := len(b.data) + n
    1604           2 :         if uint64(newSize) >= maxBatchSize {
    1605           1 :                 panic(ErrBatchTooLarge)
    1606             :         }
    1607           2 :         if newSize > cap(b.data) {
    1608           1 :                 newCap := 2 * cap(b.data)
    1609           1 :                 for newCap < newSize {
    1610           1 :                         newCap *= 2
    1611           1 :                 }
    1612           1 :                 newData := rawalloc.New(len(b.data), newCap)
    1613           1 :                 copy(newData, b.data)
    1614           1 :                 b.data = newData
    1615             :         }
    1616           2 :         b.data = b.data[:newSize]
    1617             : }
    1618             : 
    1619           2 : func (b *Batch) setSeqNum(seqNum base.SeqNum) {
    1620           2 :         batchrepr.SetSeqNum(b.data, seqNum)
    1621           2 : }
    1622             : 
    1623             : // SeqNum returns the batch sequence number which is applied to the first
    1624             : // record in the batch. The sequence number is incremented for each subsequent
    1625             : // record. It returns zero if the batch is empty.
    1626           2 : func (b *Batch) SeqNum() base.SeqNum {
    1627           2 :         if len(b.data) == 0 {
    1628           1 :                 b.init(batchrepr.HeaderLen)
    1629           1 :         }
    1630           2 :         return batchrepr.ReadSeqNum(b.data)
    1631             : }
    1632             : 
    1633           2 : func (b *Batch) setCount(v uint32) {
    1634           2 :         b.count = uint64(v)
    1635           2 : }
    1636             : 
    1637             : // Count returns the count of memtable-modifying operations in this batch. All
     1638             : // operations with the exception of LogData increment this count. For IngestSSTs,
     1639             : // count is only used to indicate the number of SSTs ingested in the record; the
     1640             : // batch isn't applied to the memtable.
    1641           2 : func (b *Batch) Count() uint32 {
    1642           2 :         if b.count > math.MaxUint32 {
    1643           1 :                 panic(batchrepr.ErrInvalidBatch)
    1644             :         }
    1645           2 :         return uint32(b.count)
    1646             : }
    1647             : 
    1648             : // Reader returns a batchrepr.Reader for the current batch contents. If the
    1649             : // batch is mutated, the new entries will not be visible to the reader.
    1650           2 : func (b *Batch) Reader() batchrepr.Reader {
    1651           2 :         if len(b.data) == 0 {
    1652           2 :                 b.init(batchrepr.HeaderLen)
    1653           2 :         }
    1654           2 :         return batchrepr.Read(b.data)
    1655             : }
    1656             : 
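// scanBatch is a hedged sketch (not part of Pebble; the helper name is
// hypothetical) of consuming a Reader, mirroring the decode loop in
// newFlushableBatch below: Next yields each record's kind, user key, and
// value in write order until the reader is exhausted or errors.
func scanBatch(b *Batch) error {
        for r := b.Reader(); len(r) > 0; {
                kind, key, value, ok, err := r.Next()
                if !ok {
                        return err // nil means a clean end of the batch
                }
                _, _, _ = kind, key, value // process the record here
        }
        return nil
}
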
    1657             : // SyncWait is to be used in conjunction with DB.ApplyNoSyncWait.
    1658           2 : func (b *Batch) SyncWait() error {
    1659           2 :         now := crtime.NowMono()
    1660           2 :         b.fsyncWait.Wait()
    1661           2 :         if b.commitErr != nil {
    1662           0 :                 b.db = nil // prevent batch reuse on error
    1663           0 :         }
    1664           2 :         waitDuration := now.Elapsed()
    1665           2 :         b.commitStats.CommitWaitDuration += waitDuration
    1666           2 :         b.commitStats.TotalDuration += waitDuration
    1667           2 :         return b.commitErr
    1668             : }
    1669             : 
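// applyThenWait is a hedged sketch (not part of Pebble; the helper name is
// hypothetical) of the pattern the comment above describes: ApplyNoSyncWait
// returns once the batch is applied, and SyncWait later blocks until the
// WAL fsync completes, letting the caller overlap other work with the sync.
func applyThenWait(db *DB, b *Batch) error {
        if err := db.ApplyNoSyncWait(b, Sync); err != nil {
                return err
        }
        // ... overlap other work with the WAL sync here ...
        return b.SyncWait()
}
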
    1670             : // CommitStats returns stats related to committing the batch. Should be called
    1671             : // after Batch.Commit or DB.Apply. If DB.ApplyNoSyncWait is used, it should
    1672             : // be called after Batch.SyncWait.
    1673           1 : func (b *Batch) CommitStats() BatchCommitStats {
    1674           1 :         return b.commitStats
    1675           1 : }
    1676             : 
    1677             : // Note: batchIter mirrors the implementation of flushableBatchIter. Keep the
    1678             : // two in sync.
    1679             : type batchIter struct {
    1680             :         batch *Batch
    1681             :         iter  batchskl.Iterator
    1682             :         kv    base.InternalKV
    1683             :         err   error
    1684             :         // snapshot holds a batch "sequence number" at which the batch is being
    1685             :         // read. This sequence number has the InternalKeySeqNumBatch bit set, so it
    1686             :         // encodes an offset within the batch. Only batch entries earlier than the
    1687             :         // offset are visible during iteration.
    1688             :         snapshot base.SeqNum
    1689             : }
    1690             : 
    1691             : // batchIter implements the base.InternalIterator interface.
    1692             : var _ base.InternalIterator = (*batchIter)(nil)
    1693             : 
    1694           0 : func (i *batchIter) String() string {
    1695           0 :         return "batch"
    1696           0 : }
    1697             : 
    1698           2 : func (i *batchIter) SeekGE(key []byte, flags base.SeekGEFlags) *base.InternalKV {
    1699           2 :         // Ignore TrySeekUsingNext if the view of the batch changed.
    1700           2 :         if flags.TrySeekUsingNext() && flags.BatchJustRefreshed() {
    1701           1 :                 flags = flags.DisableTrySeekUsingNext()
    1702           1 :         }
    1703             : 
    1704           2 :         i.err = nil // clear cached iteration error
    1705           2 :         ikey := i.iter.SeekGE(key, flags)
    1706           2 :         for ikey != nil && ikey.SeqNum() >= i.snapshot {
    1707           0 :                 ikey = i.iter.Next()
    1708           0 :         }
    1709           2 :         if ikey == nil {
    1710           2 :                 i.kv = base.InternalKV{}
    1711           2 :                 return nil
    1712           2 :         }
    1713           2 :         i.kv.K = *ikey
    1714           2 :         i.kv.V = base.MakeInPlaceValue(i.value())
    1715           2 :         return &i.kv
    1716             : }
    1717             : 
    1718           2 : func (i *batchIter) SeekPrefixGE(prefix, key []byte, flags base.SeekGEFlags) *base.InternalKV {
    1719           2 :         kv := i.SeekGE(key, flags)
    1720           2 :         if kv == nil {
    1721           2 :                 return nil
    1722           2 :         }
    1723             :         // If the key doesn't have the sought prefix, return nil.
    1724           2 :         if !bytes.Equal(i.batch.comparer.Split.Prefix(kv.K.UserKey), prefix) {
    1725           2 :                 i.kv = base.InternalKV{}
    1726           2 :                 return nil
    1727           2 :         }
    1728           2 :         return kv
    1729             : }
    1730             : 
    1731           2 : func (i *batchIter) SeekLT(key []byte, flags base.SeekLTFlags) *base.InternalKV {
    1732           2 :         i.err = nil // clear cached iteration error
    1733           2 :         ikey := i.iter.SeekLT(key)
    1734           2 :         for ikey != nil && ikey.SeqNum() >= i.snapshot {
    1735           0 :                 ikey = i.iter.Prev()
    1736           0 :         }
    1737           2 :         if ikey == nil {
    1738           2 :                 i.kv = base.InternalKV{}
    1739           2 :                 return nil
    1740           2 :         }
    1741           2 :         i.kv.K = *ikey
    1742           2 :         i.kv.V = base.MakeInPlaceValue(i.value())
    1743           2 :         return &i.kv
    1744             : }
    1745             : 
    1746           2 : func (i *batchIter) First() *base.InternalKV {
    1747           2 :         i.err = nil // clear cached iteration error
    1748           2 :         ikey := i.iter.First()
    1749           2 :         for ikey != nil && ikey.SeqNum() >= i.snapshot {
    1750           1 :                 ikey = i.iter.Next()
    1751           1 :         }
    1752           2 :         if ikey == nil {
    1753           2 :                 i.kv = base.InternalKV{}
    1754           2 :                 return nil
    1755           2 :         }
    1756           2 :         i.kv.K = *ikey
    1757           2 :         i.kv.V = base.MakeInPlaceValue(i.value())
    1758           2 :         return &i.kv
    1759             : }
    1760             : 
    1761           2 : func (i *batchIter) Last() *base.InternalKV {
    1762           2 :         i.err = nil // clear cached iteration error
    1763           2 :         ikey := i.iter.Last()
    1764           2 :         for ikey != nil && ikey.SeqNum() >= i.snapshot {
    1765           0 :                 ikey = i.iter.Prev()
    1766           0 :         }
    1767           2 :         if ikey == nil {
    1768           1 :                 i.kv = base.InternalKV{}
    1769           1 :                 return nil
    1770           1 :         }
    1771           1 :         i.kv.K = *ikey
    1772           1 :         i.kv.V = base.MakeInPlaceValue(i.value())
    1773           1 :         return &i.kv
    1774             : }
    1775             : 
    1776           2 : func (i *batchIter) Next() *base.InternalKV {
    1777           2 :         ikey := i.iter.Next()
    1778           2 :         for ikey != nil && ikey.SeqNum() >= i.snapshot {
    1779           1 :                 ikey = i.iter.Next()
    1780           1 :         }
    1781           2 :         if ikey == nil {
    1782           2 :                 i.kv = base.InternalKV{}
    1783           2 :                 return nil
    1784           2 :         }
    1785           2 :         i.kv.K = *ikey
    1786           2 :         i.kv.V = base.MakeInPlaceValue(i.value())
    1787           2 :         return &i.kv
    1788             : }
    1789             : 
    1790           0 : func (i *batchIter) NextPrefix(succKey []byte) *base.InternalKV {
    1791           0 :         // Because NextPrefix was invoked, `succKey` must be ≥ the key at i's current
    1792           0 :         // position. Seek the arena iterator using TrySeekUsingNext.
    1793           0 :         ikey := i.iter.SeekGE(succKey, base.SeekGEFlagsNone.EnableTrySeekUsingNext())
    1794           0 :         for ikey != nil && ikey.SeqNum() >= i.snapshot {
    1795           0 :                 ikey = i.iter.Next()
    1796           0 :         }
    1797           0 :         if ikey == nil {
    1798           0 :                 i.kv = base.InternalKV{}
    1799           0 :                 return nil
    1800           0 :         }
    1801           0 :         i.kv.K = *ikey
    1802           0 :         i.kv.V = base.MakeInPlaceValue(i.value())
    1803           0 :         return &i.kv
    1804             : }
    1805             : 
    1806           2 : func (i *batchIter) Prev() *base.InternalKV {
    1807           2 :         ikey := i.iter.Prev()
    1808           2 :         for ikey != nil && ikey.SeqNum() >= i.snapshot {
    1809           0 :                 ikey = i.iter.Prev()
    1810           0 :         }
    1811           2 :         if ikey == nil {
    1812           2 :                 i.kv = base.InternalKV{}
    1813           2 :                 return nil
    1814           2 :         }
    1815           2 :         i.kv.K = *ikey
    1816           2 :         i.kv.V = base.MakeInPlaceValue(i.value())
    1817           2 :         return &i.kv
    1818             : }
    1819             : 
    1820           2 : func (i *batchIter) value() []byte {
    1821           2 :         offset, _, keyEnd := i.iter.KeyInfo()
    1822           2 :         data := i.batch.data
    1823           2 :         if len(data[offset:]) == 0 {
    1824           0 :                 i.err = base.CorruptionErrorf("corrupted batch")
    1825           0 :                 return nil
    1826           0 :         }
    1827             : 
    1828           2 :         switch InternalKeyKind(data[offset]) {
    1829             :         case InternalKeyKindSet, InternalKeyKindMerge, InternalKeyKindRangeDelete,
    1830             :                 InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete,
    1831           2 :                 InternalKeyKindDeleteSized:
    1832           2 :                 _, value, ok := batchrepr.DecodeStr(data[keyEnd:])
    1833           2 :                 if !ok {
    1834           0 :                         return nil
    1835           0 :                 }
    1836           2 :                 return value
    1837           2 :         default:
    1838           2 :                 return nil
    1839             :         }
    1840             : }
    1841             : 
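// Layout note (a hedged summary, not from the source): each batch record is
// a one-byte kind followed by a varint-length-prefixed user key and, for
// the value-carrying kinds handled above, a varint-length-prefixed value.
// Kinds such as DELETE and SINGLEDEL carry no value, hence the nil default.
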
    1842           2 : func (i *batchIter) Error() error {
    1843           2 :         return i.err
    1844           2 : }
    1845             : 
    1846           2 : func (i *batchIter) Close() error {
    1847           2 :         _ = i.iter.Close()
    1848           2 :         return i.err
    1849           2 : }
    1850             : 
    1851           2 : func (i *batchIter) SetBounds(lower, upper []byte) {
    1852           2 :         i.iter.SetBounds(lower, upper)
    1853           2 : }
    1854             : 
    1855           0 : func (i *batchIter) SetContext(_ context.Context) {}
    1856             : 
    1857             : // DebugTree is part of the InternalIterator interface.
    1858           0 : func (i *batchIter) DebugTree(tp treeprinter.Node) {
    1859           0 :         tp.Childf("%T(%p)", i, i)
    1860           0 : }
    1861             : 
    1862             : type flushableBatchEntry struct {
    1863             :         // offset is the byte offset of the record within the batch repr.
    1864             :         offset uint32
    1865             :         // index is the 0-based ordinal number of the record within the batch. Used
    1866             :         // to compute the seqnum for the record.
    1867             :         index uint32
    1868             :         // key{Start,End} are the start and end byte offsets of the key within the
    1869             :         // batch repr. Cached to avoid decoding the key length on every
    1870             :         // comparison. The value is stored starting at keyEnd.
    1871             :         keyStart uint32
    1872             :         keyEnd   uint32
    1873             : }
    1874             : 
    1875             : // flushableBatch wraps an existing batch and provides the interfaces needed
    1876             : // for making the batch flushable (i.e. able to mimic a memtable).
    1877             : type flushableBatch struct {
    1878             :         cmp      Compare
    1879             :         comparer *base.Comparer
    1880             :         data     []byte
    1881             : 
    1882             :         // The base sequence number for the entries in the batch. This is the same
    1883             :         // value as Batch.SeqNum() and is cached here for performance.
    1884             :         seqNum base.SeqNum
    1885             : 
    1886             :         // A slice of offsets and indices for the entries in the batch. Used to
    1887             :         // implement flushableBatchIter. Unlike the indexing on a normal batch, a
    1888             :         // flushable batch is indexed such that batch entry i will be given the
    1889             :         // sequence number flushableBatch.seqNum+i.
    1890             :         //
    1891             :         // Sorted in increasing order of key and decreasing order of offset (since
    1892             :         // higher offsets correspond to higher sequence numbers).
    1893             :         //
    1894             :         // Does not include range deletion entries or range key entries.
    1895             :         offsets []flushableBatchEntry
    1896             : 
    1897             :         // Fragmented range deletion tombstones.
    1898             :         tombstones []keyspan.Span
    1899             : 
    1900             :         // Fragmented range keys.
    1901             :         rangeKeys []keyspan.Span
    1902             : }
    1903             : 
    1904             : var _ flushable = (*flushableBatch)(nil)
    1905             : 
    1906             : // newFlushableBatch creates a new batch that implements the flushable
    1907             : // interface. This allows the batch to act like a memtable and be placed in the
    1908             : // queue of flushable memtables. Note that the flushable batch takes ownership
    1909             : // of the batch data.
    1910           2 : func newFlushableBatch(batch *Batch, comparer *Comparer) (*flushableBatch, error) {
    1911           2 :         b := &flushableBatch{
    1912           2 :                 data:     batch.data,
    1913           2 :                 cmp:      comparer.Compare,
    1914           2 :                 comparer: comparer,
    1915           2 :                 offsets:  make([]flushableBatchEntry, 0, batch.Count()),
    1916           2 :         }
    1917           2 :         if b.data != nil {
    1918           2 :                 // Note that this sequence number is not correct when this batch has not
    1919           2 :                 // been applied since the sequence number has not been assigned yet. The
    1920           2 :                 // correct sequence number will be set later. But it is correct when the
    1921           2 :                 // batch is being replayed from the WAL.
    1922           2 :                 b.seqNum = batch.SeqNum()
    1923           2 :         }
    1924           2 :         var rangeDelOffsets []flushableBatchEntry
    1925           2 :         var rangeKeyOffsets []flushableBatchEntry
    1926           2 :         if len(b.data) > batchrepr.HeaderLen {
    1927           2 :                 // Non-empty batch.
    1928           2 :                 var index uint32
    1929           2 :                 for iter := batchrepr.Read(b.data); len(iter) > 0; {
    1930           2 :                         offset := uintptr(unsafe.Pointer(&iter[0])) - uintptr(unsafe.Pointer(&b.data[0]))
    1931           2 :                         kind, key, _, ok, err := iter.Next()
    1932           2 :                         if !ok {
    1933           0 :                                 if err != nil {
    1934           0 :                                         return nil, err
    1935           0 :                                 }
    1936           0 :                                 break
    1937             :                         }
    1938           2 :                         entry := flushableBatchEntry{
    1939           2 :                                 offset: uint32(offset),
    1940           2 :                                 index:  uint32(index),
    1941           2 :                         }
    1942           2 :                         if keySize := uint32(len(key)); keySize == 0 {
    1943           2 :                                 // Must add 2 to the offset. One byte encodes `kind` and the next
    1944           2 :                                 // byte encodes `0`, which is the length of the key.
    1945           2 :                                 entry.keyStart = uint32(offset) + 2
    1946           2 :                                 entry.keyEnd = entry.keyStart
    1947           2 :                         } else {
    1948           2 :                                 entry.keyStart = uint32(uintptr(unsafe.Pointer(&key[0])) -
    1949           2 :                                         uintptr(unsafe.Pointer(&b.data[0])))
    1950           2 :                                 entry.keyEnd = entry.keyStart + keySize
    1951           2 :                         }
    1952           2 :                         switch kind {
    1953           2 :                         case InternalKeyKindRangeDelete:
    1954           2 :                                 rangeDelOffsets = append(rangeDelOffsets, entry)
    1955           2 :                         case InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete:
    1956           2 :                                 rangeKeyOffsets = append(rangeKeyOffsets, entry)
    1957           2 :                         case InternalKeyKindLogData:
    1958           2 :                                 // Skip it; we never want to iterate over LogDatas.
    1959           2 :                                 continue
    1960             :                         case InternalKeyKindSet, InternalKeyKindDelete, InternalKeyKindMerge,
    1961           2 :                                 InternalKeyKindSingleDelete, InternalKeyKindSetWithDelete, InternalKeyKindDeleteSized:
    1962           2 :                                 b.offsets = append(b.offsets, entry)
    1963           0 :                         default:
    1964           0 :                                 // Note: In some circumstances this might be temporary memory
    1965           0 :                                 // corruption that can be recovered by discarding the batch and
    1966           0 :                                 // trying again. In other cases, the batch repr might've been
    1967           0 :                                 // already persisted elsewhere, and we'll loop continuously trying
    1968           0 :                                 // to commit the same corrupted batch. The caller is responsible for
    1969           0 :                                 // distinguishing.
    1970           0 :                                 return nil, errors.Wrapf(ErrInvalidBatch, "unrecognized kind %v", kind)
    1971             :                         }
    1972             :                         // NB: index (used for entry.index above) must not reach
    1973             :                         // batch.count, because the index is used in conjunction with the
    1974             :                         // batch's sequence number to assign sequence numbers to keys within
    1975             :                         // the batch. If we assign KV's indexes as high as batch.count,
    1976             :                         // we'll begin assigning keys sequence numbers that weren't
    1977             :                         // allocated.
    1978           2 :                         if index >= uint32(batch.count) {
    1979           0 :                                 return nil, base.AssertionFailedf("pebble: batch entry index %d ≥ batch.count %d", index, batch.count)
    1980           0 :                         }
    1981           2 :                         index++
    1982             :                 }
    1983             :         }
    1984             : 
    1985             :         // Sort all of offsets, rangeDelOffsets and rangeKeyOffsets, using *batch's
    1986             :         // sort.Interface implementation.
    1987           2 :         pointOffsets := b.offsets
    1988           2 :         sort.Sort(b)
    1989           2 :         b.offsets = rangeDelOffsets
    1990           2 :         sort.Sort(b)
    1991           2 :         b.offsets = rangeKeyOffsets
    1992           2 :         sort.Sort(b)
    1993           2 :         b.offsets = pointOffsets
    1994           2 : 
    1995           2 :         if len(rangeDelOffsets) > 0 {
    1996           2 :                 frag := &keyspan.Fragmenter{
    1997           2 :                         Cmp:    b.cmp,
    1998           2 :                         Format: b.comparer.FormatKey,
    1999           2 :                         Emit: func(s keyspan.Span) {
    2000           2 :                                 b.tombstones = append(b.tombstones, s)
    2001           2 :                         },
    2002             :                 }
    2003           2 :                 it := &flushableBatchIter{
    2004           2 :                         batch:   b,
    2005           2 :                         data:    b.data,
    2006           2 :                         offsets: rangeDelOffsets,
    2007           2 :                         cmp:     b.cmp,
    2008           2 :                         index:   -1,
    2009           2 :                 }
    2010           2 :                 fragmentRangeDels(frag, it, len(rangeDelOffsets))
    2011             :         }
    2012           2 :         if len(rangeKeyOffsets) > 0 {
    2013           2 :                 frag := &keyspan.Fragmenter{
    2014           2 :                         Cmp:    b.cmp,
    2015           2 :                         Format: b.comparer.FormatKey,
    2016           2 :                         Emit: func(s keyspan.Span) {
    2017           2 :                                 b.rangeKeys = append(b.rangeKeys, s)
    2018           2 :                         },
    2019             :                 }
    2020           2 :                 it := &flushableBatchIter{
    2021           2 :                         batch:   b,
    2022           2 :                         data:    b.data,
    2023           2 :                         offsets: rangeKeyOffsets,
    2024           2 :                         cmp:     b.cmp,
    2025           2 :                         index:   -1,
    2026           2 :                 }
    2027           2 :                 fragmentRangeKeys(frag, it, len(rangeKeyOffsets))
    2028             :         }
    2029           2 :         return b, nil
    2030             : }
    2031             : 
    2032           2 : func (b *flushableBatch) setSeqNum(seqNum base.SeqNum) {
    2033           2 :         if b.seqNum != 0 {
    2034           0 :                 panic(fmt.Sprintf("pebble: flushableBatch.seqNum already set: %d", b.seqNum))
    2035             :         }
    2036           2 :         b.seqNum = seqNum
    2037           2 :         for i := range b.tombstones {
    2038           2 :                 for j := range b.tombstones[i].Keys {
    2039           2 :                         b.tombstones[i].Keys[j].Trailer = base.MakeTrailer(
    2040           2 :                                 b.tombstones[i].Keys[j].SeqNum()+seqNum,
    2041           2 :                                 b.tombstones[i].Keys[j].Kind(),
    2042           2 :                         )
    2043           2 :                 }
    2044             :         }
    2045           2 :         for i := range b.rangeKeys {
    2046           2 :                 for j := range b.rangeKeys[i].Keys {
    2047           2 :                         b.rangeKeys[i].Keys[j].Trailer = base.MakeTrailer(
    2048           2 :                                 b.rangeKeys[i].Keys[j].SeqNum()+seqNum,
    2049           2 :                                 b.rangeKeys[i].Keys[j].Kind(),
    2050           2 :                         )
    2051           2 :                 }
    2052             :         }
    2053             : }
    2054             : 
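// rebaseKey is a hedged sketch (not part of Pebble; the helper name is
// hypothetical) of the trailer arithmetic above. Span keys enter with
// batch-relative sequence numbers (the entry index), so after setSeqNum(100)
// a tombstone written as the batch's third record (index 2) carries
// sequence number 102.
func rebaseKey(k keyspan.Key, seqNum base.SeqNum) keyspan.Key {
        k.Trailer = base.MakeTrailer(k.SeqNum()+seqNum, k.Kind())
        return k
}
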
    2055           2 : func (b *flushableBatch) Len() int {
    2056           2 :         return len(b.offsets)
    2057           2 : }
    2058             : 
    2059           2 : func (b *flushableBatch) Less(i, j int) bool {
    2060           2 :         ei := &b.offsets[i]
    2061           2 :         ej := &b.offsets[j]
    2062           2 :         ki := b.data[ei.keyStart:ei.keyEnd]
    2063           2 :         kj := b.data[ej.keyStart:ej.keyEnd]
    2064           2 :         switch c := b.cmp(ki, kj); {
    2065           2 :         case c < 0:
    2066           2 :                 return true
    2067           2 :         case c > 0:
    2068           2 :                 return false
    2069           2 :         default:
    2070           2 :                 return ei.offset > ej.offset
    2071             :         }
    2072             : }
    2073             : 
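// Worked example (illustrative, not from the source): given entries for
// user keys ("a", offset 12), ("a", offset 40), and ("b", offset 25),
// sorting with Less yields ("a", 40), ("a", 12), ("b", 25). Equal user
// keys order by decreasing offset so the most recent write (higher offset,
// hence higher sequence number) sorts first, matching internal key order.
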
    2074           2 : func (b *flushableBatch) Swap(i, j int) {
    2075           2 :         b.offsets[i], b.offsets[j] = b.offsets[j], b.offsets[i]
    2076           2 : }
    2077             : 
    2078             : // newIter is part of the flushable interface.
    2079           2 : func (b *flushableBatch) newIter(o *IterOptions) internalIterator {
    2080           2 :         return &flushableBatchIter{
    2081           2 :                 batch:   b,
    2082           2 :                 data:    b.data,
    2083           2 :                 offsets: b.offsets,
    2084           2 :                 cmp:     b.cmp,
    2085           2 :                 index:   -1,
    2086           2 :                 lower:   o.GetLowerBound(),
    2087           2 :                 upper:   o.GetUpperBound(),
    2088           2 :         }
    2089           2 : }
    2090             : 
    2091             : // newFlushIter is part of the flushable interface.
    2092           2 : func (b *flushableBatch) newFlushIter(o *IterOptions) internalIterator {
    2093           2 :         return &flushFlushableBatchIter{
    2094           2 :                 flushableBatchIter: flushableBatchIter{
    2095           2 :                         batch:   b,
    2096           2 :                         data:    b.data,
    2097           2 :                         offsets: b.offsets,
    2098           2 :                         cmp:     b.cmp,
    2099           2 :                         index:   -1,
    2100           2 :                 },
    2101           2 :         }
    2102           2 : }
    2103             : 
    2104             : // newRangeDelIter is part of the flushable interface.
    2105           2 : func (b *flushableBatch) newRangeDelIter(o *IterOptions) keyspan.FragmentIterator {
    2106           2 :         if len(b.tombstones) == 0 {
    2107           2 :                 return nil
    2108           2 :         }
    2109           2 :         return keyspan.NewIter(b.cmp, b.tombstones)
    2110             : }
    2111             : 
    2112             : // newRangeKeyIter is part of the flushable interface.
    2113           2 : func (b *flushableBatch) newRangeKeyIter(o *IterOptions) keyspan.FragmentIterator {
    2114           2 :         if len(b.rangeKeys) == 0 {
    2115           2 :                 return nil
    2116           2 :         }
    2117           2 :         return keyspan.NewIter(b.cmp, b.rangeKeys)
    2118             : }
    2119             : 
    2120             : // containsRangeKeys is part of the flushable interface.
    2121           2 : func (b *flushableBatch) containsRangeKeys() bool { return len(b.rangeKeys) > 0 }
    2122             : 
    2123             : // inuseBytes is part of the flushable interface.
    2124           2 : func (b *flushableBatch) inuseBytes() uint64 {
    2125           2 :         return uint64(len(b.data) - batchrepr.HeaderLen)
    2126           2 : }
    2127             : 
    2128             : // totalBytes is part of the flushable interface.
    2129           2 : func (b *flushableBatch) totalBytes() uint64 {
    2130           2 :         return uint64(cap(b.data))
    2131           2 : }
    2132             : 
    2133             : // readyForFlush is part of the flushable interface.
    2134           2 : func (b *flushableBatch) readyForFlush() bool {
    2135           2 :         // A flushable batch is always ready for flush; it must be flushed together
    2136           2 :         // with the previous memtable.
    2137           2 :         return true
    2138           2 : }
    2139             : 
    2140             : // computePossibleOverlaps is part of the flushable interface.
    2141             : func (b *flushableBatch) computePossibleOverlaps(
    2142             :         fn func(bounded) shouldContinue, bounded ...bounded,
    2143           2 : ) {
    2144           2 :         computePossibleOverlapsGenericImpl[*flushableBatch](b, b.cmp, fn, bounded)
    2145           2 : }
    2146             : 
    2147             : // Note: flushableBatchIter mirrors the implementation of batchIter. Keep the
    2148             : // two in sync.
    2149             : type flushableBatchIter struct {
    2150             :         // Members to be initialized by creator.
    2151             :         batch *flushableBatch
    2152             :         // The bytes backing the batch; always set to batch.data by its creators.
    2153             :         data []byte
    2154             :         // The sorted entries. This is not always equal to batch.offsets.
    2155             :         offsets []flushableBatchEntry
    2156             :         cmp     Compare
    2157             :         // Must be initialized to -1. It is the index into offsets that represents
    2158             :         // the current iterator position.
    2159             :         index int
    2160             : 
    2161             :         // For internal use by the implementation.
    2162             :         kv  base.InternalKV
    2163             :         err error
    2164             : 
    2165             :         // Optionally initialize to bounds of iteration, if any.
    2166             :         lower []byte
    2167             :         upper []byte
    2168             : }
    2169             : 
    2170             : // flushableBatchIter implements the base.InternalIterator interface.
    2171             : var _ base.InternalIterator = (*flushableBatchIter)(nil)
    2172             : 
    2173           0 : func (i *flushableBatchIter) String() string {
    2174           0 :         return "flushable-batch"
    2175           0 : }
    2176             : 
    2177             : // SeekGE implements internalIterator.SeekGE, as documented in the pebble
    2178             : // package. Ignore flags.TrySeekUsingNext() since we don't expect this
    2179             : // optimization to provide much benefit here at the moment.
    2180           2 : func (i *flushableBatchIter) SeekGE(key []byte, flags base.SeekGEFlags) *base.InternalKV {
    2181           2 :         i.err = nil // clear cached iteration error
    2182           2 :         ikey := base.MakeSearchKey(key)
    2183           2 :         i.index = sort.Search(len(i.offsets), func(j int) bool {
    2184           2 :                 return base.InternalCompare(i.cmp, ikey, i.getKey(j)) <= 0
    2185           2 :         })
    2186           2 :         if i.index >= len(i.offsets) {
    2187           2 :                 return nil
    2188           2 :         }
    2189           2 :         kv := i.getKV(i.index)
    2190           2 :         if i.upper != nil && i.cmp(kv.K.UserKey, i.upper) >= 0 {
    2191           2 :                 i.index = len(i.offsets)
    2192           2 :                 return nil
    2193           2 :         }
    2194           2 :         return kv
    2195             : }
    2196             : 
    2197             : // SeekPrefixGE implements internalIterator.SeekPrefixGE, as documented in the
    2198             : // pebble package.
    2199             : func (i *flushableBatchIter) SeekPrefixGE(
    2200             :         prefix, key []byte, flags base.SeekGEFlags,
    2201           2 : ) *base.InternalKV {
    2202           2 :         kv := i.SeekGE(key, flags)
    2203           2 :         if kv == nil {
    2204           2 :                 return nil
    2205           2 :         }
    2206             :         // If the key doesn't have the sought prefix, return nil.
    2207           2 :         if !bytes.Equal(i.batch.comparer.Split.Prefix(kv.K.UserKey), prefix) {
    2208           2 :                 return nil
    2209           2 :         }
    2210           2 :         return kv
    2211             : }
    2212             : 
    2213             : // SeekLT implements internalIterator.SeekLT, as documented in the pebble
    2214             : // package.
    2215           2 : func (i *flushableBatchIter) SeekLT(key []byte, flags base.SeekLTFlags) *base.InternalKV {
    2216           2 :         i.err = nil // clear cached iteration error
    2217           2 :         ikey := base.MakeSearchKey(key)
    2218           2 :         i.index = sort.Search(len(i.offsets), func(j int) bool {
    2219           2 :                 return base.InternalCompare(i.cmp, ikey, i.getKey(j)) <= 0
    2220           2 :         })
    2221           2 :         i.index--
    2222           2 :         if i.index < 0 {
    2223           2 :                 return nil
    2224           2 :         }
    2225           2 :         kv := i.getKV(i.index)
    2226           2 :         if i.lower != nil && i.cmp(kv.K.UserKey, i.lower) < 0 {
    2227           2 :                 i.index = -1
    2228           2 :                 return nil
    2229           2 :         }
    2230           2 :         return kv
    2231             : }
    2232             : 
    2233             : // First implements internalIterator.First, as documented in the pebble
    2234             : // package.
    2235           2 : func (i *flushableBatchIter) First() *base.InternalKV {
    2236           2 :         i.err = nil // clear cached iteration error
    2237           2 :         if len(i.offsets) == 0 {
    2238           2 :                 return nil
    2239           2 :         }
    2240           2 :         i.index = 0
    2241           2 :         kv := i.getKV(i.index)
    2242           2 :         if i.upper != nil && i.cmp(kv.K.UserKey, i.upper) >= 0 {
    2243           2 :                 i.index = len(i.offsets)
    2244           2 :                 return nil
    2245           2 :         }
    2246           2 :         return kv
    2247             : }
    2248             : 
    2249             : // Last implements internalIterator.Last, as documented in the pebble
    2250             : // package.
    2251           2 : func (i *flushableBatchIter) Last() *base.InternalKV {
    2252           2 :         i.err = nil // clear cached iteration error
    2253           2 :         if len(i.offsets) == 0 {
    2254           1 :                 return nil
    2255           1 :         }
    2256           2 :         i.index = len(i.offsets) - 1
    2257           2 :         kv := i.getKV(i.index)
    2258           2 :         if i.lower != nil && i.cmp(kv.K.UserKey, i.lower) < 0 {
    2259           1 :                 i.index = -1
    2260           1 :                 return nil
    2261           1 :         }
    2262           2 :         return kv
    2263             : }
    2264             : 
    2265             : // Note: flushFlushableBatchIter.Next mirrors the implementation of
    2266             : // flushableBatchIter.Next for performance reasons. Keep the two in sync.
    2267           2 : func (i *flushableBatchIter) Next() *base.InternalKV {
    2268           2 :         if i.index == len(i.offsets) {
    2269           0 :                 return nil
    2270           0 :         }
    2271           2 :         i.index++
    2272           2 :         if i.index == len(i.offsets) {
    2273           2 :                 return nil
    2274           2 :         }
    2275           2 :         kv := i.getKV(i.index)
    2276           2 :         if i.upper != nil && i.cmp(kv.K.UserKey, i.upper) >= 0 {
    2277           2 :                 i.index = len(i.offsets)
    2278           2 :                 return nil
    2279           2 :         }
    2280           2 :         return kv
    2281             : }
    2282             : 
    2283           2 : func (i *flushableBatchIter) Prev() *base.InternalKV {
    2284           2 :         if i.index < 0 {
    2285           0 :                 return nil
    2286           0 :         }
    2287           2 :         i.index--
    2288           2 :         if i.index < 0 {
    2289           2 :                 return nil
    2290           2 :         }
    2291           2 :         kv := i.getKV(i.index)
    2292           2 :         if i.lower != nil && i.cmp(kv.K.UserKey, i.lower) < 0 {
    2293           2 :                 i.index = -1
    2294           2 :                 return nil
    2295           2 :         }
    2296           2 :         return kv
    2297             : }
    2298             : 
    2299             : // NextPrefix implements internalIterator.NextPrefix by re-seeking with the
    2300             : // TrySeekUsingNext optimization, since succKey is ≥ the current key.
    2301           0 : func (i *flushableBatchIter) NextPrefix(succKey []byte) *base.InternalKV {
    2302           0 :         return i.SeekGE(succKey, base.SeekGEFlagsNone.EnableTrySeekUsingNext())
    2303           0 : }
    2304             : 
    2305           2 : func (i *flushableBatchIter) getKey(index int) InternalKey {
    2306           2 :         e := &i.offsets[index]
    2307           2 :         kind := InternalKeyKind(i.data[e.offset])
    2308           2 :         key := i.data[e.keyStart:e.keyEnd]
    2309           2 :         return base.MakeInternalKey(key, i.batch.seqNum+base.SeqNum(e.index), kind)
    2310           2 : }
    2311             : 
    2312           2 : func (i *flushableBatchIter) getKV(index int) *base.InternalKV {
    2313           2 :         i.kv = base.InternalKV{
    2314           2 :                 K: i.getKey(index),
    2315           2 :                 V: i.extractValue(),
    2316           2 :         }
    2317           2 :         return &i.kv
    2318           2 : }
    2319             : 
    2320           2 : func (i *flushableBatchIter) extractValue() base.LazyValue {
    2321           2 :         p := i.data[i.offsets[i.index].offset:]
    2322           2 :         if len(p) == 0 {
    2323           0 :                 i.err = base.CorruptionErrorf("corrupted batch")
    2324           0 :                 return base.LazyValue{}
    2325           0 :         }
    2326           2 :         kind := InternalKeyKind(p[0])
    2327           2 :         if kind > InternalKeyKindMax {
    2328           0 :                 i.err = base.CorruptionErrorf("corrupted batch")
    2329           0 :                 return base.LazyValue{}
    2330           0 :         }
    2331           2 :         var value []byte
    2332           2 :         var ok bool
    2333           2 :         switch kind {
    2334             :         case InternalKeyKindSet, InternalKeyKindMerge, InternalKeyKindRangeDelete,
    2335             :                 InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete,
    2336           2 :                 InternalKeyKindDeleteSized:
    2337           2 :                 keyEnd := i.offsets[i.index].keyEnd
    2338           2 :                 _, value, ok = batchrepr.DecodeStr(i.data[keyEnd:])
    2339           2 :                 if !ok {
    2340           0 :                         i.err = base.CorruptionErrorf("corrupted batch")
    2341           0 :                         return base.LazyValue{}
    2342           0 :                 }
    2343             :         }
    2344           2 :         return base.MakeInPlaceValue(value)
    2345             : }
    2346             : 
    2347           0 : func (i *flushableBatchIter) Valid() bool {
    2348           0 :         return i.index >= 0 && i.index < len(i.offsets)
    2349           0 : }
    2350             : 
    2351           2 : func (i *flushableBatchIter) Error() error {
    2352           2 :         return i.err
    2353           2 : }
    2354             : 
    2355           2 : func (i *flushableBatchIter) Close() error {
    2356           2 :         return i.err
    2357           2 : }
    2358             : 
    2359           2 : func (i *flushableBatchIter) SetBounds(lower, upper []byte) {
    2360           2 :         i.lower = lower
    2361           2 :         i.upper = upper
    2362           2 : }
    2363             : 
    2364           0 : func (i *flushableBatchIter) SetContext(_ context.Context) {}
    2365             : 
    2366             : // DebugTree is part of the InternalIterator interface.
    2367           0 : func (i *flushableBatchIter) DebugTree(tp treeprinter.Node) {
    2368           0 :         tp.Childf("%T(%p)", i, i)
    2369           0 : }
    2370             : 
    2371             : // flushFlushableBatchIter is a forward-only variant of flushableBatchIter
    2372             : // used when flushing a batch; all seeks and reverse iteration panic.
    2373             : type flushFlushableBatchIter struct {
    2374             :         flushableBatchIter
    2375             : }
    2376             : 
    2377             : // flushFlushableBatchIter implements the base.InternalIterator interface.
    2378             : var _ base.InternalIterator = (*flushFlushableBatchIter)(nil)
    2379             : 
    2380           0 : func (i *flushFlushableBatchIter) String() string {
    2381           0 :         return "flushable-batch"
    2382           0 : }
    2383             : 
    2384           0 : func (i *flushFlushableBatchIter) SeekGE(key []byte, flags base.SeekGEFlags) *base.InternalKV {
    2385           0 :         panic("pebble: SeekGE unimplemented")
    2386             : }
    2387             : 
    2388             : func (i *flushFlushableBatchIter) SeekPrefixGE(
    2389             :         prefix, key []byte, flags base.SeekGEFlags,
    2390           0 : ) *base.InternalKV {
    2391           0 :         panic("pebble: SeekPrefixGE unimplemented")
    2392             : }
    2393             : 
    2394           0 : func (i *flushFlushableBatchIter) SeekLT(key []byte, flags base.SeekLTFlags) *base.InternalKV {
    2395           0 :         panic("pebble: SeekLT unimplemented")
    2396             : }
    2397             : 
    2398           2 : func (i *flushFlushableBatchIter) First() *base.InternalKV {
    2399           2 :         i.err = nil // clear cached iteration error
    2400           2 :         return i.flushableBatchIter.First()
    2401           2 : }
    2402             : 
    2403           0 : func (i *flushFlushableBatchIter) NextPrefix(succKey []byte) *base.InternalKV {
    2404           0 :         panic("pebble: NextPrefix unimplemented")
    2405             : }
    2406             : 
    2407             : // Note: flushFlushableBatchIter.Next mirrors the implementation of
    2408             : // flushableBatchIter.Next for performance reasons. Keep the two in sync.
    2409           2 : func (i *flushFlushableBatchIter) Next() *base.InternalKV {
    2410           2 :         if i.index == len(i.offsets) {
    2411           0 :                 return nil
    2412           0 :         }
    2413           2 :         i.index++
    2414           2 :         if i.index == len(i.offsets) {
    2415           2 :                 return nil
    2416           2 :         }
    2417           2 :         return i.getKV(i.index)
    2418             : }
    2419             : 
    2420           0 : func (i *flushFlushableBatchIter) Prev() *base.InternalKV {
    2421           0 :         panic("pebble: Prev unimplemented")
    2422             : }
    2423             : 
    2424             : // batchOptions holds the parameters used to configure a batch.
    2425             : type batchOptions struct {
    2426             :         initialSizeBytes     int
    2427             :         maxRetainedSizeBytes int
    2428             : }
    2429             : 
    2430             : // ensureDefaults fills in default values for any batch options left unset.
    2431           2 : func (o *batchOptions) ensureDefaults() {
    2432           2 :         if o.initialSizeBytes <= 0 {
    2433           2 :                 o.initialSizeBytes = defaultBatchInitialSize
    2434           2 :         }
    2435           2 :         if o.maxRetainedSizeBytes <= 0 {
    2436           2 :                 o.maxRetainedSizeBytes = defaultBatchMaxRetainedSize
    2437           2 :         }
    2438             : }
    2439             : 
    2440             : // BatchOption allows customizing the batch.
    2441             : type BatchOption func(*batchOptions)
    2442             : 
    2443             : // WithInitialSizeBytes sets a custom initial size for the batch. Defaults
    2444             : // to 1KB.
    2445           1 : func WithInitialSizeBytes(s int) BatchOption {
    2446           1 :         return func(opts *batchOptions) {
    2447           1 :                 opts.initialSizeBytes = s
    2448           1 :         }
    2449             : }
    2450             : 
    2451             : // WithMaxRetainedSizeBytes sets a custom maximum size above which the batch's
    2452             : // buffer is not retained for re-use; a batch exceeding it is instead left to
    2453             : // the garbage collector. Defaults to 1MB.
    2454           1 : func WithMaxRetainedSizeBytes(s int) BatchOption {
    2455           1 :         return func(opts *batchOptions) {
    2456           1 :                 opts.maxRetainedSizeBytes = s
    2457           1 :         }
    2458             : }
    2459             : 
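// newTunedBatch is a hedged usage sketch (not part of Pebble; the helper
// name and the sizes are illustrative, and it assumes DB.NewBatch accepts
// BatchOptions variadically).
func newTunedBatch(db *DB) *Batch {
        return db.NewBatch(
                WithInitialSizeBytes(4<<10),     // start at 4 KB instead of 1 KB
                WithMaxRetainedSizeBytes(2<<20), // allow buffer re-use up to 2 MB
        )
}
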
    2460             : // batchSort returns iterators for the sorted contents of the batch. It is
    2461             : // intended for testing use only. The private.BatchSort registration in init
    2462             : // below avoids exposing this method in the public pebble interface.
    2463             : func batchSort(
    2464             :         i interface{},
    2465             : ) (
    2466             :         points internalIterator,
    2467             :         rangeDels keyspan.FragmentIterator,
    2468             :         rangeKeys keyspan.FragmentIterator,
    2469           2 : ) {
    2470           2 :         b := i.(*Batch)
    2471           2 :         if b.Indexed() {
    2472           2 :                 pointIter := b.newInternalIter(nil)
    2473           2 :                 rangeDelIter := b.newRangeDelIter(nil, math.MaxUint64)
    2474           2 :                 rangeKeyIter := b.newRangeKeyIter(nil, math.MaxUint64)
    2475           2 :                 return pointIter, rangeDelIter, rangeKeyIter
    2476           2 :         }
    2477           2 :         f, err := newFlushableBatch(b, b.db.opts.Comparer)
    2478           2 :         if err != nil {
    2479           0 :                 panic(err)
    2480             :         }
    2481           2 :         return f.newIter(nil), f.newRangeDelIter(nil), f.newRangeKeyIter(nil)
    2482             : }
    2483             : 
    2484           2 : func init() {
    2485           2 :         private.BatchSort = batchSort
    2486           2 : }

Generated by: LCOV version 1.14