LCOV - code coverage report
Current view: top level - pebble - batch.go (source / functions)
Test:         2025-07-05 08:18Z 6f57a213 - meta test only.lcov
Test Date:    2025-07-05 08:20:30
Coverage:     Lines: 78.1 % (1096 of 1403)    Functions: - (0 of 0)

            Line data    Source code
       1              : // Copyright 2012 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2              : // of this source code is governed by a BSD-style license that can be found in
       3              : // the LICENSE file.
       4              : 
       5              : package pebble
       6              : 
       7              : import (
       8              :         "bytes"
       9              :         "context"
      10              :         "encoding/binary"
      11              :         "fmt"
      12              :         "io"
      13              :         "math"
      14              :         "sort"
      15              :         "sync"
      16              :         "sync/atomic"
      17              :         "time"
      18              :         "unsafe"
      19              : 
      20              :         "github.com/cockroachdb/crlib/crtime"
      21              :         "github.com/cockroachdb/errors"
      22              :         "github.com/cockroachdb/pebble/batchrepr"
      23              :         "github.com/cockroachdb/pebble/internal/base"
      24              :         "github.com/cockroachdb/pebble/internal/batchskl"
      25              :         "github.com/cockroachdb/pebble/internal/humanize"
      26              :         "github.com/cockroachdb/pebble/internal/invariants"
      27              :         "github.com/cockroachdb/pebble/internal/keyspan"
      28              :         "github.com/cockroachdb/pebble/internal/private"
      29              :         "github.com/cockroachdb/pebble/internal/rangedel"
      30              :         "github.com/cockroachdb/pebble/internal/rangekey"
      31              :         "github.com/cockroachdb/pebble/internal/rawalloc"
      32              :         "github.com/cockroachdb/pebble/internal/treeprinter"
      33              : )
      34              : 
      35              : const (
      36              :         invalidBatchCount = 1<<32 - 1
      37              :         maxVarintLen32    = 5
      38              : 
      39              :         defaultBatchInitialSize     = 1 << 10 // 1 KB
      40              :         defaultBatchMaxRetainedSize = 1 << 20 // 1 MB
      41              : )
      42              : 
      43              : // ErrNotIndexed means that a read operation on a batch failed because the
      44              : // batch is not indexed and thus doesn't support reads.
      45              : var ErrNotIndexed = errors.New("pebble: batch not indexed")
      46              : 
      47              : // ErrInvalidBatch indicates that a batch is invalid or otherwise corrupted.
      48              : var ErrInvalidBatch = batchrepr.ErrInvalidBatch
      49              : 
      50              : // ErrBatchTooLarge indicates that the size of this batch is over the limit of 4GB.
      51              : var ErrBatchTooLarge = base.MarkCorruptionError(errors.Newf("pebble: batch too large: >= %s", humanize.Bytes.Uint64(maxBatchSize)))
      52              : 
       53              : // DeferredBatchOp represents a batch operation (e.g. set, merge, delete) that is
       54              : // being inserted into the batch. Indexing is not performed on the specified key
       55              : // until Finish is called, hence the name deferred. This struct lets the caller
       56              : // copy or encode keys/values directly into the batch representation instead of
       57              : // copying into an intermediary buffer and then having pebble.Batch copy from it.
      58              : type DeferredBatchOp struct {
      59              :         index *batchskl.Skiplist
      60              : 
      61              :         // Key and Value point to parts of the binary batch representation where
      62              :         // keys and values should be encoded/copied into. len(Key) and len(Value)
      63              :         // bytes must be copied into these slices respectively before calling
      64              :         // Finish(). Changing where these slices point to is not allowed.
      65              :         Key, Value []byte
      66              :         offset     uint32
      67              : }
      68              : 
      69              : // Finish completes the addition of this batch operation, and adds it to the
       70              : // index if necessary. Must be called once (and exactly once) after keys/values
      71              : // have been filled into Key and Value. Not calling Finish or not
      72              : // copying/encoding keys will result in an incomplete index, and calling Finish
      73              : // twice may result in a panic.
      74            0 : func (d DeferredBatchOp) Finish() error {
      75            0 :         if d.index != nil {
      76            0 :                 if err := d.index.Add(d.offset); err != nil {
      77            0 :                         return err
      78            0 :                 }
      79              :         }
      80            0 :         return nil
      81              : }
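
A minimal sketch of the deferred pattern this type enables, using Batch.SetDeferred (defined later in this file); the helper name is hypothetical: the caller reserves space, copies the key and value into the returned slices, then calls Finish.

```go
// setViaDeferred is a hypothetical helper illustrating the deferred-op flow:
// reserve space with SetDeferred, fill Key/Value in place, then Finish so the
// entry is added to the index (a no-op for non-indexed batches).
func setViaDeferred(b *Batch, key, value []byte) error {
	op := b.SetDeferred(len(key), len(value))
	copy(op.Key, key)
	copy(op.Value, value)
	return op.Finish()
}
```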
      82              : 
      83              : // A Batch is a sequence of Sets, Merges, Deletes, DeleteRanges, RangeKeySets,
      84              : // RangeKeyUnsets, and/or RangeKeyDeletes that are applied atomically. Batch
      85              : // implements the Reader interface, but only an indexed batch supports reading
      86              : // (without error) via Get or NewIter. A non-indexed batch will return
      87              : // ErrNotIndexed when read from. A batch is not safe for concurrent use, and
      88              : // consumers should use a batch per goroutine or provide their own
      89              : // synchronization.
      90              : //
      91              : // # Indexing
      92              : //
      93              : // Batches can be optionally indexed (see DB.NewIndexedBatch). An indexed batch
      94              : // allows iteration via an Iterator (see Batch.NewIter). The iterator provides
      95              : // a merged view of the operations in the batch and the underlying
      96              : // database. This is implemented by treating the batch as an additional layer
      97              : // in the LSM where every entry in the batch is considered newer than any entry
      98              : // in the underlying database (batch entries have the InternalKeySeqNumBatch
      99              : // bit set). By treating the batch as an additional layer in the LSM, iteration
     100              : // supports all batch operations (i.e. Set, Merge, Delete, DeleteRange,
     101              : // RangeKeySet, RangeKeyUnset, RangeKeyDelete) with minimal effort.
     102              : //
     103              : // The same key can be operated on multiple times in a batch, though only the
     104              : // latest operation will be visible. For example, Put("a", "b"), Delete("a")
     105              : // will cause the key "a" to not be visible in the batch. Put("a", "b"),
     106              : // Put("a", "c") will cause a read of "a" to return the value "c".
     107              : //
      108              : // The batch index is implemented via a skiplist (internal/batchskl). While
     109              : // the skiplist implementation is very fast, inserting into an indexed batch is
     110              : // significantly slower than inserting into a non-indexed batch. Only use an
     111              : // indexed batch if you require reading from it.
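
As a rough usage sketch (assuming the package is imported as pebble; the function name is illustrative), an indexed batch supports read-your-writes before the batch is committed:

```go
// readYourWrites reads a key through an indexed batch, observing the batch's
// own uncommitted write layered over the underlying database.
func readYourWrites(db *pebble.DB) error {
	b := db.NewIndexedBatch()
	defer b.Close()
	if err := b.Set([]byte("a"), []byte("c"), nil); err != nil {
		return err
	}
	v, closer, err := b.Get([]byte("a")) // returns "c" even before Commit
	if err != nil {
		return err
	}
	_ = v
	if err := closer.Close(); err != nil {
		return err
	}
	return b.Commit(pebble.Sync)
}
```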
     112              : //
     113              : // # Atomic commit
     114              : //
     115              : // The operations in a batch are persisted by calling Batch.Commit which is
     116              : // equivalent to calling DB.Apply(batch). A batch is committed atomically by
     117              : // writing the internal batch representation to the WAL, adding all of the
     118              : // batch operations to the memtable associated with the WAL, and then
     119              : // incrementing the visible sequence number so that subsequent reads can see
     120              : // the effects of the batch operations. If WriteOptions.Sync is true, a call to
     121              : // Batch.Commit will guarantee that the batch is persisted to disk before
     122              : // returning. See commitPipeline for more on the implementation details.
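
A minimal sketch of the two equivalent commit paths described above (assuming the pebble import; the helper name is illustrative):

```go
// commitBatch persists the batch atomically. Batch.Commit(opts) is equivalent
// to DB.Apply(batch, opts); with pebble.Sync the call also waits for the WAL
// fsync before returning.
func commitBatch(db *pebble.DB, b *pebble.Batch) error {
	// return db.Apply(b, pebble.Sync) // equivalent form
	return b.Commit(pebble.Sync)
}
```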
     123              : //
     124              : // # Large batches
     125              : //
     126              : // The size of a batch is limited to 4GB, the max that can be represented by
     127              : // a uint32 type. Be aware that indexed batches require considerably more
     128              : // memory for the skiplist structure (this skiplist is separate from the 4GB
     129              : // batch limit). For users that require atomic writes of data that's greater
     130              : // than 4GB, DB.Ingest() is able to atomically ingest pre-computed sstables.
     131              : // A given WAL file has a single memtable associated with it (this restriction
     132              : // could be removed, but doing so is onerous and complex). And a memtable has
     133              : // a fixed size due to the underlying fixed size arena. Note that this differs
     134              : // from RocksDB where a memtable can grow arbitrarily large using a list of
     135              : // arena chunks. In RocksDB this is accomplished by storing pointers in the
     136              : // arena memory, but that isn't possible in Go.
     137              : //
     138              : // During Batch.Commit, a batch which is larger than a threshold (>
     139              : // MemTableSize/2) is wrapped in a flushableBatch and inserted into the queue
      140              : // of memtables. A flushableBatch forces the WAL to be rotated, but that happens
      141              : // anyway when the memtable becomes full, so this does not cause significant
     142              : // WAL churn. Because the flushableBatch is readable as another layer in the
     143              : // LSM, Batch.Commit returns as soon as the flushableBatch has been added to
     144              : // the queue of memtables.
     145              : //
     146              : // Internally, a flushableBatch provides Iterator support by sorting the batch
     147              : // contents (the batch is sorted once, when it is added to the memtable
     148              : // queue). Sorting the batch contents and insertion of the contents into a
     149              : // memtable have the same big-O time, but the constant factor dominates
     150              : // here. Sorting is significantly faster and uses significantly less memory.
     151              : //
     152              : // # Internal representation
     153              : //
     154              : // The internal batch representation is a contiguous byte buffer with a fixed
     155              : // 12-byte header, followed by a series of records.
     156              : //
     157              : //      +-------------+------------+--- ... ---+
     158              : //      | SeqNum (8B) | Count (4B) |  Entries  |
     159              : //      +-------------+------------+--- ... ---+
     160              : //
     161              : // Each record has a 1-byte kind tag prefix, followed by 1 or 2 length prefixed
     162              : // strings (varstring):
     163              : //
     164              : //      +-----------+-----------------+-------------------+
     165              : //      | Kind (1B) | Key (varstring) | Value (varstring) |
     166              : //      +-----------+-----------------+-------------------+
     167              : //
     168              : // A varstring is a varint32 followed by N bytes of data. The Kind tags are
     169              : // exactly those specified by InternalKeyKind. The following table shows the
     170              : // format for records of each kind:
     171              : //
     172              : //      InternalKeyKindDelete         varstring
     173              : //      InternalKeyKindLogData        varstring
     174              : //      InternalKeyKindIngestSST      varstring
     175              : //      InternalKeyKindSet            varstring varstring
     176              : //      InternalKeyKindMerge          varstring varstring
     177              : //      InternalKeyKindRangeDelete    varstring varstring
     178              : //      InternalKeyKindRangeKeySet    varstring varstring
     179              : //      InternalKeyKindRangeKeyUnset  varstring varstring
     180              : //      InternalKeyKindRangeKeyDelete varstring varstring
     181              : //
      182              : // The intuitive understanding here is that the arguments to Delete, Set,
     183              : // Merge, DeleteRange and RangeKeyDelete are encoded into the batch. The
     184              : // RangeKeySet and RangeKeyUnset operations are slightly more complicated,
     185              : // encoding their end key, suffix and value [in the case of RangeKeySet] within
     186              : // the Value varstring. For more information on the value encoding for
     187              : // RangeKeySet and RangeKeyUnset, see the internal/rangekey package.
     188              : //
     189              : // The internal batch representation is the on disk format for a batch in the
     190              : // WAL, and thus stable. New record kinds may be added, but the existing ones
     191              : // will not be modified.
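
To make the layout above concrete, here is an illustrative decoder for the fixed header and for a single varstring. This mirrors, but is not, the batchrepr package; it assumes encoding/binary and fmt are imported.

```go
// decodeBatchHeader reads the fixed 12-byte header: an 8-byte little-endian
// sequence number followed by a 4-byte little-endian count.
func decodeBatchHeader(repr []byte) (seqNum uint64, count uint32, err error) {
	if len(repr) < 12 {
		return 0, 0, fmt.Errorf("batch repr too small: %d bytes", len(repr))
	}
	seqNum = binary.LittleEndian.Uint64(repr[:8])
	count = binary.LittleEndian.Uint32(repr[8:12])
	return seqNum, count, nil
}

// decodeVarstring reads one varstring (a varint32 length followed by that many
// bytes) and returns the string along with the remaining buffer.
func decodeVarstring(data []byte) (s, rest []byte, err error) {
	n, w := binary.Uvarint(data)
	if w <= 0 || uint64(len(data)-w) < n {
		return nil, nil, fmt.Errorf("invalid varstring")
	}
	return data[w : w+int(n)], data[w+int(n):], nil
}
```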
     192              : type Batch struct {
     193              :         batchInternal
     194              :         applied atomic.Bool
     195              :         // lifecycle is used to negotiate the lifecycle of a Batch. A Batch and its
     196              :         // underlying batchInternal.data byte slice may be reused. There are two
     197              :         // mechanisms for reuse:
     198              :         //
     199              :         // 1. The caller may explicitly call [Batch.Reset] to reset the batch to be
     200              :         //    empty (while retaining the underlying repr's buffer).
     201              :         // 2. The caller may call [Batch.Close], passing ownership off to Pebble,
     202              :         //    which may reuse the batch's memory to service new callers to
     203              :         //    [DB.NewBatch].
     204              :         //
     205              :         // There's a complication to reuse: When WAL failover is configured, the
     206              :         // Pebble commit pipeline may retain a pointer to the batch.data beyond the
     207              :         // return of [Batch.Commit]. The user of the Batch may commit their batch
     208              :         // and call Close or Reset before the commit pipeline is finished reading
     209              :         // the data slice. Recycling immediately would cause a data race.
     210              :         //
     211              :         // To resolve this data race, this [lifecycle] atomic is used to determine
     212              :         // safety and responsibility of reusing a batch. The low bits of the atomic
     213              :         // are used as a reference count (really just the lowest bit—in practice
      214              :         // there's only one code path that takes a reference). The [Batch] is passed into
     215              :         // [wal.Writer]'s WriteRecord method as a [RefCount] implementation. The
     216              :         // wal.Writer guarantees that if it will read [Batch.data] after the call to
     217              :         // WriteRecord returns, it will increment the reference count. When it's
     218              :         // complete, it'll unreference through invoking [Batch.Unref].
     219              :         //
     220              :         // When the committer of a batch indicates intent to recycle a Batch through
     221              :         // calling [Batch.Reset] or [Batch.Close], the lifecycle atomic is read. If
     222              :         // an outstanding reference remains, it's unsafe to reuse Batch.data yet. In
     223              :         // [Batch.Reset] the caller wants to reuse the [Batch] immediately, so we
     224              :         // discard b.data to recycle the struct but not the underlying byte slice.
     225              :         // In [Batch.Close], we set a special high bit [batchClosedBit] on lifecycle
     226              :         // that indicates that the user will not use [Batch] again and we're free to
     227              :         // recycle it when safe. When the commit pipeline eventually calls
     228              :         // [Batch.Unref], the [batchClosedBit] is noticed and the batch is
     229              :         // recycled.
     230              :         lifecycle atomic.Int32
     231              : }
     232              : 
     233              : // batchClosedBit is a bit stored on Batch.lifecycle to indicate that the user
     234              : // called [Batch.Close] to release a Batch, but an open reference count
     235              : // prevented immediate recycling.
     236              : const batchClosedBit = 1 << 30
     237              : 
     238              : // TODO(jackson): Hide the wal.RefCount implementation from the public Batch interface.
     239              : 
     240              : // Ref implements wal.RefCount. If the WAL writer may need to read b.data after
     241              : // it returns, it invokes Ref to increment the lifecycle's reference count. When
     242              : // it's finished, it invokes Unref.
     243            1 : func (b *Batch) Ref() {
     244            1 :         b.lifecycle.Add(+1)
     245            1 : }
     246              : 
      247              : // Unref implements wal.RefCount.
     248            1 : func (b *Batch) Unref() {
     249            1 :         if v := b.lifecycle.Add(-1); (v ^ batchClosedBit) == 0 {
     250            1 :                 // The [batchClosedBit] high bit is set, and there are no outstanding
     251            1 :                 // references. The user of the Batch called [Batch.Close], expecting the
     252            1 :                 // batch to be recycled. However, our outstanding reference count
     253            1 :                 // prevented recycling. As the last to dereference, we're now
     254            1 :                 // responsible for releasing the batch.
     255            1 :                 b.lifecycle.Store(0)
     256            1 :                 b.release()
     257            1 :         }
     258              : }
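
For illustration only, a hypothetical Close-side counterpart to the Unref logic above. This is not Pebble's actual Batch.Close, just a sketch of the handoff that the lifecycle comment describes:

```go
// closeSketch hands recycling responsibility to the last referencer: if no
// references are outstanding the batch is released immediately; otherwise the
// closed bit is set and the final Unref performs the release.
func closeSketch(b *Batch) {
	for {
		v := b.lifecycle.Load()
		if v == 0 {
			b.release()
			return
		}
		if b.lifecycle.CompareAndSwap(v, v|batchClosedBit) {
			return
		}
	}
}
```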
     259              : 
     260              : // batchInternal contains the set of fields within Batch that are non-atomic and
     261              : // capable of being reset using a *b = batchInternal{} struct copy.
     262              : type batchInternal struct {
     263              :         // Data is the wire format of a batch's log entry:
     264              :         //   - 8 bytes for a sequence number of the first batch element,
     265              :         //     or zeroes if the batch has not yet been applied,
     266              :         //   - 4 bytes for the count: the number of elements in the batch,
     267              :         //     or "\xff\xff\xff\xff" if the batch is invalid,
     268              :         //   - count elements, being:
     269              :         //     - one byte for the kind
     270              :         //     - the varint-string user key,
     271              :         //     - the varint-string value (if kind != delete).
     272              :         // The sequence number and count are stored in little-endian order.
     273              :         //
     274              :         // The data field can be (but is not guaranteed to be) nil for new
     275              :         // batches. Large batches will set the data field to nil when committed as
     276              :         // the data has been moved to a flushableBatch and inserted into the queue of
     277              :         // memtables.
     278              :         data     []byte
     279              :         comparer *base.Comparer
     280              :         opts     batchOptions
     281              : 
     282              :         // An upper bound on required space to add this batch to a memtable.
     283              :         // Note that although batches are limited to 4 GiB in size, that limit
     284              :         // applies to len(data), not the memtable size. The upper bound on the
     285              :         // size of a memtable node is larger than the overhead of the batch's log
     286              :         // encoding, so memTableSize is larger than len(data) and may overflow a
     287              :         // uint32.
     288              :         memTableSize uint64
     289              : 
     290              :         // The db to which the batch will be committed. Do not change this field
     291              :         // after the batch has been created as it might invalidate internal state.
     292              :         // Batch.memTableSize is only refreshed if Batch.db is set. Setting db to
     293              :         // nil once it has been set implies that the Batch has encountered an error.
     294              :         db *DB
     295              : 
     296              :         // The count of records in the batch. This count will be stored in the batch
     297              :         // data whenever Repr() is called.
     298              :         count uint64
     299              : 
     300              :         // The count of range deletions in the batch. Updated every time a range
     301              :         // deletion is added.
     302              :         countRangeDels uint64
     303              : 
     304              :         // The count of range key sets, unsets and deletes in the batch. Updated
     305              :         // every time a RANGEKEYSET, RANGEKEYUNSET or RANGEKEYDEL key is added.
     306              :         countRangeKeys uint64
     307              : 
     308              :         // A deferredOp struct, stored in the Batch so that a pointer can be returned
     309              :         // from the *Deferred() methods rather than a value.
     310              :         deferredOp DeferredBatchOp
     311              : 
     312              :         // An optional skiplist keyed by offset into data of the entry.
     313              :         index         *batchskl.Skiplist
     314              :         rangeDelIndex *batchskl.Skiplist
     315              :         rangeKeyIndex *batchskl.Skiplist
     316              : 
     317              :         // Fragmented range deletion tombstones. Cached the first time a range
     318              :         // deletion iterator is requested. The cache is invalidated whenever a new
     319              :         // range deletion is added to the batch. This cache can only be used when
     320              :         // opening an iterator to read at a batch sequence number >=
     321              :         // tombstonesSeqNum. This is the case for all new iterators created over a
     322              :         // batch but it's not the case for all cloned iterators.
     323              :         tombstones       []keyspan.Span
     324              :         tombstonesSeqNum base.SeqNum
     325              : 
     326              :         // Fragmented range key spans. Cached the first time a range key iterator is
     327              :         // requested. The cache is invalidated whenever a new range key
     328              :         // (RangeKey{Set,Unset,Del}) is added to the batch. This cache can only be
     329              :         // used when opening an iterator to read at a batch sequence number >=
      330              :         // rangeKeysSeqNum. This is the case for all new iterators created over a
     331              :         // batch but it's not the case for all cloned iterators.
     332              :         rangeKeys       []keyspan.Span
     333              :         rangeKeysSeqNum base.SeqNum
     334              : 
     335              :         // The flushableBatch wrapper if the batch is too large to fit in the
     336              :         // memtable.
     337              :         flushable *flushableBatch
     338              : 
     339              :         // minimumFormatMajorVersion indicates the format major version required in
     340              :         // order to commit this batch. If an operation requires a particular format
     341              :         // major version, it ratchets the batch's minimumFormatMajorVersion. When
     342              :         // the batch is committed, this is validated against the database's current
     343              :         // format major version.
     344              :         minimumFormatMajorVersion FormatMajorVersion
     345              : 
     346              :         // Synchronous Apply uses the commit WaitGroup for both publishing the
     347              :         // seqnum and waiting for the WAL fsync (if needed). Asynchronous
     348              :         // ApplyNoSyncWait, which implies WriteOptions.Sync is true, uses the commit
     349              :         // WaitGroup for publishing the seqnum and the fsyncWait WaitGroup for
     350              :         // waiting for the WAL fsync.
     351              :         //
     352              :         // TODO(sumeer): if we find that ApplyNoSyncWait in conjunction with
     353              :         // SyncWait is causing higher memory usage because of the time duration
     354              :         // between when the sync is already done, and a goroutine calls SyncWait
     355              :         // (followed by Batch.Close), we could separate out {fsyncWait, commitErr}
     356              :         // into a separate struct that is allocated separately (using another
     357              :         // sync.Pool), and only that struct needs to outlive Batch.Close (which
     358              :         // could then be called immediately after ApplyNoSyncWait). commitStats
     359              :         // will also need to be in this separate struct.
     360              :         commit    sync.WaitGroup
     361              :         fsyncWait sync.WaitGroup
     362              : 
     363              :         commitStats BatchCommitStats
     364              : 
     365              :         commitErr error
     366              : 
      367              :         // Position bools together to reduce the size of the struct.
     368              : 
     369              :         // ingestedSSTBatch indicates that the batch contains one or more key kinds
     370              :         // of InternalKeyKindIngestSST. If the batch contains key kinds of IngestSST
     371              :         // then it will only contain key kinds of IngestSST.
     372              :         ingestedSSTBatch bool
     373              : 
     374              :         // committing is set to true when a batch begins to commit. It's used to
      375              :         // ensure the batch is not mutated concurrently. It is deliberately not an
      376              :         // atomic, so as to avoid the overhead on batch mutations. This is
     377              :         // okay, because under correct usage this field will never be accessed
     378              :         // concurrently. It's only under incorrect usage the memory accesses of this
     379              :         // variable may violate memory safety. Since we don't use atomics here,
     380              :         // false negatives are possible.
     381              :         committing bool
     382              : }
     383              : 
     384              : // BatchCommitStats exposes stats related to committing a batch.
     385              : //
     386              : // NB: there is no Pebble internal tracing (using LoggerAndTracer) of slow
     387              : // batch commits. The caller can use these stats to do their own tracing as
     388              : // needed.
     389              : type BatchCommitStats struct {
     390              :         // TotalDuration is the time spent in DB.{Apply,ApplyNoSyncWait} or
     391              :         // Batch.Commit, plus the time waiting in Batch.SyncWait. If there is a gap
     392              :         // between calling ApplyNoSyncWait and calling SyncWait, that gap could
     393              :         // include some duration in which real work was being done for the commit
     394              :         // and will not be included here. This missing time is considered acceptable
     395              :         // since the goal of these stats is to understand user-facing latency.
     396              :         //
     397              :         // TotalDuration includes time spent in various queues both inside Pebble
     398              :         // and outside Pebble (I/O queues, goroutine scheduler queue, mutex wait
     399              :         // etc.). For some of these queues (which we consider important) the wait
      400              :         // times are included below -- these expose low-level implementation details
      401              :         // that are meant for expert diagnosis and are subject to change. There may be
     402              :         // unaccounted time after subtracting those values from TotalDuration.
     403              :         TotalDuration time.Duration
     404              :         // SemaphoreWaitDuration is the wait time for semaphores in
     405              :         // commitPipeline.Commit.
     406              :         SemaphoreWaitDuration time.Duration
     407              :         // WALQueueWaitDuration is the wait time for allocating memory blocks in the
     408              :         // LogWriter (due to the LogWriter not writing fast enough). At the moment
      409              :         // this duration is always zero because a single WAL will allow
     410              :         // allocating memory blocks up to the entire memtable size. In the future,
     411              :         // we may pipeline WALs and bound the WAL queued blocks separately, so this
     412              :         // field is preserved for that possibility.
     413              :         WALQueueWaitDuration time.Duration
     414              :         // MemTableWriteStallDuration is the wait caused by a write stall due to too
     415              :         // many memtables (due to not flushing fast enough).
     416              :         MemTableWriteStallDuration time.Duration
     417              :         // L0ReadAmpWriteStallDuration is the wait caused by a write stall due to
     418              :         // high read amplification in L0 (due to not compacting fast enough out of
     419              :         // L0).
     420              :         L0ReadAmpWriteStallDuration time.Duration
     421              :         // WALRotationDuration is the wait time for WAL rotation, which includes
     422              :         // syncing and closing the old WAL and creating (or reusing) a new one.
     423              :         WALRotationDuration time.Duration
     424              :         // CommitWaitDuration is the wait for publishing the seqnum plus the
     425              :         // duration for the WAL sync (if requested). The former should be tiny and
     426              :         // one can assume that this is all due to the WAL sync.
     427              :         CommitWaitDuration time.Duration
     428              : }
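
A minimal usage sketch, assuming Batch.CommitStats() returns the stats collected for the most recent commit; the helper name, threshold, and log/time imports are illustrative:

```go
// logSlowCommit commits a batch and reports the commit stats if the
// user-facing latency exceeded an arbitrary threshold.
func logSlowCommit(b *pebble.Batch) error {
	if err := b.Commit(pebble.Sync); err != nil {
		return err
	}
	if stats := b.CommitStats(); stats.TotalDuration > 100*time.Millisecond {
		log.Printf("slow batch commit: %+v", stats)
	}
	return nil
}
```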
     429              : 
     430              : var _ Reader = (*Batch)(nil)
     431              : var _ Writer = (*Batch)(nil)
     432              : 
     433              : var batchPool = sync.Pool{
     434            1 :         New: func() interface{} {
     435            1 :                 return &Batch{}
     436            1 :         },
     437              : }
     438              : 
     439              : type indexedBatch struct {
     440              :         batch Batch
     441              :         index batchskl.Skiplist
     442              : }
     443              : 
     444              : var indexedBatchPool = sync.Pool{
     445            1 :         New: func() interface{} {
     446            1 :                 return &indexedBatch{}
     447            1 :         },
     448              : }
     449              : 
     450            1 : func newBatch(db *DB, opts ...BatchOption) *Batch {
     451            1 :         b := batchPool.Get().(*Batch)
     452            1 :         b.db = db
     453            1 :         b.opts.ensureDefaults()
     454            1 :         for _, opt := range opts {
     455            0 :                 opt(&b.opts)
     456            0 :         }
     457            1 :         return b
     458              : }
     459              : 
     460            0 : func newBatchWithSize(db *DB, size int, opts ...BatchOption) *Batch {
     461            0 :         b := newBatch(db, opts...)
     462            0 :         if cap(b.data) < size {
     463            0 :                 b.data = rawalloc.New(0, size)
     464            0 :         }
     465            0 :         return b
     466              : }
     467              : 
     468            1 : func newIndexedBatch(db *DB, comparer *Comparer) *Batch {
     469            1 :         i := indexedBatchPool.Get().(*indexedBatch)
     470            1 :         i.batch.comparer = comparer
     471            1 :         i.batch.db = db
     472            1 :         i.batch.index = &i.index
     473            1 :         i.batch.index.Init(&i.batch.data, comparer.Compare, comparer.AbbreviatedKey)
     474            1 :         i.batch.opts.ensureDefaults()
     475            1 :         return &i.batch
     476            1 : }
     477              : 
     478            0 : func newIndexedBatchWithSize(db *DB, comparer *Comparer, size int) *Batch {
     479            0 :         b := newIndexedBatch(db, comparer)
     480            0 :         if cap(b.data) < size {
     481            0 :                 b.data = rawalloc.New(0, size)
     482            0 :         }
     483            0 :         return b
     484              : }
     485              : 
     486              : // nextSeqNum returns the batch "sequence number" that will be given to the next
     487              : // key written to the batch. During iteration keys within an indexed batch are
     488              : // given a sequence number consisting of their offset within the batch combined
     489              : // with the base.SeqNumBatchBit bit. These sequence numbers are only
     490              : // used during iteration, and the keys are assigned ordinary sequence numbers
     491              : // when the batch is committed.
     492            1 : func (b *Batch) nextSeqNum() base.SeqNum {
     493            1 :         return base.SeqNum(len(b.data)) | base.SeqNumBatchBit
     494            1 : }
     495              : 
     496            1 : func (b *Batch) release() {
     497            1 :         if b.db == nil {
     498            1 :                 // The batch was not created using newBatch or newIndexedBatch, or an error
     499            1 :                 // was encountered. We don't try to reuse batches that encountered an error
     500            1 :                 // because they might be stuck somewhere in the system and attempting to
     501            1 :                 // reuse such batches is a recipe for onerous debugging sessions. Instead,
     502            1 :                 // let the GC do its job.
     503            1 :                 return
     504            1 :         }
     505            1 :         b.db = nil
     506            1 : 
     507            1 :         // NB: This is ugly (it would be cleaner if we could just assign a Batch{}),
     508            1 :         // but necessary so that we can use atomic.StoreUint32 for the Batch.applied
     509            1 :         // field. Without using an atomic to clear that field the Go race detector
     510            1 :         // complains.
     511            1 :         b.reset()
     512            1 :         b.comparer = nil
     513            1 : 
     514            1 :         if b.index == nil {
     515            1 :                 batchPool.Put(b)
     516            1 :         } else {
     517            1 :                 b.index, b.rangeDelIndex, b.rangeKeyIndex = nil, nil, nil
     518            1 :                 indexedBatchPool.Put((*indexedBatch)(unsafe.Pointer(b)))
     519            1 :         }
     520              : }
     521              : 
     522            1 : func (b *Batch) refreshMemTableSize() error {
     523            1 :         b.memTableSize = 0
     524            1 :         if len(b.data) < batchrepr.HeaderLen {
     525            0 :                 return nil
     526            0 :         }
     527              : 
     528            1 :         b.countRangeDels = 0
     529            1 :         b.countRangeKeys = 0
     530            1 :         b.minimumFormatMajorVersion = 0
     531            1 :         for r := b.Reader(); ; {
     532            1 :                 kind, key, value, ok, err := r.Next()
     533            1 :                 if !ok {
     534            1 :                         if err != nil {
     535            0 :                                 return err
     536            0 :                         }
     537            1 :                         break
     538              :                 }
     539            1 :                 switch kind {
     540            1 :                 case InternalKeyKindRangeDelete:
     541            1 :                         b.countRangeDels++
     542            1 :                 case InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete:
     543            1 :                         b.countRangeKeys++
     544            1 :                 case InternalKeyKindSet, InternalKeyKindDelete, InternalKeyKindMerge, InternalKeyKindSingleDelete, InternalKeyKindSetWithDelete:
     545              :                         // fallthrough
     546            1 :                 case InternalKeyKindDeleteSized:
     547            1 :                         if b.minimumFormatMajorVersion < FormatDeleteSizedAndObsolete {
     548            1 :                                 b.minimumFormatMajorVersion = FormatDeleteSizedAndObsolete
     549            1 :                         }
     550            0 :                 case InternalKeyKindLogData:
     551            0 :                         // LogData does not contribute to memtable size.
     552            0 :                         continue
     553            1 :                 case InternalKeyKindIngestSST:
     554            1 :                         if b.minimumFormatMajorVersion < FormatFlushableIngest {
     555            1 :                                 b.minimumFormatMajorVersion = FormatFlushableIngest
     556            1 :                         }
     557              :                         // This key kind doesn't contribute to the memtable size.
     558            1 :                         continue
     559            1 :                 case InternalKeyKindExcise:
     560            1 :                         if b.minimumFormatMajorVersion < FormatFlushableIngestExcises {
     561            1 :                                 b.minimumFormatMajorVersion = FormatFlushableIngestExcises
     562            1 :                         }
     563              :                         // This key kind doesn't contribute to the memtable size.
     564            1 :                         continue
     565            0 :                 default:
      566            0 :                         // Note: In some circumstances this might be temporary memory
     567            0 :                         // corruption that can be recovered by discarding the batch and
     568            0 :                         // trying again. In other cases, the batch repr might've been
     569            0 :                         // already persisted elsewhere, and we'll loop continuously trying
     570            0 :                         // to commit the same corrupted batch. The caller is responsible for
     571            0 :                         // distinguishing.
     572            0 :                         return errors.Wrapf(ErrInvalidBatch, "unrecognized kind %v", kind)
     573              :                 }
     574            1 :                 b.memTableSize += memTableEntrySize(len(key), len(value))
     575              :         }
     576            1 :         return nil
     577              : }
     578              : 
     579              : // Apply the operations contained in the batch to the receiver batch.
     580              : //
     581              : // It is safe to modify the contents of the arguments after Apply returns.
     582              : //
     583              : // Apply returns ErrInvalidBatch if the provided batch is invalid in any way.
     584            1 : func (b *Batch) Apply(batch *Batch, _ *WriteOptions) error {
     585            1 :         if b.ingestedSSTBatch {
     586            0 :                 panic("pebble: invalid batch application")
     587              :         }
     588            1 :         if len(batch.data) == 0 {
     589            1 :                 return nil
     590            1 :         }
     591            1 :         if len(batch.data) < batchrepr.HeaderLen {
     592            0 :                 return ErrInvalidBatch
     593            0 :         }
     594              : 
     595            1 :         offset := len(b.data)
     596            1 :         if offset == 0 {
     597            0 :                 b.init(offset)
     598            0 :                 offset = batchrepr.HeaderLen
     599            0 :         }
     600            1 :         b.data = append(b.data, batch.data[batchrepr.HeaderLen:]...)
     601            1 : 
     602            1 :         b.setCount(b.Count() + batch.Count())
     603            1 : 
     604            1 :         if b.db != nil || b.index != nil {
     605            1 :                 // Only iterate over the new entries if we need to track memTableSize or in
     606            1 :                 // order to update the index.
     607            1 :                 for iter := batchrepr.Reader(b.data[offset:]); len(iter) > 0; {
     608            1 :                         offset := uintptr(unsafe.Pointer(&iter[0])) - uintptr(unsafe.Pointer(&b.data[0]))
     609            1 :                         kind, key, value, ok, err := iter.Next()
     610            1 :                         if !ok {
     611            0 :                                 if err != nil {
     612            0 :                                         return err
     613            0 :                                 }
     614            0 :                                 break
     615              :                         }
     616            1 :                         switch kind {
     617            0 :                         case InternalKeyKindRangeDelete:
     618            0 :                                 b.countRangeDels++
     619            0 :                         case InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete:
     620            0 :                                 b.countRangeKeys++
     621            0 :                         case InternalKeyKindIngestSST, InternalKeyKindExcise:
     622            0 :                                 panic("pebble: invalid key kind for batch")
     623            0 :                         case InternalKeyKindLogData:
     624            0 :                                 // LogData does not contribute to memtable size.
     625            0 :                                 continue
     626              :                         case InternalKeyKindSet, InternalKeyKindDelete, InternalKeyKindMerge,
     627            1 :                                 InternalKeyKindSingleDelete, InternalKeyKindSetWithDelete, InternalKeyKindDeleteSized:
     628              :                                 // fallthrough
     629            0 :                         default:
      630            0 :                                 // Note: In some circumstances this might be temporary memory
     631            0 :                                 // corruption that can be recovered by discarding the batch and
     632            0 :                                 // trying again. In other cases, the batch repr might've been
     633            0 :                                 // already persisted elsewhere, and we'll loop continuously
     634            0 :                                 // trying to commit the same corrupted batch. The caller is
     635            0 :                                 // responsible for distinguishing.
     636            0 :                                 return errors.Wrapf(ErrInvalidBatch, "unrecognized kind %v", kind)
     637              :                         }
     638            1 :                         if b.index != nil {
     639            1 :                                 var err error
     640            1 :                                 switch kind {
     641            0 :                                 case InternalKeyKindRangeDelete:
     642            0 :                                         b.tombstones = nil
     643            0 :                                         b.tombstonesSeqNum = 0
     644            0 :                                         if b.rangeDelIndex == nil {
     645            0 :                                                 b.rangeDelIndex = batchskl.NewSkiplist(&b.data, b.comparer.Compare, b.comparer.AbbreviatedKey)
     646            0 :                                         }
     647            0 :                                         err = b.rangeDelIndex.Add(uint32(offset))
     648            0 :                                 case InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete:
     649            0 :                                         b.rangeKeys = nil
     650            0 :                                         b.rangeKeysSeqNum = 0
     651            0 :                                         if b.rangeKeyIndex == nil {
     652            0 :                                                 b.rangeKeyIndex = batchskl.NewSkiplist(&b.data, b.comparer.Compare, b.comparer.AbbreviatedKey)
     653            0 :                                         }
     654            0 :                                         err = b.rangeKeyIndex.Add(uint32(offset))
     655            1 :                                 default:
     656            1 :                                         err = b.index.Add(uint32(offset))
     657              :                                 }
     658            1 :                                 if err != nil {
     659            0 :                                         return err
     660            0 :                                 }
     661              :                         }
     662            1 :                         b.memTableSize += memTableEntrySize(len(key), len(value))
     663              :                 }
     664              :         }
     665            1 :         return nil
     666              : }
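
As a usage sketch (function name hypothetical, assuming the pebble import), Apply copies the operations of one batch into another so they can be committed as a single atomic unit:

```go
// combineAndCommit appends src's operations to dst and commits the result.
// It is safe to modify or reuse src after Apply returns.
func combineAndCommit(db *pebble.DB) error {
	dst, src := db.NewBatch(), db.NewBatch()
	defer dst.Close()
	defer src.Close()
	if err := src.Set([]byte("k"), []byte("v"), nil); err != nil {
		return err
	}
	if err := dst.Apply(src, nil); err != nil {
		return err
	}
	return dst.Commit(pebble.NoSync)
}
```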
     667              : 
     668              : // Get gets the value for the given key. It returns ErrNotFound if the Batch
     669              : // does not contain the key.
     670              : //
     671              : // The caller should not modify the contents of the returned slice, but it is
     672              : // safe to modify the contents of the argument after Get returns. The returned
     673              : // slice will remain valid until the returned Closer is closed. On success, the
     674              : // caller MUST call closer.Close() or a memory leak will occur.
     675            1 : func (b *Batch) Get(key []byte) ([]byte, io.Closer, error) {
     676            1 :         if b.index == nil {
     677            0 :                 return nil, nil, ErrNotIndexed
     678            0 :         }
     679            1 :         return b.db.getInternal(key, b, nil /* snapshot */)
     680              : }
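
A short sketch of the Get contract described above (helper name hypothetical): copy the value if it must outlive the closer, and always close the closer.

```go
// getCopy returns a copy of the value for key from an indexed batch. The
// slice returned by Get is only valid until closer.Close() is called.
func getCopy(b *pebble.Batch, key []byte) ([]byte, error) {
	v, closer, err := b.Get(key)
	if err != nil {
		// err may be pebble.ErrNotFound, or ErrNotIndexed for non-indexed batches.
		return nil, err
	}
	out := append([]byte(nil), v...)
	if err := closer.Close(); err != nil {
		return nil, err
	}
	return out, nil
}
```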
     681              : 
     682            1 : func (b *Batch) prepareDeferredKeyValueRecord(keyLen, valueLen int, kind InternalKeyKind) {
     683            1 :         if b.committing {
     684            0 :                 panic("pebble: batch already committing")
     685              :         }
     686            1 :         if len(b.data) == 0 {
     687            1 :                 b.init(keyLen + valueLen + 2*binary.MaxVarintLen64 + batchrepr.HeaderLen)
     688            1 :         }
     689            1 :         b.count++
     690            1 :         b.memTableSize += memTableEntrySize(keyLen, valueLen)
     691            1 : 
     692            1 :         pos := len(b.data)
     693            1 :         b.deferredOp.offset = uint32(pos)
     694            1 :         b.grow(1 + 2*maxVarintLen32 + keyLen + valueLen)
     695            1 :         b.data[pos] = byte(kind)
     696            1 :         pos++
     697            1 : 
     698            1 :         {
     699            1 :                 // TODO(peter): Manually inlined version binary.PutUvarint(). This is 20%
     700            1 :                 // faster on BenchmarkBatchSet on go1.13. Remove if go1.14 or future
     701            1 :                 // versions show this to not be a performance win.
     702            1 :                 x := uint32(keyLen)
     703            1 :                 for x >= 0x80 {
     704            0 :                         b.data[pos] = byte(x) | 0x80
     705            0 :                         x >>= 7
     706            0 :                         pos++
     707            0 :                 }
     708            1 :                 b.data[pos] = byte(x)
     709            1 :                 pos++
     710              :         }
     711              : 
     712            1 :         b.deferredOp.Key = b.data[pos : pos+keyLen]
     713            1 :         pos += keyLen
     714            1 : 
     715            1 :         {
     716            1 :                 // TODO(peter): Manually inlined version binary.PutUvarint(). This is 20%
     717            1 :                 // faster on BenchmarkBatchSet on go1.13. Remove if go1.14 or future
     718            1 :                 // versions show this to not be a performance win.
     719            1 :                 x := uint32(valueLen)
     720            1 :                 for x >= 0x80 {
     721            0 :                         b.data[pos] = byte(x) | 0x80
     722            0 :                         x >>= 7
     723            0 :                         pos++
     724            0 :                 }
     725            1 :                 b.data[pos] = byte(x)
     726            1 :                 pos++
     727              :         }
     728              : 
     729            1 :         b.deferredOp.Value = b.data[pos : pos+valueLen]
     730            1 :         // Shrink data since varints may be shorter than the upper bound.
     731            1 :         b.data = b.data[:pos+valueLen]
     732              : }
     733              : 
     734            1 : func (b *Batch) prepareDeferredKeyRecord(keyLen int, kind InternalKeyKind) {
     735            1 :         if b.committing {
     736            0 :                 panic("pebble: batch already committing")
     737              :         }
     738            1 :         if len(b.data) == 0 {
     739            1 :                 b.init(keyLen + binary.MaxVarintLen64 + batchrepr.HeaderLen)
     740            1 :         }
     741            1 :         b.count++
     742            1 :         b.memTableSize += memTableEntrySize(keyLen, 0)
     743            1 : 
     744            1 :         pos := len(b.data)
     745            1 :         b.deferredOp.offset = uint32(pos)
     746            1 :         b.grow(1 + maxVarintLen32 + keyLen)
     747            1 :         b.data[pos] = byte(kind)
     748            1 :         pos++
     749            1 : 
     750            1 :         {
     751            1 :                 // TODO(peter): Manually inlined version binary.PutUvarint(). Remove if
     752            1 :                 // go1.13 or future versions show this to not be a performance win. See
     753            1 :                 // BenchmarkBatchSet.
     754            1 :                 x := uint32(keyLen)
     755            1 :                 for x >= 0x80 {
     756            0 :                         b.data[pos] = byte(x) | 0x80
     757            0 :                         x >>= 7
     758            0 :                         pos++
     759            0 :                 }
     760            1 :                 b.data[pos] = byte(x)
     761            1 :                 pos++
     762              :         }
     763              : 
     764            1 :         b.deferredOp.Key = b.data[pos : pos+keyLen]
     765            1 :         b.deferredOp.Value = nil
     766            1 : 
     767            1 :         // Shrink data since varint may be shorter than the upper bound.
     768            1 :         b.data = b.data[:pos+keyLen]
     769              : }
     770              : 
     771              : // AddInternalKey allows the caller to add an internal key of point key or range
     772              : // key kinds (but not RangeDelete) to a batch. Passing in an internal key of
     773              : // kind RangeDelete will result in a panic. Note that the seqnum in the internal
     774              : // key is effectively ignored, even though the Kind is preserved. This is
     775              : // because the batch format does not allow for a per-key seqnum to be specified,
     776              : // only a batch-wide one.
     777              : //
     778              : // Note that non-indexed keys (IngestKeyKind{LogData,IngestSST}) are not
     779              : // supported with this method as they require specialized logic.
     780            1 : func (b *Batch) AddInternalKey(key *base.InternalKey, value []byte, _ *WriteOptions) error {
     781            1 :         keyLen := len(key.UserKey)
     782            1 :         hasValue := false
     783            1 :         switch kind := key.Kind(); kind {
     784            0 :         case InternalKeyKindRangeDelete:
     785            0 :                 panic("unexpected range delete in AddInternalKey")
     786            0 :         case InternalKeyKindSingleDelete, InternalKeyKindDelete:
     787            0 :                 b.prepareDeferredKeyRecord(keyLen, kind)
     788            0 :                 b.deferredOp.index = b.index
     789            1 :         case InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete:
     790            1 :                 b.prepareDeferredKeyValueRecord(keyLen, len(value), kind)
     791            1 :                 hasValue = true
     792            1 :                 b.incrementRangeKeysCount()
     793            0 :         default:
     794            0 :                 b.prepareDeferredKeyValueRecord(keyLen, len(value), kind)
     795            0 :                 hasValue = true
     796            0 :                 b.deferredOp.index = b.index
     797              :         }
     798            1 :         copy(b.deferredOp.Key, key.UserKey)
     799            1 :         if hasValue {
     800            1 :                 copy(b.deferredOp.Value, value)
     801            1 :         }
     802              : 
     803              :         // TODO(peter): Manually inline DeferredBatchOp.Finish(). Mid-stack inlining
     804              :         // in go1.13 will remove the need for this.
     805            1 :         if b.index != nil {
     806            0 :                 if err := b.index.Add(b.deferredOp.offset); err != nil {
     807            0 :                         return err
     808            0 :                 }
     809              :         }
     810            1 :         return nil
     811              : }
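// A minimal sketch (not part of the original source) of adding a point key of an
// explicit kind via AddInternalKey. It assumes the caller has access to the
// internal/base helpers (base.MakeInternalKey); the seqnum passed there is
// ignored by the batch, only the kind is preserved.
//
//	ik := base.MakeInternalKey([]byte("k"), 0, base.InternalKeyKindSet)
//	if err := b.AddInternalKey(&ik, []byte("v"), nil); err != nil {
//		// handle the error
//	}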
     812              : 
     813              : // Set adds an action to the batch that sets the key to map to the value.
     814              : //
     815              : // It is safe to modify the contents of the arguments after Set returns.
     816            1 : func (b *Batch) Set(key, value []byte, _ *WriteOptions) error {
     817            1 :         deferredOp := b.SetDeferred(len(key), len(value))
     818            1 :         copy(deferredOp.Key, key)
     819            1 :         copy(deferredOp.Value, value)
     820            1 :         // TODO(peter): Manually inline DeferredBatchOp.Finish(). Mid-stack inlining
     821            1 :         // in go1.13 will remove the need for this.
     822            1 :         if b.index != nil {
     823            1 :                 if err := b.index.Add(deferredOp.offset); err != nil {
     824            0 :                         return err
     825            0 :                 }
     826              :         }
     827            1 :         return nil
     828              : }
     829              : 
     830              : // SetDeferred is similar to Set in that it adds a set operation to the batch,
     831              : // except it only takes in key/value lengths instead of complete slices,
     832              : // letting the caller encode into those objects and then call Finish() on the
     833              : // returned object.
     834            1 : func (b *Batch) SetDeferred(keyLen, valueLen int) *DeferredBatchOp {
     835            1 :         b.prepareDeferredKeyValueRecord(keyLen, valueLen, InternalKeyKindSet)
     836            1 :         b.deferredOp.index = b.index
     837            1 :         return &b.deferredOp
     838            1 : }
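// A minimal sketch (not part of the original source) of the deferred pattern:
// the caller encodes the key and value directly into the batch's buffer and then
// finishes (indexes) the operation. Assumes key and value []byte variables in scope.
//
//	op := b.SetDeferred(len(key), len(value))
//	copy(op.Key, key)
//	copy(op.Value, value)
//	if err := op.Finish(); err != nil {
//		// handle the error
//	}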
     839              : 
     840              : // Merge adds an action to the batch that merges the value at key with the new
     841              : // value. The details of the merge are dependent upon the configured merge
     842              : // operator.
     843              : //
     844              : // It is safe to modify the contents of the arguments after Merge returns.
     845            1 : func (b *Batch) Merge(key, value []byte, _ *WriteOptions) error {
     846            1 :         deferredOp := b.MergeDeferred(len(key), len(value))
     847            1 :         copy(deferredOp.Key, key)
     848            1 :         copy(deferredOp.Value, value)
     849            1 :         // TODO(peter): Manually inline DeferredBatchOp.Finish(). Mid-stack inlining
     850            1 :         // in go1.13 will remove the need for this.
     851            1 :         if b.index != nil {
     852            1 :                 if err := b.index.Add(deferredOp.offset); err != nil {
     853            0 :                         return err
     854            0 :                 }
     855              :         }
     856            1 :         return nil
     857              : }
     858              : 
     859              : // MergeDeferred is similar to Merge in that it adds a merge operation to the
     860              : // batch, except it only takes in key/value lengths instead of complete slices,
     861              : // letting the caller encode into those objects and then call Finish() on the
     862              : // returned object.
     863            1 : func (b *Batch) MergeDeferred(keyLen, valueLen int) *DeferredBatchOp {
     864            1 :         b.prepareDeferredKeyValueRecord(keyLen, valueLen, InternalKeyKindMerge)
     865            1 :         b.deferredOp.index = b.index
     866            1 :         return &b.deferredOp
     867            1 : }
     868              : 
     869              : // Delete adds an action to the batch that deletes the entry for key.
     870              : //
     871              : // It is safe to modify the contents of the arguments after Delete returns.
     872            1 : func (b *Batch) Delete(key []byte, _ *WriteOptions) error {
     873            1 :         deferredOp := b.DeleteDeferred(len(key))
     874            1 :         copy(deferredOp.Key, key)
     875            1 :         // TODO(peter): Manually inline DeferredBatchOp.Finish(). Mid-stack inlining
     876            1 :         // in go1.13 will remove the need for this.
     877            1 :         if b.index != nil {
     878            1 :                 if err := b.index.Add(deferredOp.offset); err != nil {
     879            0 :                         return err
     880            0 :                 }
     881              :         }
     882            1 :         return nil
     883              : }
     884              : 
     885              : // DeleteDeferred is similar to Delete in that it adds a delete operation to
      886              : // the batch, except it only takes in a key length instead of a complete key
      887              : // slice, letting the caller encode into that object and then call Finish()
     888              : // on the returned object.
     889            1 : func (b *Batch) DeleteDeferred(keyLen int) *DeferredBatchOp {
     890            1 :         b.prepareDeferredKeyRecord(keyLen, InternalKeyKindDelete)
     891            1 :         b.deferredOp.index = b.index
     892            1 :         return &b.deferredOp
     893            1 : }
     894              : 
     895              : // DeleteSized behaves identically to Delete, but takes an additional
     896              : // argument indicating the size of the value being deleted. DeleteSized
     897              : // should be preferred when the caller has the expectation that there exists
     898              : // a single internal KV pair for the key (eg, the key has not been
     899              : // overwritten recently), and the caller knows the size of its value.
     900              : //
     901              : // DeleteSized will record the value size within the tombstone and use it to
     902              : // inform compaction-picking heuristics which strive to reduce space
     903              : // amplification in the LSM. This "calling your shot" mechanic allows the
     904              : // storage engine to more accurately estimate and reduce space amplification.
     905              : //
     906              : // It is safe to modify the contents of the arguments after DeleteSized
     907              : // returns.
     908            1 : func (b *Batch) DeleteSized(key []byte, deletedValueSize uint32, _ *WriteOptions) error {
     909            1 :         deferredOp := b.DeleteSizedDeferred(len(key), deletedValueSize)
     910            1 :         copy(b.deferredOp.Key, key)
     911            1 :         // TODO(peter): Manually inline DeferredBatchOp.Finish(). Check if in a
     912            1 :         // later Go release this is unnecessary.
     913            1 :         if b.index != nil {
     914            1 :                 if err := b.index.Add(deferredOp.offset); err != nil {
     915            0 :                         return err
     916            0 :                 }
     917              :         }
     918            1 :         return nil
     919              : }
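// A minimal sketch (not part of the original source) of "calling your shot": the
// caller tracks value sizes externally and assumes the deleted value is roughly
// 4 KiB. Note that sized deletes require a format major version that supports them.
//
//	if err := b.DeleteSized([]byte("blob/1"), 4<<10, nil); err != nil {
//		// handle the error
//	}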
     920              : 
     921              : // DeleteSizedDeferred is similar to DeleteSized in that it adds a sized delete
     922              : // operation to the batch, except it only takes in key length instead of a
     923              : // complete key slice, letting the caller encode into the DeferredBatchOp.Key
     924              : // slice and then call Finish() on the returned object.
     925            1 : func (b *Batch) DeleteSizedDeferred(keyLen int, deletedValueSize uint32) *DeferredBatchOp {
     926            1 :         if b.minimumFormatMajorVersion < FormatDeleteSizedAndObsolete {
     927            1 :                 b.minimumFormatMajorVersion = FormatDeleteSizedAndObsolete
     928            1 :         }
     929              : 
     930              :         // Encode the sum of the key length and the value in the value.
     931            1 :         v := uint64(deletedValueSize) + uint64(keyLen)
     932            1 : 
     933            1 :         // Encode `v` as a varint.
     934            1 :         var buf [binary.MaxVarintLen64]byte
     935            1 :         n := 0
     936            1 :         {
     937            1 :                 x := v
     938            1 :                 for x >= 0x80 {
     939            0 :                         buf[n] = byte(x) | 0x80
     940            0 :                         x >>= 7
     941            0 :                         n++
     942            0 :                 }
     943            1 :                 buf[n] = byte(x)
     944            1 :                 n++
     945              :         }
     946              : 
     947              :         // NB: In batch entries and sstable entries, values are stored as
     948              :         // varstrings. Here, the value is itself a simple varint. This results in an
     949              :         // unnecessary double layer of encoding:
     950              :         //     varint(n) varint(deletedValueSize)
     951              :         // The first varint will always be 1-byte, since a varint-encoded uint64
     952              :         // will never exceed 128 bytes. This unnecessary extra byte and wrapping is
     953              :         // preserved to avoid special casing across the database, and in particular
     954              :         // in sstable block decoding which is performance sensitive.
     955            1 :         b.prepareDeferredKeyValueRecord(keyLen, n, InternalKeyKindDeleteSized)
     956            1 :         b.deferredOp.index = b.index
     957            1 :         copy(b.deferredOp.Value, buf[:n])
     958            1 :         return &b.deferredOp
     959              : }
     960              : 
     961              : // SingleDelete adds an action to the batch that single deletes the entry for key.
     962              : // WARNING: See the detailed warning in Writer.SingleDelete before using this.
     963              : //
     964              : // It is safe to modify the contents of the arguments after SingleDelete returns.
     965            1 : func (b *Batch) SingleDelete(key []byte, _ *WriteOptions) error {
     966            1 :         deferredOp := b.SingleDeleteDeferred(len(key))
     967            1 :         copy(deferredOp.Key, key)
     968            1 :         // TODO(peter): Manually inline DeferredBatchOp.Finish(). Mid-stack inlining
     969            1 :         // in go1.13 will remove the need for this.
     970            1 :         if b.index != nil {
     971            1 :                 if err := b.index.Add(deferredOp.offset); err != nil {
     972            0 :                         return err
     973            0 :                 }
     974              :         }
     975            1 :         return nil
     976              : }
     977              : 
     978              : // SingleDeleteDeferred is similar to SingleDelete in that it adds a single delete
      979              : // operation to the batch, except it only takes in a key length instead of a
      980              : // complete key slice, letting the caller encode into that object and then call
     981              : // Finish() on the returned object.
     982              : //
     983              : // WARNING: See the detailed warning in Writer.SingleDelete before using this.
     984            1 : func (b *Batch) SingleDeleteDeferred(keyLen int) *DeferredBatchOp {
     985            1 :         b.prepareDeferredKeyRecord(keyLen, InternalKeyKindSingleDelete)
     986            1 :         b.deferredOp.index = b.index
     987            1 :         return &b.deferredOp
     988            1 : }
     989              : 
     990              : // DeleteRange deletes all of the point keys (and values) in the range
     991              : // [start,end) (inclusive on start, exclusive on end). DeleteRange does NOT
     992              : // delete overlapping range keys (eg, keys set via RangeKeySet).
     993              : //
     994              : // It is safe to modify the contents of the arguments after DeleteRange
     995              : // returns.
     996            1 : func (b *Batch) DeleteRange(start, end []byte, _ *WriteOptions) error {
     997            1 :         deferredOp := b.DeleteRangeDeferred(len(start), len(end))
     998            1 :         copy(deferredOp.Key, start)
     999            1 :         copy(deferredOp.Value, end)
    1000            1 :         // TODO(peter): Manually inline DeferredBatchOp.Finish(). Mid-stack inlining
    1001            1 :         // in go1.13 will remove the need for this.
    1002            1 :         if deferredOp.index != nil {
    1003            1 :                 if err := deferredOp.index.Add(deferredOp.offset); err != nil {
    1004            0 :                         return err
    1005            0 :                 }
    1006              :         }
    1007            1 :         return nil
    1008              : }
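// A minimal sketch (not part of the original source): removing every point key in
// ["a", "c") while leaving any overlapping range keys in place.
//
//	if err := b.DeleteRange([]byte("a"), []byte("c"), nil); err != nil {
//		// handle the error
//	}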
    1009              : 
    1010              : // DeleteRangeDeferred is similar to DeleteRange in that it adds a delete range
    1011              : // operation to the batch, except it only takes in key lengths instead of
    1012              : // complete slices, letting the caller encode into those objects and then call
    1013              : // Finish() on the returned object. Note that DeferredBatchOp.Key should be
    1014              : // populated with the start key, and DeferredBatchOp.Value should be populated
    1015              : // with the end key.
    1016            1 : func (b *Batch) DeleteRangeDeferred(startLen, endLen int) *DeferredBatchOp {
    1017            1 :         b.prepareDeferredKeyValueRecord(startLen, endLen, InternalKeyKindRangeDelete)
    1018            1 :         b.countRangeDels++
    1019            1 :         if b.index != nil {
    1020            1 :                 b.tombstones = nil
    1021            1 :                 b.tombstonesSeqNum = 0
    1022            1 :                 // Range deletions are rare, so we lazily allocate the index for them.
    1023            1 :                 if b.rangeDelIndex == nil {
    1024            1 :                         b.rangeDelIndex = batchskl.NewSkiplist(&b.data, b.comparer.Compare, b.comparer.AbbreviatedKey)
    1025            1 :                 }
    1026            1 :                 b.deferredOp.index = b.rangeDelIndex
    1027              :         }
    1028            1 :         return &b.deferredOp
    1029              : }
    1030              : 
    1031              : // RangeKeySet sets a range key mapping the key range [start, end) at the MVCC
    1032              : // timestamp suffix to value. The suffix is optional. If any portion of the key
    1033              : // range [start, end) is already set by a range key with the same suffix value,
    1034              : // RangeKeySet overrides it.
    1035              : //
    1036              : // It is safe to modify the contents of the arguments after RangeKeySet returns.
    1037            1 : func (b *Batch) RangeKeySet(start, end, suffix, value []byte, _ *WriteOptions) error {
    1038            1 :         if invariants.Enabled && b.db != nil {
    1039            1 :                 // RangeKeySet is only supported on prefix keys.
    1040            1 :                 if b.db.opts.Comparer.Split(start) != len(start) {
    1041            0 :                         panic("RangeKeySet called with suffixed start key")
    1042              :                 }
    1043            1 :                 if b.db.opts.Comparer.Split(end) != len(end) {
    1044            0 :                         panic("RangeKeySet called with suffixed end key")
    1045              :                 }
    1046              :         }
    1047            1 :         suffixValues := [1]rangekey.SuffixValue{{Suffix: suffix, Value: value}}
    1048            1 :         internalValueLen := rangekey.EncodedSetValueLen(end, suffixValues[:])
    1049            1 : 
    1050            1 :         deferredOp := b.rangeKeySetDeferred(len(start), internalValueLen)
    1051            1 :         copy(deferredOp.Key, start)
    1052            1 :         n := rangekey.EncodeSetValue(deferredOp.Value, end, suffixValues[:])
    1053            1 :         if n != internalValueLen {
    1054            0 :                 panic("unexpected internal value length mismatch")
    1055              :         }
    1056              : 
    1057              :         // Manually inline DeferredBatchOp.Finish().
    1058            1 :         if deferredOp.index != nil {
    1059            1 :                 if err := deferredOp.index.Add(deferredOp.offset); err != nil {
    1060            0 :                         return err
    1061            0 :                 }
    1062              :         }
    1063            1 :         return nil
    1064              : }
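// A minimal sketch (not part of the original source): setting a range key over
// ["a", "z"). The suffix encoding is comparer-specific; passing a nil suffix sets
// an unsuffixed range key.
//
//	if err := b.RangeKeySet([]byte("a"), []byte("z"), nil, []byte("v"), nil); err != nil {
//		// handle the error
//	}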
    1065              : 
    1066            1 : func (b *Batch) rangeKeySetDeferred(startLen, internalValueLen int) *DeferredBatchOp {
    1067            1 :         b.prepareDeferredKeyValueRecord(startLen, internalValueLen, InternalKeyKindRangeKeySet)
    1068            1 :         b.incrementRangeKeysCount()
    1069            1 :         return &b.deferredOp
    1070            1 : }
    1071              : 
    1072            1 : func (b *Batch) incrementRangeKeysCount() {
    1073            1 :         b.countRangeKeys++
    1074            1 :         if b.index != nil {
    1075            1 :                 b.rangeKeys = nil
    1076            1 :                 b.rangeKeysSeqNum = 0
    1077            1 :                 // Range keys are rare, so we lazily allocate the index for them.
    1078            1 :                 if b.rangeKeyIndex == nil {
    1079            1 :                         b.rangeKeyIndex = batchskl.NewSkiplist(&b.data, b.comparer.Compare, b.comparer.AbbreviatedKey)
    1080            1 :                 }
    1081            1 :                 b.deferredOp.index = b.rangeKeyIndex
    1082              :         }
    1083              : }
    1084              : 
    1085              : // RangeKeyUnset removes a range key mapping the key range [start, end) at the
    1086              : // MVCC timestamp suffix. The suffix may be omitted to remove an unsuffixed
    1087              : // range key. RangeKeyUnset only removes portions of range keys that fall within
    1088              : // the [start, end) key span, and only range keys with suffixes that exactly
    1089              : // match the unset suffix.
    1090              : //
    1091              : // It is safe to modify the contents of the arguments after RangeKeyUnset
    1092              : // returns.
    1093            1 : func (b *Batch) RangeKeyUnset(start, end, suffix []byte, _ *WriteOptions) error {
    1094            1 :         if invariants.Enabled && b.db != nil {
    1095            1 :                 // RangeKeyUnset is only supported on prefix keys.
    1096            1 :                 if b.db.opts.Comparer.Split(start) != len(start) {
    1097            0 :                         panic("RangeKeyUnset called with suffixed start key")
    1098              :                 }
    1099            1 :                 if b.db.opts.Comparer.Split(end) != len(end) {
    1100            0 :                         panic("RangeKeyUnset called with suffixed end key")
    1101              :                 }
    1102              :         }
    1103            1 :         suffixes := [1][]byte{suffix}
    1104            1 :         internalValueLen := rangekey.EncodedUnsetValueLen(end, suffixes[:])
    1105            1 : 
    1106            1 :         deferredOp := b.rangeKeyUnsetDeferred(len(start), internalValueLen)
    1107            1 :         copy(deferredOp.Key, start)
    1108            1 :         n := rangekey.EncodeUnsetValue(deferredOp.Value, end, suffixes[:])
    1109            1 :         if n != internalValueLen {
    1110            0 :                 panic("unexpected internal value length mismatch")
    1111              :         }
    1112              : 
    1113              :         // Manually inline DeferredBatchOp.Finish()
    1114            1 :         if deferredOp.index != nil {
    1115            1 :                 if err := deferredOp.index.Add(deferredOp.offset); err != nil {
    1116            0 :                         return err
    1117            0 :                 }
    1118              :         }
    1119            1 :         return nil
    1120              : }
    1121              : 
    1122            1 : func (b *Batch) rangeKeyUnsetDeferred(startLen, internalValueLen int) *DeferredBatchOp {
    1123            1 :         b.prepareDeferredKeyValueRecord(startLen, internalValueLen, InternalKeyKindRangeKeyUnset)
    1124            1 :         b.incrementRangeKeysCount()
    1125            1 :         return &b.deferredOp
    1126            1 : }
    1127              : 
    1128              : // RangeKeyDelete deletes all of the range keys in the range [start,end)
    1129              : // (inclusive on start, exclusive on end). It does not delete point keys (for
    1130              : // that use DeleteRange). RangeKeyDelete removes all range keys within the
    1131              : // bounds, including those with or without suffixes.
    1132              : //
    1133              : // It is safe to modify the contents of the arguments after RangeKeyDelete
    1134              : // returns.
    1135            1 : func (b *Batch) RangeKeyDelete(start, end []byte, _ *WriteOptions) error {
    1136            1 :         if invariants.Enabled && b.db != nil {
    1137            1 :                 // RangeKeyDelete is only supported on prefix keys.
    1138            1 :                 if b.db.opts.Comparer.Split(start) != len(start) {
    1139            0 :                         panic("RangeKeyDelete called with suffixed start key")
    1140              :                 }
    1141            1 :                 if b.db.opts.Comparer.Split(end) != len(end) {
    1142            0 :                         panic("RangeKeyDelete called with suffixed end key")
    1143              :                 }
    1144              :         }
    1145            1 :         deferredOp := b.RangeKeyDeleteDeferred(len(start), len(end))
    1146            1 :         copy(deferredOp.Key, start)
    1147            1 :         copy(deferredOp.Value, end)
    1148            1 :         // Manually inline DeferredBatchOp.Finish().
    1149            1 :         if deferredOp.index != nil {
    1150            1 :                 if err := deferredOp.index.Add(deferredOp.offset); err != nil {
    1151            0 :                         return err
    1152            0 :                 }
    1153              :         }
    1154            1 :         return nil
    1155              : }
    1156              : 
    1157              : // RangeKeyDeleteDeferred is similar to RangeKeyDelete in that it adds an
    1158              : // operation to delete range keys to the batch, except it only takes in key
    1159              : // lengths instead of complete slices, letting the caller encode into those
    1160              : // objects and then call Finish() on the returned object. Note that
    1161              : // DeferredBatchOp.Key should be populated with the start key, and
    1162              : // DeferredBatchOp.Value should be populated with the end key.
    1163            1 : func (b *Batch) RangeKeyDeleteDeferred(startLen, endLen int) *DeferredBatchOp {
    1164            1 :         b.prepareDeferredKeyValueRecord(startLen, endLen, InternalKeyKindRangeKeyDelete)
    1165            1 :         b.incrementRangeKeysCount()
    1166            1 :         return &b.deferredOp
    1167            1 : }
    1168              : 
     1169              : // LogData adds the specified data to the batch. The data will be written to the
    1170              : // WAL, but not added to memtables or sstables. Log data is never indexed,
    1171              : // which makes it useful for testing WAL performance.
    1172              : //
    1173              : // It is safe to modify the contents of the argument after LogData returns.
    1174            1 : func (b *Batch) LogData(data []byte, _ *WriteOptions) error {
    1175            1 :         origCount, origMemTableSize := b.count, b.memTableSize
    1176            1 :         b.prepareDeferredKeyRecord(len(data), InternalKeyKindLogData)
    1177            1 :         copy(b.deferredOp.Key, data)
    1178            1 :         // Since LogData only writes to the WAL and does not affect the memtable, we
    1179            1 :         // restore b.count and b.memTableSize to their origin values. Note that
    1180            1 :         // Batch.count only refers to records that are added to the memtable.
    1181            1 :         b.count, b.memTableSize = origCount, origMemTableSize
    1182            1 :         return nil
    1183            1 : }
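// A minimal sketch (not part of the original source): attaching opaque data that
// rides along in the WAL entry for this batch without touching the memtable.
//
//	if err := b.LogData([]byte("app-specific marker"), nil); err != nil {
//		// handle the error
//	}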
    1184              : 
     1185              : // ingestSST adds the TableNum for an sstable to the batch. The data will only be
    1186              : // written to the WAL (not added to memtables or sstables).
    1187            1 : func (b *Batch) ingestSST(tableNum base.TableNum) {
    1188            1 :         if b.Empty() {
    1189            1 :                 b.ingestedSSTBatch = true
    1190            1 :         } else if !b.ingestedSSTBatch {
    1191            0 :                 // Batch contains other key kinds.
    1192            0 :                 panic("pebble: invalid call to ingestSST")
    1193              :         }
    1194              : 
    1195            1 :         origMemTableSize := b.memTableSize
    1196            1 :         var buf [binary.MaxVarintLen64]byte
    1197            1 :         length := binary.PutUvarint(buf[:], uint64(tableNum))
    1198            1 :         b.prepareDeferredKeyRecord(length, InternalKeyKindIngestSST)
    1199            1 :         copy(b.deferredOp.Key, buf[:length])
    1200            1 :         // Since IngestSST writes only to the WAL and does not affect the memtable,
    1201            1 :         // we restore b.memTableSize to its original value. Note that Batch.count
    1202            1 :         // is not reset because for the InternalKeyKindIngestSST the count is the
    1203            1 :         // number of sstable paths which have been added to the batch.
    1204            1 :         b.memTableSize = origMemTableSize
    1205            1 :         b.minimumFormatMajorVersion = FormatFlushableIngest
    1206              : }
    1207              : 
     1208              : // excise adds the excise span for a flushable ingest containing an excise. The data
    1209              : // will only be written to the WAL (not added to memtables or sstables).
    1210            1 : func (b *Batch) excise(start, end []byte) {
    1211            1 :         if b.Empty() {
    1212            1 :                 b.ingestedSSTBatch = true
    1213            1 :         } else if !b.ingestedSSTBatch {
    1214            0 :                 // Batch contains other key kinds.
    1215            0 :                 panic("pebble: invalid call to excise")
    1216              :         }
    1217              : 
    1218            1 :         origMemTableSize := b.memTableSize
    1219            1 :         b.prepareDeferredKeyValueRecord(len(start), len(end), InternalKeyKindExcise)
    1220            1 :         copy(b.deferredOp.Key, start)
    1221            1 :         copy(b.deferredOp.Value, end)
    1222            1 :         // Since excise writes only to the WAL and does not affect the memtable,
    1223            1 :         // we restore b.memTableSize to its original value. Note that Batch.count
    1224            1 :         // is not reset because for the InternalKeyKindIngestSST/Excise the count
    1225            1 :         // is the number of sstable paths which have been added to the batch.
    1226            1 :         b.memTableSize = origMemTableSize
    1227            1 :         b.minimumFormatMajorVersion = FormatFlushableIngestExcises
    1228              : }
    1229              : 
    1230              : // Empty returns true if the batch is empty, and false otherwise.
    1231            1 : func (b *Batch) Empty() bool {
    1232            1 :         return batchrepr.IsEmpty(b.data)
    1233            1 : }
    1234              : 
    1235              : // Len returns the current size of the batch in bytes.
    1236            0 : func (b *Batch) Len() int {
    1237            0 :         return max(batchrepr.HeaderLen, len(b.data))
    1238            0 : }
    1239              : 
    1240              : // Repr returns the underlying batch representation. It is not safe to modify
    1241              : // the contents. Reset() will not change the contents of the returned value,
    1242              : // though any other mutation operation may do so.
    1243            1 : func (b *Batch) Repr() []byte {
    1244            1 :         if len(b.data) == 0 {
    1245            0 :                 b.init(batchrepr.HeaderLen)
    1246            0 :         }
    1247            1 :         batchrepr.SetCount(b.data, b.Count())
    1248            1 :         return b.data
    1249              : }
    1250              : 
    1251              : // SetRepr sets the underlying batch representation. The batch takes ownership
    1252              : // of the supplied slice. It is not safe to modify it afterwards until the
    1253              : // Batch is no longer in use.
    1254              : //
    1255              : // SetRepr may return ErrInvalidBatch if the supplied slice fails to decode in
    1256              : // any way. It will not return an error in any other circumstance.
    1257            1 : func (b *Batch) SetRepr(data []byte) error {
    1258            1 :         h, ok := batchrepr.ReadHeader(data)
    1259            1 :         if !ok {
    1260            0 :                 return ErrInvalidBatch
    1261            0 :         }
    1262            1 :         b.data = data
    1263            1 :         b.count = uint64(h.Count)
    1264            1 :         var err error
    1265            1 :         if b.db != nil {
    1266            1 :                 // Only track memTableSize for batches that will be committed to the DB.
    1267            1 :                 err = b.refreshMemTableSize()
    1268            1 :         }
    1269            1 :         return err
    1270              : }
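// A minimal sketch (not part of the original source): copying one batch's
// serialized representation into another batch. Assumes b1 and db are in scope;
// the copy matters because SetRepr takes ownership of the supplied slice.
//
//	repr := append([]byte(nil), b1.Repr()...)
//	b2 := db.NewBatch()
//	if err := b2.SetRepr(repr); err != nil {
//		// handle the error
//	}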
    1271              : 
    1272              : // NewIter returns an iterator that is unpositioned (Iterator.Valid() will
    1273              : // return false). The iterator can be positioned via a call to SeekGE,
    1274              : // SeekPrefixGE, SeekLT, First or Last. Only indexed batches support iterators.
    1275              : //
    1276              : // The returned Iterator observes all of the Batch's existing mutations, but no
    1277              : // later mutations. Its view can be refreshed via RefreshBatchSnapshot or
    1278              : // SetOptions().
    1279            1 : func (b *Batch) NewIter(o *IterOptions) (*Iterator, error) {
    1280            1 :         return b.NewIterWithContext(context.Background(), o)
    1281            1 : }
    1282              : 
    1283              : // NewIterWithContext is like NewIter, and additionally accepts a context for
    1284              : // tracing.
    1285            1 : func (b *Batch) NewIterWithContext(ctx context.Context, o *IterOptions) (*Iterator, error) {
    1286            1 :         if b.index == nil {
    1287            0 :                 return nil, ErrNotIndexed
    1288            0 :         }
    1289            1 :         return b.db.newIter(ctx, b, newIterOpts{}, o), nil
    1290              : }
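// A minimal sketch (not part of the original source): iterating over an indexed
// batch's mutations overlaid on the DB state. Only batches created with
// db.NewIndexedBatch support NewIter; a nil IterOptions is permitted.
//
//	b := db.NewIndexedBatch()
//	_ = b.Set([]byte("a"), []byte("1"), nil)
//	iter, err := b.NewIter(nil)
//	if err != nil {
//		return err
//	}
//	defer iter.Close()
//	for iter.First(); iter.Valid(); iter.Next() {
//		// use iter.Key() and iter.Value()
//	}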
    1291              : 
    1292              : // NewBatchOnlyIter constructs an iterator that only reads the contents of the
    1293              : // batch, and does not overlay the batch mutations on top of the DB state.
    1294              : //
    1295              : // The returned Iterator observes all of the Batch's existing mutations, but
    1296              : // no later mutations. Its view can be refreshed via RefreshBatchSnapshot or
    1297              : // SetOptions().
    1298            0 : func (b *Batch) NewBatchOnlyIter(ctx context.Context, o *IterOptions) (*Iterator, error) {
    1299            0 :         if b.index == nil {
    1300            0 :                 return nil, ErrNotIndexed
    1301            0 :         }
    1302            0 :         return b.db.newIter(ctx, b, newIterOpts{batch: batchIterOpts{batchOnly: true}}, o), nil
    1303              : }
    1304              : 
    1305              : // newInternalIter creates a new internalIterator that iterates over the
    1306              : // contents of the batch.
    1307            1 : func (b *Batch) newInternalIter(o *IterOptions) *batchIter {
    1308            1 :         iter := &batchIter{}
    1309            1 :         b.initInternalIter(o, iter)
    1310            1 :         return iter
    1311            1 : }
    1312              : 
    1313            1 : func (b *Batch) initInternalIter(o *IterOptions, iter *batchIter) {
    1314            1 :         *iter = batchIter{
    1315            1 :                 batch: b,
    1316            1 :                 iter:  b.index.NewIter(o.GetLowerBound(), o.GetUpperBound()),
    1317            1 :                 // NB: We explicitly do not propagate the batch snapshot to the point
    1318            1 :                 // key iterator. Filtering point keys within the batch iterator can
    1319            1 :                 // cause pathological behavior where a batch iterator advances
    1320            1 :                 // significantly farther than necessary filtering many batch keys that
    1321            1 :                 // are not visible at the batch sequence number. Instead, the merging
    1322            1 :                 // iterator enforces bounds.
    1323            1 :                 //
    1324            1 :                 // For example, consider an engine that contains the committed keys
    1325            1 :                 // 'bar' and 'bax', with no keys between them. Consider a batch
     1326            1 :                 // containing 1,000 keys within the range [a,z]. All of the
    1327            1 :                 // batch keys were added to the batch after the iterator was
    1328            1 :                 // constructed, so they are not visible to the iterator. A call to
    1329            1 :                 // SeekGE('bax') would seek the LSM iterators and discover the key
    1330            1 :                 // 'bax'. It would also seek the batch iterator, landing on the key
     1331            1 :                 // 'baz' but discovering that it's not visible. The batch iterator would
    1332            1 :                 // next through the rest of the batch's keys, only to discover there are
    1333            1 :                 // no visible keys greater than or equal to 'bax'.
    1334            1 :                 //
    1335            1 :                 // Filtering these batch points within the merging iterator ensures that
    1336            1 :                 // the batch iterator never needs to iterate beyond 'baz', because it
    1337            1 :                 // already found a smaller, visible key 'bax'.
    1338            1 :                 snapshot: base.SeqNumMax,
    1339            1 :         }
    1340            1 : }
    1341              : 
    1342            1 : func (b *Batch) newRangeDelIter(o *IterOptions, batchSnapshot base.SeqNum) *keyspan.Iter {
    1343            1 :         // Construct an iterator even if rangeDelIndex is nil, because it is allowed
    1344            1 :         // to refresh later, so we need the container to exist.
    1345            1 :         iter := new(keyspan.Iter)
    1346            1 :         b.initRangeDelIter(o, iter, batchSnapshot)
    1347            1 :         return iter
    1348            1 : }
    1349              : 
    1350            1 : func (b *Batch) initRangeDelIter(_ *IterOptions, iter *keyspan.Iter, batchSnapshot base.SeqNum) {
    1351            1 :         if b.rangeDelIndex == nil {
    1352            1 :                 iter.Init(b.comparer.Compare, nil)
    1353            1 :                 return
    1354            1 :         }
    1355              : 
    1356              :         // Fragment the range tombstones the first time a range deletion iterator is
    1357              :         // requested. The cached tombstones are invalidated if another range
    1358              :         // deletion tombstone is added to the batch. This cache is only guaranteed
    1359              :         // to be correct if we're opening an iterator to read at a batch sequence
    1360              :         // number at least as high as tombstonesSeqNum. The cache is guaranteed to
    1361              :         // include all tombstones up to tombstonesSeqNum, and if any additional
    1362              :         // tombstones were added after that sequence number the cache would've been
    1363              :         // cleared.
    1364            1 :         nextSeqNum := b.nextSeqNum()
    1365            1 :         if b.tombstones != nil && b.tombstonesSeqNum <= batchSnapshot {
    1366            0 :                 iter.Init(b.comparer.Compare, b.tombstones)
    1367            0 :                 return
    1368            0 :         }
    1369              : 
    1370            1 :         tombstones := make([]keyspan.Span, 0, b.countRangeDels)
    1371            1 :         frag := &keyspan.Fragmenter{
    1372            1 :                 Cmp:    b.comparer.Compare,
    1373            1 :                 Format: b.comparer.FormatKey,
    1374            1 :                 Emit: func(s keyspan.Span) {
    1375            1 :                         tombstones = append(tombstones, s)
    1376            1 :                 },
    1377              :         }
    1378            1 :         it := &batchIter{
    1379            1 :                 batch:    b,
    1380            1 :                 iter:     b.rangeDelIndex.NewIter(nil, nil),
    1381            1 :                 snapshot: batchSnapshot,
    1382            1 :         }
    1383            1 :         fragmentRangeDels(frag, it, int(b.countRangeDels))
    1384            1 :         iter.Init(b.comparer.Compare, tombstones)
    1385            1 : 
    1386            1 :         // If we just read all the tombstones in the batch (eg, batchSnapshot was
    1387            1 :         // set to b.nextSeqNum()), then cache the tombstones so that a subsequent
    1388            1 :         // call to initRangeDelIter may use them without refragmenting.
    1389            1 :         if nextSeqNum == batchSnapshot {
    1390            0 :                 b.tombstones = tombstones
    1391            0 :                 b.tombstonesSeqNum = nextSeqNum
    1392            0 :         }
    1393              : }
    1394              : 
    1395            1 : func fragmentRangeDels(frag *keyspan.Fragmenter, it internalIterator, count int) {
    1396            1 :         // The memory management here is a bit subtle. The keys and values returned
    1397            1 :         // by the iterator are slices in Batch.data. Thus the fragmented tombstones
    1398            1 :         // are slices within Batch.data. If additional entries are added to the
    1399            1 :         // Batch, Batch.data may be reallocated. The references in the fragmented
    1400            1 :         // tombstones will remain valid, pointing into the old Batch.data. GC for
    1401            1 :         // the win.
    1402            1 : 
    1403            1 :         // Use a single []keyspan.Key buffer to avoid allocating many
    1404            1 :         // individual []keyspan.Key slices with a single element each.
    1405            1 :         keyBuf := make([]keyspan.Key, 0, count)
    1406            1 :         for kv := it.First(); kv != nil; kv = it.Next() {
    1407            1 :                 s := rangedel.Decode(kv.K, kv.InPlaceValue(), keyBuf)
    1408            1 :                 keyBuf = s.Keys[len(s.Keys):]
    1409            1 : 
    1410            1 :                 // Set a fixed capacity to avoid accidental overwriting.
    1411            1 :                 s.Keys = s.Keys[:len(s.Keys):len(s.Keys)]
    1412            1 :                 frag.Add(s)
    1413            1 :         }
    1414            1 :         frag.Finish()
    1415              : }
    1416              : 
    1417            1 : func (b *Batch) newRangeKeyIter(o *IterOptions, batchSnapshot base.SeqNum) *keyspan.Iter {
    1418            1 :         // Construct an iterator even if rangeKeyIndex is nil, because it is allowed
    1419            1 :         // to refresh later, so we need the container to exist.
    1420            1 :         iter := new(keyspan.Iter)
    1421            1 :         b.initRangeKeyIter(o, iter, batchSnapshot)
    1422            1 :         return iter
    1423            1 : }
    1424              : 
    1425            1 : func (b *Batch) initRangeKeyIter(_ *IterOptions, iter *keyspan.Iter, batchSnapshot base.SeqNum) {
    1426            1 :         if b.rangeKeyIndex == nil {
    1427            1 :                 iter.Init(b.comparer.Compare, nil)
    1428            1 :                 return
    1429            1 :         }
    1430              : 
    1431              :         // Fragment the range keys the first time a range key iterator is requested.
    1432              :         // The cached spans are invalidated if another range key is added to the
    1433              :         // batch. This cache is only guaranteed to be correct if we're opening an
    1434              :         // iterator to read at a batch sequence number at least as high as
    1435              :         // rangeKeysSeqNum. The cache is guaranteed to include all range keys up to
    1436              :         // rangeKeysSeqNum, and if any additional range keys were added after that
    1437              :         // sequence number the cache would've been cleared.
    1438            1 :         nextSeqNum := b.nextSeqNum()
    1439            1 :         if b.rangeKeys != nil && b.rangeKeysSeqNum <= batchSnapshot {
    1440            1 :                 iter.Init(b.comparer.Compare, b.rangeKeys)
    1441            1 :                 return
    1442            1 :         }
    1443              : 
    1444            1 :         rangeKeys := make([]keyspan.Span, 0, b.countRangeKeys)
    1445            1 :         frag := &keyspan.Fragmenter{
    1446            1 :                 Cmp:    b.comparer.Compare,
    1447            1 :                 Format: b.comparer.FormatKey,
    1448            1 :                 Emit: func(s keyspan.Span) {
    1449            1 :                         rangeKeys = append(rangeKeys, s)
    1450            1 :                 },
    1451              :         }
    1452            1 :         it := &batchIter{
    1453            1 :                 batch:    b,
    1454            1 :                 iter:     b.rangeKeyIndex.NewIter(nil, nil),
    1455            1 :                 snapshot: batchSnapshot,
    1456            1 :         }
    1457            1 :         _ = fragmentRangeKeys(frag, it, int(b.countRangeKeys))
    1458            1 :         iter.Init(b.comparer.Compare, rangeKeys)
    1459            1 : 
    1460            1 :         // If we just read all the range keys in the batch (eg, batchSnapshot was
    1461            1 :         // set to b.nextSeqNum()), then cache the range keys so that a subsequent
    1462            1 :         // call to initRangeKeyIter may use them without refragmenting.
    1463            1 :         if nextSeqNum == batchSnapshot {
    1464            1 :                 b.rangeKeys = rangeKeys
    1465            1 :                 b.rangeKeysSeqNum = nextSeqNum
    1466            1 :         }
    1467              : }
    1468              : 
    1469            1 : func fragmentRangeKeys(frag *keyspan.Fragmenter, it internalIterator, count int) error {
    1470            1 :         // The memory management here is a bit subtle. The keys and values
    1471            1 :         // returned by the iterator are slices in Batch.data. Thus the
    1472            1 :         // fragmented key spans are slices within Batch.data. If additional
    1473            1 :         // entries are added to the Batch, Batch.data may be reallocated. The
    1474            1 :         // references in the fragmented keys will remain valid, pointing into
    1475            1 :         // the old Batch.data. GC for the win.
    1476            1 : 
    1477            1 :         // Use a single []keyspan.Key buffer to avoid allocating many
    1478            1 :         // individual []keyspan.Key slices with a single element each.
    1479            1 :         keyBuf := make([]keyspan.Key, 0, count)
    1480            1 :         for kv := it.First(); kv != nil; kv = it.Next() {
    1481            1 :                 s, err := rangekey.Decode(kv.K, kv.InPlaceValue(), keyBuf)
    1482            1 :                 if err != nil {
    1483            0 :                         return err
    1484            0 :                 }
    1485            1 :                 keyBuf = s.Keys[len(s.Keys):]
    1486            1 : 
    1487            1 :                 // Set a fixed capacity to avoid accidental overwriting.
    1488            1 :                 s.Keys = s.Keys[:len(s.Keys):len(s.Keys)]
    1489            1 :                 frag.Add(s)
    1490              :         }
    1491            1 :         frag.Finish()
    1492            1 :         return nil
    1493              : }
    1494              : 
    1495              : // Commit applies the batch to its parent writer.
    1496            1 : func (b *Batch) Commit(o *WriteOptions) error {
    1497            1 :         return b.db.Apply(b, o)
    1498            1 : }
    1499              : 
    1500              : // Close closes the batch without committing it.
    1501            1 : func (b *Batch) Close() error {
    1502            1 :         // The storage engine commit pipeline may retain a pointer to b.data beyond
    1503            1 :         // when Commit() returns. This is possible when configured for WAL failover;
    1504            1 :         // we don't know if we might need to read the batch data again until the
    1505            1 :         // batch has been durably synced [even if the committer doesn't care to wait
    1506            1 :         // for the sync and Sync()=false].
    1507            1 :         //
    1508            1 :         // We still want to recycle these batches. The b.lifecycle atomic negotiates
    1509            1 :         // the batch's lifecycle. If the commit pipeline still might read b.data,
     1510            1 :         // b.lifecycle will be nonzero [the low bits hold a ref count].
    1511            1 :         for {
    1512            1 :                 v := b.lifecycle.Load()
    1513            1 :                 switch {
    1514            1 :                 case v == 0:
    1515            1 :                         // A zero value indicates that the commit pipeline has no
    1516            1 :                         // outstanding references to the batch. The commit pipeline is
    1517            1 :                         // required to acquire a ref synchronously, so there is no risk that
    1518            1 :                         // the commit pipeline will grab a ref after the call to release. We
    1519            1 :                         // can simply release the batch.
    1520            1 :                         b.release()
    1521            1 :                         return nil
    1522            0 :                 case (v & batchClosedBit) != 0:
    1523            0 :                         // The batch has a batchClosedBit: This batch has already been closed.
    1524            0 :                         return ErrClosed
    1525            1 :                 default:
    1526            1 :                         // There's an outstanding reference. Set the batch released bit so
    1527            1 :                         // that the commit pipeline knows it should release the batch when
    1528            1 :                         // it unrefs.
    1529            1 :                         if b.lifecycle.CompareAndSwap(v, v|batchClosedBit) {
    1530            1 :                                 return nil
    1531            1 :                         }
     1532              :                         // The CAS failed, indicating the outstanding reference was just
     1533              :                         // decremented (or the caller illegally closed the batch twice).
    1534              :                         // Loop to reload.
    1535              :                 }
    1536              :         }
    1537              : }
    1538              : 
    1539              : // Indexed returns true if the batch is indexed (i.e. supports read
    1540              : // operations).
    1541            1 : func (b *Batch) Indexed() bool {
    1542            1 :         return b.index != nil
    1543            1 : }
    1544              : 
    1545              : // init ensures that the batch data slice is initialized to meet the
    1546              : // minimum required size and allocates space for the batch header.
    1547            1 : func (b *Batch) init(size int) {
    1548            1 :         b.opts.ensureDefaults()
    1549            1 :         n := b.opts.initialSizeBytes
    1550            1 :         for n < size {
    1551            0 :                 n *= 2
    1552            0 :         }
    1553            1 :         if cap(b.data) < n {
    1554            1 :                 b.data = rawalloc.New(batchrepr.HeaderLen, n)
    1555            1 :         }
    1556            1 :         b.data = b.data[:batchrepr.HeaderLen]
    1557            1 :         clear(b.data) // Zero the sequence number in the header
    1558              : }
    1559              : 
    1560              : // Reset resets the batch for reuse. The underlying byte slice (that is
    1561              : // returned by Repr()) may not be modified. It is only necessary to call this
     1562              : // method if a batch is explicitly being reused. Close automatically takes care
    1563              : // of releasing resources when appropriate for batches that are internally
    1564              : // being reused.
    1565            0 : func (b *Batch) Reset() {
    1566            0 :         // In some configurations (WAL failover) the commit pipeline may retain
    1567            0 :         // b.data beyond a call to commit the batch. When this happens, b.lifecycle
    1568            0 :         // is nonzero (see the comment above b.lifecycle). In this case it's unsafe
    1569            0 :         // to mutate b.data, so we discard it. Note that Reset must not be called on
    1570            0 :         // a closed batch, so v > 0 implies a non-zero ref count and not
    1571            0 :         // batchClosedBit being set.
    1572            0 :         if v := b.lifecycle.Load(); v > 0 {
    1573            0 :                 b.data = nil
    1574            0 :         }
    1575            0 :         b.reset()
    1576              : }
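// A minimal sketch (not part of the original source): explicitly reusing a single
// batch across many small commits instead of allocating a fresh one each time.
// The pending slice and its key/value fields are hypothetical caller-side data.
//
//	for _, kv := range pending {
//		b.Reset()
//		_ = b.Set(kv.key, kv.value, nil)
//		if err := db.Apply(b, Sync); err != nil {
//			// handle the error
//		}
//	}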
    1577              : 
    1578            1 : func (b *Batch) reset() {
    1579            1 :         // Zero out the struct, retaining only the fields necessary for manual
    1580            1 :         // reuse.
    1581            1 :         b.batchInternal = batchInternal{
    1582            1 :                 data:     b.data,
    1583            1 :                 comparer: b.comparer,
    1584            1 :                 opts:     b.opts,
    1585            1 :                 index:    b.index,
    1586            1 :                 db:       b.db,
    1587            1 :         }
    1588            1 :         b.applied.Store(false)
    1589            1 :         if b.data != nil {
    1590            1 :                 if cap(b.data) > b.opts.maxRetainedSizeBytes {
    1591            0 :                         // If the capacity of the buffer is larger than our maximum
    1592            0 :                         // retention size, don't re-use it. Let it be GC-ed instead.
    1593            0 :                         // This prevents the memory from an unusually large batch from
    1594            0 :                         // being held on to indefinitely.
    1595            0 :                         b.data = nil
    1596            1 :                 } else {
    1597            1 :                         // Otherwise, reset the buffer for re-use.
    1598            1 :                         b.data = b.data[:batchrepr.HeaderLen]
    1599            1 :                         clear(b.data)
    1600            1 :                 }
    1601              :         }
    1602            1 :         if b.index != nil {
    1603            1 :                 b.index.Init(&b.data, b.comparer.Compare, b.comparer.AbbreviatedKey)
    1604            1 :         }
    1605              : }
    1606              : 
    1607            1 : func (b *Batch) grow(n int) {
    1608            1 :         newSize := len(b.data) + n
    1609            1 :         if uint64(newSize) >= maxBatchSize {
    1610            0 :                 panic(ErrBatchTooLarge)
    1611              :         }
    1612            1 :         if newSize > cap(b.data) {
    1613            0 :                 newCap := 2 * cap(b.data)
    1614            0 :                 for newCap < newSize {
    1615            0 :                         newCap *= 2
    1616            0 :                 }
    1617            0 :                 newData := rawalloc.New(len(b.data), newCap)
    1618            0 :                 copy(newData, b.data)
    1619            0 :                 b.data = newData
    1620              :         }
    1621            1 :         b.data = b.data[:newSize]
    1622              : }
    1623              : 
    1624            1 : func (b *Batch) setSeqNum(seqNum base.SeqNum) {
    1625            1 :         batchrepr.SetSeqNum(b.data, seqNum)
    1626            1 : }
    1627              : 
    1628              : // SeqNum returns the batch sequence number which is applied to the first
    1629              : // record in the batch. The sequence number is incremented for each subsequent
    1630              : // record. It returns zero if the batch is empty.
    1631            1 : func (b *Batch) SeqNum() base.SeqNum {
    1632            1 :         if len(b.data) == 0 {
    1633            0 :                 b.init(batchrepr.HeaderLen)
    1634            0 :         }
    1635            1 :         return batchrepr.ReadSeqNum(b.data)
    1636              : }
    1637              : 
    1638            1 : func (b *Batch) setCount(v uint32) {
    1639            1 :         b.count = uint64(v)
    1640            1 : }
    1641              : 
    1642              : // Count returns the count of memtable-modifying operations in this batch. All
    1643              : // operations with the exception of LogData increment this count. For
    1644              : // IngestSSTs, the count only indicates the number of SSTs ingested in the
    1645              : // record; the batch isn't applied to the memtable.
    1646            1 : func (b *Batch) Count() uint32 {
    1647            1 :         if b.count > math.MaxUint32 {
    1648            0 :                 panic(batchrepr.ErrInvalidBatch)
    1649              :         }
    1650            1 :         return uint32(b.count)
    1651              : }
    1652              : 
    1653              : // Reader returns a batchrepr.Reader for the current batch contents. If the
    1654              : // batch is mutated, the new entries will not be visible to the reader.
    1655            1 : func (b *Batch) Reader() batchrepr.Reader {
    1656            1 :         if len(b.data) == 0 {
    1657            1 :                 b.init(batchrepr.HeaderLen)
    1658            1 :         }
    1659            1 :         return batchrepr.Read(b.data)
    1660              : }
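
A hedged sketch of consuming that reader from outside the package (keys and setup are illustrative): it walks the raw batch repr using the same Next() return signature (kind, user key, value, ok, err) that newFlushableBatch uses later in this file.

package main

import (
	"fmt"
	"log"

	"github.com/cockroachdb/pebble"
	"github.com/cockroachdb/pebble/vfs"
)

func main() {
	// An in-memory store, purely for illustration.
	db, err := pebble.Open("", &pebble.Options{FS: vfs.NewMem()})
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	b := db.NewBatch()
	defer b.Close()
	_ = b.Set([]byte("k1"), []byte("v1"), nil)
	_ = b.Delete([]byte("k2"), nil)

	// Entries appended to the batch after Reader() is called are not
	// visible through r.
	r := b.Reader()
	for len(r) > 0 {
		kind, ukey, value, ok, err := r.Next()
		if err != nil {
			log.Fatal(err)
		}
		if !ok {
			break
		}
		fmt.Printf("%s %q => %q\n", kind, ukey, value)
	}
}
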
    1661              : 
    1662              : // SyncWait is to be used in conjunction with DB.ApplyNoSyncWait.
    1663            1 : func (b *Batch) SyncWait() error {
    1664            1 :         now := crtime.NowMono()
    1665            1 :         b.fsyncWait.Wait()
    1666            1 :         if b.commitErr != nil {
    1667            0 :                 b.db = nil // prevent batch reuse on error
    1668            0 :         }
    1669            1 :         waitDuration := now.Elapsed()
    1670            1 :         b.commitStats.CommitWaitDuration += waitDuration
    1671            1 :         b.commitStats.TotalDuration += waitDuration
    1672            1 :         return b.commitErr
    1673              : }
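
A minimal sketch of that pairing, assuming a DB with the WAL enabled (the keys and the overlapped work are illustrative): ApplyNoSyncWait makes the batch visible without waiting for the WAL fsync, and SyncWait is called afterwards to wait for durability.

package main

import (
	"log"

	"github.com/cockroachdb/pebble"
	"github.com/cockroachdb/pebble/vfs"
)

func main() {
	db, err := pebble.Open("", &pebble.Options{FS: vfs.NewMem()})
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	b := db.NewBatch()
	if err := b.Set([]byte("k"), []byte("v"), nil); err != nil {
		log.Fatal(err)
	}
	// Commit without waiting for the WAL fsync; ApplyNoSyncWait requires a
	// sync write (pebble.Sync). The batch is visible once this returns.
	if err := db.ApplyNoSyncWait(b, pebble.Sync); err != nil {
		log.Fatal(err)
	}

	// ... other work may overlap with the WAL sync here ...

	// Block until the fsync completes. CommitStats, including
	// CommitWaitDuration, is only complete after SyncWait returns.
	if err := b.SyncWait(); err != nil {
		log.Fatal(err)
	}
	_ = b.CommitStats()
	_ = b.Close()
}
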
    1674              : 
    1675              : // CommitStats returns stats related to committing the batch. It should be
    1676              : // called after Batch.Commit or DB.Apply. If DB.ApplyNoSyncWait is used, it
    1677              : // should be called after Batch.SyncWait.
    1678            0 : func (b *Batch) CommitStats() BatchCommitStats {
    1679            0 :         return b.commitStats
    1680            0 : }
    1681              : 
    1682              : // Note: batchIter mirrors the implementation of flushableBatchIter. Keep the
    1683              : // two in sync.
    1684              : type batchIter struct {
    1685              :         batch *Batch
    1686              :         iter  batchskl.Iterator
    1687              :         kv    base.InternalKV
    1688              :         err   error
    1689              :         // snapshot holds a batch "sequence number" at which the batch is being
    1690              :         // read. This sequence number has the InternalKeySeqNumBatch bit set, so it
    1691              :         // encodes an offset within the batch. Only batch entries earlier than the
    1692              :         // offset are visible during iteration.
    1693              :         snapshot base.SeqNum
    1694              : }
    1695              : 
    1696              : // batchIter implements the base.InternalIterator interface.
    1697              : var _ base.InternalIterator = (*batchIter)(nil)
    1698              : 
    1699            0 : func (i *batchIter) String() string {
    1700            0 :         return "batch"
    1701            0 : }
    1702              : 
    1703            1 : func (i *batchIter) SeekGE(key []byte, flags base.SeekGEFlags) *base.InternalKV {
    1704            1 :         // Ignore TrySeekUsingNext if the view of the batch changed.
    1705            1 :         if flags.TrySeekUsingNext() && flags.BatchJustRefreshed() {
    1706            0 :                 flags = flags.DisableTrySeekUsingNext()
    1707            0 :         }
    1708              : 
    1709            1 :         i.err = nil // clear cached iteration error
    1710            1 :         ikey := i.iter.SeekGE(key, flags)
    1711            1 :         for ikey != nil && ikey.SeqNum() >= i.snapshot {
    1712            0 :                 ikey = i.iter.Next()
    1713            0 :         }
    1714            1 :         if ikey == nil {
    1715            1 :                 i.kv = base.InternalKV{}
    1716            1 :                 return nil
    1717            1 :         }
    1718            1 :         i.kv.K = *ikey
    1719            1 :         i.kv.V = base.MakeInPlaceValue(i.value())
    1720            1 :         return &i.kv
    1721              : }
    1722              : 
    1723            1 : func (i *batchIter) SeekPrefixGE(prefix, key []byte, flags base.SeekGEFlags) *base.InternalKV {
    1724            1 :         kv := i.SeekGE(key, flags)
    1725            1 :         if kv == nil {
    1726            1 :                 return nil
    1727            1 :         }
    1728              :         // If the key doesn't have the sought prefix, return nil.
    1729            1 :         if !bytes.Equal(i.batch.comparer.Split.Prefix(kv.K.UserKey), prefix) {
    1730            1 :                 i.kv = base.InternalKV{}
    1731            1 :                 return nil
    1732            1 :         }
    1733            1 :         return kv
    1734              : }
    1735              : 
    1736            1 : func (i *batchIter) SeekLT(key []byte, flags base.SeekLTFlags) *base.InternalKV {
    1737            1 :         i.err = nil // clear cached iteration error
    1738            1 :         ikey := i.iter.SeekLT(key)
    1739            1 :         for ikey != nil && ikey.SeqNum() >= i.snapshot {
    1740            0 :                 ikey = i.iter.Prev()
    1741            0 :         }
    1742            1 :         if ikey == nil {
    1743            1 :                 i.kv = base.InternalKV{}
    1744            1 :                 return nil
    1745            1 :         }
    1746            1 :         i.kv.K = *ikey
    1747            1 :         i.kv.V = base.MakeInPlaceValue(i.value())
    1748            1 :         return &i.kv
    1749              : }
    1750              : 
    1751            1 : func (i *batchIter) First() *base.InternalKV {
    1752            1 :         i.err = nil // clear cached iteration error
    1753            1 :         ikey := i.iter.First()
    1754            1 :         for ikey != nil && ikey.SeqNum() >= i.snapshot {
    1755            1 :                 ikey = i.iter.Next()
    1756            1 :         }
    1757            1 :         if ikey == nil {
    1758            1 :                 i.kv = base.InternalKV{}
    1759            1 :                 return nil
    1760            1 :         }
    1761            1 :         i.kv.K = *ikey
    1762            1 :         i.kv.V = base.MakeInPlaceValue(i.value())
    1763            1 :         return &i.kv
    1764              : }
    1765              : 
    1766            1 : func (i *batchIter) Last() *base.InternalKV {
    1767            1 :         i.err = nil // clear cached iteration error
    1768            1 :         ikey := i.iter.Last()
    1769            1 :         for ikey != nil && ikey.SeqNum() >= i.snapshot {
    1770            0 :                 ikey = i.iter.Prev()
    1771            0 :         }
    1772            1 :         if ikey == nil {
    1773            1 :                 i.kv = base.InternalKV{}
    1774            1 :                 return nil
    1775            1 :         }
    1776            1 :         i.kv.K = *ikey
    1777            1 :         i.kv.V = base.MakeInPlaceValue(i.value())
    1778            1 :         return &i.kv
    1779              : }
    1780              : 
    1781            1 : func (i *batchIter) Next() *base.InternalKV {
    1782            1 :         ikey := i.iter.Next()
    1783            1 :         for ikey != nil && ikey.SeqNum() >= i.snapshot {
    1784            0 :                 ikey = i.iter.Next()
    1785            0 :         }
    1786            1 :         if ikey == nil {
    1787            1 :                 i.kv = base.InternalKV{}
    1788            1 :                 return nil
    1789            1 :         }
    1790            1 :         i.kv.K = *ikey
    1791            1 :         i.kv.V = base.MakeInPlaceValue(i.value())
    1792            1 :         return &i.kv
    1793              : }
    1794              : 
    1795            0 : func (i *batchIter) NextPrefix(succKey []byte) *base.InternalKV {
    1796            0 :         // Because NextPrefix was invoked, `succKey` must be ≥ the key at i's
    1797            0 :         // current position. Seek the arena iterator using TrySeekUsingNext.
    1798            0 :         ikey := i.iter.SeekGE(succKey, base.SeekGEFlagsNone.EnableTrySeekUsingNext())
    1799            0 :         for ikey != nil && ikey.SeqNum() >= i.snapshot {
    1800            0 :                 ikey = i.iter.Next()
    1801            0 :         }
    1802            0 :         if ikey == nil {
    1803            0 :                 i.kv = base.InternalKV{}
    1804            0 :                 return nil
    1805            0 :         }
    1806            0 :         i.kv.K = *ikey
    1807            0 :         i.kv.V = base.MakeInPlaceValue(i.value())
    1808            0 :         return &i.kv
    1809              : }
    1810              : 
    1811            1 : func (i *batchIter) Prev() *base.InternalKV {
    1812            1 :         ikey := i.iter.Prev()
    1813            1 :         for ikey != nil && ikey.SeqNum() >= i.snapshot {
    1814            0 :                 ikey = i.iter.Prev()
    1815            0 :         }
    1816            1 :         if ikey == nil {
    1817            1 :                 i.kv = base.InternalKV{}
    1818            1 :                 return nil
    1819            1 :         }
    1820            0 :         i.kv.K = *ikey
    1821            0 :         i.kv.V = base.MakeInPlaceValue(i.value())
    1822            0 :         return &i.kv
    1823              : }
    1824              : 
    1825            1 : func (i *batchIter) value() []byte {
    1826            1 :         offset, _, keyEnd := i.iter.KeyInfo()
    1827            1 :         data := i.batch.data
    1828            1 :         if len(data[offset:]) == 0 {
    1829            0 :                 i.err = base.CorruptionErrorf("corrupted batch")
    1830            0 :                 return nil
    1831            0 :         }
    1832              : 
    1833            1 :         switch InternalKeyKind(data[offset]) {
    1834              :         case InternalKeyKindSet, InternalKeyKindMerge, InternalKeyKindRangeDelete,
    1835              :                 InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete,
    1836            1 :                 InternalKeyKindDeleteSized:
    1837            1 :                 _, value, ok := batchrepr.DecodeStr(data[keyEnd:])
    1838            1 :                 if !ok {
    1839            0 :                         return nil
    1840            0 :                 }
    1841            1 :                 return value
    1842            1 :         default:
    1843            1 :                 return nil
    1844              :         }
    1845              : }
    1846              : 
    1847            1 : func (i *batchIter) Error() error {
    1848            1 :         return i.err
    1849            1 : }
    1850              : 
    1851            1 : func (i *batchIter) Close() error {
    1852            1 :         _ = i.iter.Close()
    1853            1 :         return i.err
    1854            1 : }
    1855              : 
    1856            1 : func (i *batchIter) SetBounds(lower, upper []byte) {
    1857            1 :         i.iter.SetBounds(lower, upper)
    1858            1 : }
    1859              : 
    1860            0 : func (i *batchIter) SetContext(_ context.Context) {}
    1861              : 
    1862              : // DebugTree is part of the InternalIterator interface.
    1863            0 : func (i *batchIter) DebugTree(tp treeprinter.Node) {
    1864            0 :         tp.Childf("%T(%p)", i, i)
    1865            0 : }
    1866              : 
    1867              : type flushableBatchEntry struct {
    1868              :         // offset is the byte offset of the record within the batch repr.
    1869              :         offset uint32
    1870              :         // index is the 0-based ordinal number of the record within the batch. Used
    1871              :         // to compute the seqnum for the record.
    1872              :         index uint32
    1873              :         // key{Start,End} are the start and end byte offsets of the key within the
    1874              :         // batch repr. Cached to avoid decoding the key length on every
    1875              :         // comparison. The value is stored starting at keyEnd.
    1876              :         keyStart uint32
    1877              :         keyEnd   uint32
    1878              : }
    1879              : 
    1880              : // flushableBatch wraps an existing batch and provides the interfaces needed
    1881              : // for making the batch flushable (i.e. able to mimic a memtable).
    1882              : type flushableBatch struct {
    1883              :         cmp      Compare
    1884              :         comparer *base.Comparer
    1885              :         data     []byte
    1886              : 
    1887              :         // The base sequence number for the entries in the batch. This is the same
    1888              :         // value as Batch.seqNum() and is cached here for performance.
    1889              :         seqNum base.SeqNum
    1890              : 
    1891              :         // A slice of offsets and indices for the entries in the batch. Used to
    1892              :         // implement flushableBatchIter. Unlike the indexing on a normal batch, a
    1893              :         // flushable batch is indexed such that batch entry i will be given the
    1894              :         // sequence number flushableBatch.seqNum+i.
    1895              :         //
    1896              :         // Sorted in increasing order of key and decreasing order of offset (since
    1897              :         // higher offsets correspond to higher sequence numbers).
    1898              :         //
    1899              :         // Does not include range deletion entries or range key entries.
    1900              :         offsets []flushableBatchEntry
    1901              : 
    1902              :         // Fragmented range deletion tombstones.
    1903              :         tombstones []keyspan.Span
    1904              : 
    1905              :         // Fragmented range keys.
    1906              :         rangeKeys []keyspan.Span
    1907              : }
    1908              : 
    1909              : var _ flushable = (*flushableBatch)(nil)
    1910              : 
    1911              : // newFlushableBatch creates a new batch that implements the flushable
    1912              : // interface. This allows the batch to act like a memtable and be placed in the
    1913              : // queue of flushable memtables. Note that the flushable batch takes ownership
    1914              : // of the batch data.
    1915            1 : func newFlushableBatch(batch *Batch, comparer *Comparer) (*flushableBatch, error) {
    1916            1 :         b := &flushableBatch{
    1917            1 :                 data:     batch.data,
    1918            1 :                 cmp:      comparer.Compare,
    1919            1 :                 comparer: comparer,
    1920            1 :                 offsets:  make([]flushableBatchEntry, 0, batch.Count()),
    1921            1 :         }
    1922            1 :         if b.data != nil {
    1923            1 :                 // Note that this sequence number is not correct when this batch has not
    1924            1 :                 // been applied since the sequence number has not been assigned yet. The
    1925            1 :                 // correct sequence number will be set later. But it is correct when the
    1926            1 :                 // batch is being replayed from the WAL.
    1927            1 :                 b.seqNum = batch.SeqNum()
    1928            1 :         }
    1929            1 :         var rangeDelOffsets []flushableBatchEntry
    1930            1 :         var rangeKeyOffsets []flushableBatchEntry
    1931            1 :         if len(b.data) > batchrepr.HeaderLen {
    1932            1 :                 // Non-empty batch.
    1933            1 :                 var index uint32
    1934            1 :                 for iter := batchrepr.Read(b.data); len(iter) > 0; {
    1935            1 :                         offset := uintptr(unsafe.Pointer(&iter[0])) - uintptr(unsafe.Pointer(&b.data[0]))
    1936            1 :                         kind, key, _, ok, err := iter.Next()
    1937            1 :                         if !ok {
    1938            0 :                                 if err != nil {
    1939            0 :                                         return nil, err
    1940            0 :                                 }
    1941            0 :                                 break
    1942              :                         }
    1943            1 :                         entry := flushableBatchEntry{
    1944            1 :                                 offset: uint32(offset),
    1945            1 :                                 index:  uint32(index),
    1946            1 :                         }
    1947            1 :                         if keySize := uint32(len(key)); keySize == 0 {
    1948            1 :                                 // Must add 2 to the offset. One byte encodes `kind` and the next
    1949            1 :                                 // byte encodes `0`, which is the length of the key.
    1950            1 :                                 entry.keyStart = uint32(offset) + 2
    1951            1 :                                 entry.keyEnd = entry.keyStart
    1952            1 :                         } else {
    1953            1 :                                 entry.keyStart = uint32(uintptr(unsafe.Pointer(&key[0])) -
    1954            1 :                                         uintptr(unsafe.Pointer(&b.data[0])))
    1955            1 :                                 entry.keyEnd = entry.keyStart + keySize
    1956            1 :                         }
    1957            1 :                         switch kind {
    1958            1 :                         case InternalKeyKindRangeDelete:
    1959            1 :                                 rangeDelOffsets = append(rangeDelOffsets, entry)
    1960            1 :                         case InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete:
    1961            1 :                                 rangeKeyOffsets = append(rangeKeyOffsets, entry)
    1962            1 :                         case InternalKeyKindLogData:
    1963            1 :                                 // Skip it; we never want to iterate over LogDatas.
    1964            1 :                                 continue
    1965              :                         case InternalKeyKindSet, InternalKeyKindDelete, InternalKeyKindMerge,
    1966            1 :                                 InternalKeyKindSingleDelete, InternalKeyKindSetWithDelete, InternalKeyKindDeleteSized:
    1967            1 :                                 b.offsets = append(b.offsets, entry)
    1968            0 :                         default:
    1969            0 :                                 // Note In some circumstances this might be temporary memory
    1970            0 :                                 // Note: In some circumstances this might be transient memory
    1971            0 :                                 // corruption that can be recovered by discarding the batch and
    1972            0 :                                 // trying again. In other cases, the batch repr might already have
    1973            0 :                                 // been persisted elsewhere, and we'll loop continuously trying
    1974            0 :                                 // distinguishing.
    1975            0 :                                 return nil, errors.Wrapf(ErrInvalidBatch, "unrecognized kind %v", kind)
    1976              :                         }
    1977              :                         // NB: index (used for entry.index above) must not reach
    1978              :                         // batch.count, because the index is used in conjunction with the
    1979              :                         // batch's sequence number to assign sequence numbers to keys within
    1980              :                         // the batch. If we assigned KVs indexes as high as batch.count,
    1981              :                         // we'd begin assigning keys sequence numbers that were never
    1982              :                         // allocated.
    1983            1 :                         if index >= uint32(batch.count) {
    1984            0 :                                 return nil, base.AssertionFailedf("pebble: batch entry index %d ≥ batch.count %d", index, batch.count)
    1985            0 :                         }
    1986            1 :                         index++
    1987              :                 }
    1988              :         }
    1989              : 
    1990              :         // Sort all of offsets, rangeDelOffsets and rangeKeyOffsets, using *batch's
    1991              :         // sort.Interface implementation.
    1992            1 :         pointOffsets := b.offsets
    1993            1 :         sort.Sort(b)
    1994            1 :         b.offsets = rangeDelOffsets
    1995            1 :         sort.Sort(b)
    1996            1 :         b.offsets = rangeKeyOffsets
    1997            1 :         sort.Sort(b)
    1998            1 :         b.offsets = pointOffsets
    1999            1 : 
    2000            1 :         if len(rangeDelOffsets) > 0 {
    2001            1 :                 frag := &keyspan.Fragmenter{
    2002            1 :                         Cmp:    b.cmp,
    2003            1 :                         Format: b.comparer.FormatKey,
    2004            1 :                         Emit: func(s keyspan.Span) {
    2005            1 :                                 b.tombstones = append(b.tombstones, s)
    2006            1 :                         },
    2007              :                 }
    2008            1 :                 it := &flushableBatchIter{
    2009            1 :                         batch:   b,
    2010            1 :                         data:    b.data,
    2011            1 :                         offsets: rangeDelOffsets,
    2012            1 :                         cmp:     b.cmp,
    2013            1 :                         index:   -1,
    2014            1 :                 }
    2015            1 :                 fragmentRangeDels(frag, it, len(rangeDelOffsets))
    2016              :         }
    2017            1 :         if len(rangeKeyOffsets) > 0 {
    2018            1 :                 frag := &keyspan.Fragmenter{
    2019            1 :                         Cmp:    b.cmp,
    2020            1 :                         Format: b.comparer.FormatKey,
    2021            1 :                         Emit: func(s keyspan.Span) {
    2022            1 :                                 b.rangeKeys = append(b.rangeKeys, s)
    2023            1 :                         },
    2024              :                 }
    2025            1 :                 it := &flushableBatchIter{
    2026            1 :                         batch:   b,
    2027            1 :                         data:    b.data,
    2028            1 :                         offsets: rangeKeyOffsets,
    2029            1 :                         cmp:     b.cmp,
    2030            1 :                         index:   -1,
    2031            1 :                 }
    2032            1 :                 if err := fragmentRangeKeys(frag, it, len(rangeKeyOffsets)); err != nil {
    2033            0 :                         return nil, err
    2034            0 :                 }
    2035              :         }
    2036            1 :         return b, nil
    2037              : }
    2038              : 
    2039            1 : func (b *flushableBatch) setSeqNum(seqNum base.SeqNum) {
    2040            1 :         if b.seqNum != 0 {
    2041            0 :                 panic(fmt.Sprintf("pebble: flushableBatch.seqNum already set: %d", b.seqNum))
    2042              :         }
    2043            1 :         b.seqNum = seqNum
    2044            1 :         for i := range b.tombstones {
    2045            1 :                 for j := range b.tombstones[i].Keys {
    2046            1 :                         b.tombstones[i].Keys[j].Trailer = base.MakeTrailer(
    2047            1 :                                 b.tombstones[i].Keys[j].SeqNum()+seqNum,
    2048            1 :                                 b.tombstones[i].Keys[j].Kind(),
    2049            1 :                         )
    2050            1 :                 }
    2051              :         }
    2052            1 :         for i := range b.rangeKeys {
    2053            1 :                 for j := range b.rangeKeys[i].Keys {
    2054            1 :                         b.rangeKeys[i].Keys[j].Trailer = base.MakeTrailer(
    2055            1 :                                 b.rangeKeys[i].Keys[j].SeqNum()+seqNum,
    2056            1 :                                 b.rangeKeys[i].Keys[j].Kind(),
    2057            1 :                         )
    2058            1 :                 }
    2059              :         }
    2060              : }
    2061              : 
    2062            1 : func (b *flushableBatch) Len() int {
    2063            1 :         return len(b.offsets)
    2064            1 : }
    2065              : 
    2066            1 : func (b *flushableBatch) Less(i, j int) bool {
    2067            1 :         ei := &b.offsets[i]
    2068            1 :         ej := &b.offsets[j]
    2069            1 :         ki := b.data[ei.keyStart:ei.keyEnd]
    2070            1 :         kj := b.data[ej.keyStart:ej.keyEnd]
    2071            1 :         switch c := b.cmp(ki, kj); {
    2072            1 :         case c < 0:
    2073            1 :                 return true
    2074            1 :         case c > 0:
    2075            1 :                 return false
    2076            1 :         default:
    2077            1 :                 return ei.offset > ej.offset
    2078              :         }
    2079              : }
    2080              : 
    2081            1 : func (b *flushableBatch) Swap(i, j int) {
    2082            1 :         b.offsets[i], b.offsets[j] = b.offsets[j], b.offsets[i]
    2083            1 : }
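
A standalone sketch of the resulting order (the entry type and values are illustrative, not part of batch.go): entries sort by increasing user key, and duplicates of a key sort by decreasing offset, so the most recently written entry for a key, which carries the highest sequence number, is visited first.

package main

import (
	"bytes"
	"fmt"
	"sort"
)

// entry mirrors the (key, offset) information that flushableBatch.Less
// compares; offset grows with each record appended to the batch.
type entry struct {
	key    []byte
	offset uint32
}

func main() {
	entries := []entry{
		{key: []byte("a"), offset: 12},
		{key: []byte("b"), offset: 30},
		{key: []byte("a"), offset: 47}, // a later write to "a"
	}
	sort.Slice(entries, func(i, j int) bool {
		if c := bytes.Compare(entries[i].key, entries[j].key); c != 0 {
			return c < 0
		}
		return entries[i].offset > entries[j].offset
	})
	for _, e := range entries {
		fmt.Printf("%s@%d\n", e.key, e.offset) // prints a@47, a@12, b@30
	}
}
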
    2084              : 
    2085              : // newIter is part of the flushable interface.
    2086            1 : func (b *flushableBatch) newIter(o *IterOptions) internalIterator {
    2087            1 :         return &flushableBatchIter{
    2088            1 :                 batch:   b,
    2089            1 :                 data:    b.data,
    2090            1 :                 offsets: b.offsets,
    2091            1 :                 cmp:     b.cmp,
    2092            1 :                 index:   -1,
    2093            1 :                 lower:   o.GetLowerBound(),
    2094            1 :                 upper:   o.GetUpperBound(),
    2095            1 :         }
    2096            1 : }
    2097              : 
    2098              : // newFlushIter is part of the flushable interface.
    2099            1 : func (b *flushableBatch) newFlushIter(o *IterOptions) internalIterator {
    2100            1 :         return &flushFlushableBatchIter{
    2101            1 :                 flushableBatchIter: flushableBatchIter{
    2102            1 :                         batch:   b,
    2103            1 :                         data:    b.data,
    2104            1 :                         offsets: b.offsets,
    2105            1 :                         cmp:     b.cmp,
    2106            1 :                         index:   -1,
    2107            1 :                 },
    2108            1 :         }
    2109            1 : }
    2110              : 
    2111              : // newRangeDelIter is part of the flushable interface.
    2112            1 : func (b *flushableBatch) newRangeDelIter(o *IterOptions) keyspan.FragmentIterator {
    2113            1 :         if len(b.tombstones) == 0 {
    2114            1 :                 return nil
    2115            1 :         }
    2116            1 :         return keyspan.NewIter(b.cmp, b.tombstones)
    2117              : }
    2118              : 
    2119              : // newRangeKeyIter is part of the flushable interface.
    2120            1 : func (b *flushableBatch) newRangeKeyIter(o *IterOptions) keyspan.FragmentIterator {
    2121            1 :         if len(b.rangeKeys) == 0 {
    2122            1 :                 return nil
    2123            1 :         }
    2124            1 :         return keyspan.NewIter(b.cmp, b.rangeKeys)
    2125              : }
    2126              : 
    2127              : // containsRangeKeys is part of the flushable interface.
    2128            1 : func (b *flushableBatch) containsRangeKeys() bool { return len(b.rangeKeys) > 0 }
    2129              : 
    2130              : // inuseBytes is part of the flushable interface.
    2131            1 : func (b *flushableBatch) inuseBytes() uint64 {
    2132            1 :         return uint64(len(b.data) - batchrepr.HeaderLen)
    2133            1 : }
    2134              : 
    2135              : // totalBytes is part of the flushable interface.
    2136            1 : func (b *flushableBatch) totalBytes() uint64 {
    2137            1 :         return uint64(cap(b.data))
    2138            1 : }
    2139              : 
    2140              : // readyForFlush is part of the flushable interface.
    2141            1 : func (b *flushableBatch) readyForFlush() bool {
    2142            1 :         // A flushable batch is always ready for flush; it must be flushed together
    2143            1 :         // with the previous memtable.
    2144            1 :         return true
    2145            1 : }
    2146              : 
    2147              : // computePossibleOverlaps is part of the flushable interface.
    2148              : func (b *flushableBatch) computePossibleOverlaps(
    2149              :         fn func(bounded) shouldContinue, bounded ...bounded,
    2150            1 : ) {
    2151            1 :         computePossibleOverlapsGenericImpl[*flushableBatch](b, b.cmp, fn, bounded)
    2152            1 : }
    2153              : 
    2154              : // Note: flushableBatchIter mirrors the implementation of batchIter. Keep the
    2155              : // two in sync.
    2156              : type flushableBatchIter struct {
    2157              :         // Members to be initialized by creator.
    2158              :         batch *flushableBatch
    2159              :         // The bytes backing the batch. Always the same as batch.data?
    2160              :         data []byte
    2161              :         // The sorted entries. This is not always equal to batch.offsets.
    2162              :         offsets []flushableBatchEntry
    2163              :         cmp     Compare
    2164              :         // Must be initialized to -1. It is the index into offsets that represents
    2165              :         // the current iterator position.
    2166              :         index int
    2167              : 
    2168              :         // For internal use by the implementation.
    2169              :         kv  base.InternalKV
    2170              :         err error
    2171              : 
    2172              :         // Optionally initialize to bounds of iteration, if any.
    2173              :         lower []byte
    2174              :         upper []byte
    2175              : }
    2176              : 
    2177              : // flushableBatchIter implements the base.InternalIterator interface.
    2178              : var _ base.InternalIterator = (*flushableBatchIter)(nil)
    2179              : 
    2180            1 : func (i *flushableBatchIter) String() string {
    2181            1 :         return "flushable-batch"
    2182            1 : }
    2183              : 
    2184              : // SeekGE implements internalIterator.SeekGE, as documented in the pebble
    2185              : // package. Ignore flags.TrySeekUsingNext() since we don't expect this
    2186              : // optimization to provide much benefit here at the moment.
    2187            1 : func (i *flushableBatchIter) SeekGE(key []byte, flags base.SeekGEFlags) *base.InternalKV {
    2188            1 :         i.err = nil // clear cached iteration error
    2189            1 :         ikey := base.MakeSearchKey(key)
    2190            1 :         i.index = sort.Search(len(i.offsets), func(j int) bool {
    2191            1 :                 return base.InternalCompare(i.cmp, ikey, i.getKey(j)) <= 0
    2192            1 :         })
    2193            1 :         if i.index >= len(i.offsets) {
    2194            1 :                 return nil
    2195            1 :         }
    2196            1 :         kv := i.getKV(i.index)
    2197            1 :         if i.upper != nil && i.cmp(kv.K.UserKey, i.upper) >= 0 {
    2198            1 :                 i.index = len(i.offsets)
    2199            1 :                 return nil
    2200            1 :         }
    2201            1 :         return kv
    2202              : }
    2203              : 
    2204              : // SeekPrefixGE implements internalIterator.SeekPrefixGE, as documented in the
    2205              : // pebble package.
    2206              : func (i *flushableBatchIter) SeekPrefixGE(
    2207              :         prefix, key []byte, flags base.SeekGEFlags,
    2208            1 : ) *base.InternalKV {
    2209            1 :         kv := i.SeekGE(key, flags)
    2210            1 :         if kv == nil {
    2211            1 :                 return nil
    2212            1 :         }
    2213              :         // If the key doesn't have the sought prefix, return nil.
    2214            1 :         if !bytes.Equal(i.batch.comparer.Split.Prefix(kv.K.UserKey), prefix) {
    2215            1 :                 return nil
    2216            1 :         }
    2217            1 :         return kv
    2218              : }
    2219              : 
    2220              : // SeekLT implements internalIterator.SeekLT, as documented in the pebble
    2221              : // package.
    2222            1 : func (i *flushableBatchIter) SeekLT(key []byte, flags base.SeekLTFlags) *base.InternalKV {
    2223            1 :         i.err = nil // clear cached iteration error
    2224            1 :         ikey := base.MakeSearchKey(key)
    2225            1 :         i.index = sort.Search(len(i.offsets), func(j int) bool {
    2226            1 :                 return base.InternalCompare(i.cmp, ikey, i.getKey(j)) <= 0
    2227            1 :         })
    2228            1 :         i.index--
    2229            1 :         if i.index < 0 {
    2230            1 :                 return nil
    2231            1 :         }
    2232            1 :         kv := i.getKV(i.index)
    2233            1 :         if i.lower != nil && i.cmp(kv.K.UserKey, i.lower) < 0 {
    2234            1 :                 i.index = -1
    2235            1 :                 return nil
    2236            1 :         }
    2237            1 :         return kv
    2238              : }
    2239              : 
    2240              : // First implements internalIterator.First, as documented in the pebble
    2241              : // package.
    2242            1 : func (i *flushableBatchIter) First() *base.InternalKV {
    2243            1 :         i.err = nil // clear cached iteration error
    2244            1 :         if len(i.offsets) == 0 {
    2245            1 :                 return nil
    2246            1 :         }
    2247            1 :         i.index = 0
    2248            1 :         kv := i.getKV(i.index)
    2249            1 :         if i.upper != nil && i.cmp(kv.K.UserKey, i.upper) >= 0 {
    2250            1 :                 i.index = len(i.offsets)
    2251            1 :                 return nil
    2252            1 :         }
    2253            1 :         return kv
    2254              : }
    2255              : 
    2256              : // Last implements internalIterator.Last, as documented in the pebble
    2257              : // package.
    2258            1 : func (i *flushableBatchIter) Last() *base.InternalKV {
    2259            1 :         i.err = nil // clear cached iteration error
    2260            1 :         if len(i.offsets) == 0 {
    2261            1 :                 return nil
    2262            1 :         }
    2263            1 :         i.index = len(i.offsets) - 1
    2264            1 :         kv := i.getKV(i.index)
    2265            1 :         if i.lower != nil && i.cmp(kv.K.UserKey, i.lower) < 0 {
    2266            0 :                 i.index = -1
    2267            0 :                 return nil
    2268            0 :         }
    2269            1 :         return kv
    2270              : }
    2271              : 
    2272              : // Note: flushFlushableBatchIter.Next mirrors the implementation of
    2273              : // flushableBatchIter.Next due to performance. Keep the two in sync.
    2274            1 : func (i *flushableBatchIter) Next() *base.InternalKV {
    2275            1 :         if i.index == len(i.offsets) {
    2276            0 :                 return nil
    2277            0 :         }
    2278            1 :         i.index++
    2279            1 :         if i.index == len(i.offsets) {
    2280            1 :                 return nil
    2281            1 :         }
    2282            1 :         kv := i.getKV(i.index)
    2283            1 :         if i.upper != nil && i.cmp(kv.K.UserKey, i.upper) >= 0 {
    2284            1 :                 i.index = len(i.offsets)
    2285            1 :                 return nil
    2286            1 :         }
    2287            1 :         return kv
    2288              : }
    2289              : 
    2290            1 : func (i *flushableBatchIter) Prev() *base.InternalKV {
    2291            1 :         if i.index < 0 {
    2292            0 :                 return nil
    2293            0 :         }
    2294            1 :         i.index--
    2295            1 :         if i.index < 0 {
    2296            1 :                 return nil
    2297            1 :         }
    2298            1 :         kv := i.getKV(i.index)
    2299            1 :         if i.lower != nil && i.cmp(kv.K.UserKey, i.lower) < 0 {
    2300            1 :                 i.index = -1
    2301            1 :                 return nil
    2302            1 :         }
    2303            1 :         return kv
    2304              : }
    2305              : 
    2306              : // NextPrefix implements (base.InternalIterator).NextPrefix; it simply re-seeks
    2307              : // to succKey with the TrySeekUsingNext optimization enabled.
    2308            0 : func (i *flushableBatchIter) NextPrefix(succKey []byte) *base.InternalKV {
    2309            0 :         return i.SeekGE(succKey, base.SeekGEFlagsNone.EnableTrySeekUsingNext())
    2310            0 : }
    2311              : 
    2312            1 : func (i *flushableBatchIter) getKey(index int) InternalKey {
    2313            1 :         e := &i.offsets[index]
    2314            1 :         kind := InternalKeyKind(i.data[e.offset])
    2315            1 :         key := i.data[e.keyStart:e.keyEnd]
    2316            1 :         return base.MakeInternalKey(key, i.batch.seqNum+base.SeqNum(e.index), kind)
    2317            1 : }
    2318              : 
    2319            1 : func (i *flushableBatchIter) getKV(index int) *base.InternalKV {
    2320            1 :         i.kv = base.InternalKV{
    2321            1 :                 K: i.getKey(index),
    2322            1 :                 V: base.MakeInPlaceValue(i.extractValue()),
    2323            1 :         }
    2324            1 :         return &i.kv
    2325            1 : }
    2326              : 
    2327            1 : func (i *flushableBatchIter) extractValue() []byte {
    2328            1 :         p := i.data[i.offsets[i.index].offset:]
    2329            1 :         if len(p) == 0 {
    2330            0 :                 i.err = base.CorruptionErrorf("corrupted batch")
    2331            0 :                 return nil
    2332            0 :         }
    2333            1 :         kind := InternalKeyKind(p[0])
    2334            1 :         if kind > InternalKeyKindMax {
    2335            0 :                 i.err = base.CorruptionErrorf("corrupted batch")
    2336            0 :                 return nil
    2337            0 :         }
    2338            1 :         var value []byte
    2339            1 :         var ok bool
    2340            1 :         switch kind {
    2341              :         case InternalKeyKindSet, InternalKeyKindMerge, InternalKeyKindRangeDelete,
    2342              :                 InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete,
    2343            1 :                 InternalKeyKindDeleteSized:
    2344            1 :                 keyEnd := i.offsets[i.index].keyEnd
    2345            1 :                 _, value, ok = batchrepr.DecodeStr(i.data[keyEnd:])
    2346            1 :                 if !ok {
    2347            0 :                         i.err = base.CorruptionErrorf("corrupted batch")
    2348            0 :                         return nil
    2349            0 :                 }
    2350              :         }
    2351            1 :         return value
    2352              : }
    2353              : 
    2354            0 : func (i *flushableBatchIter) Valid() bool {
    2355            0 :         return i.index >= 0 && i.index < len(i.offsets)
    2356            0 : }
    2357              : 
    2358            1 : func (i *flushableBatchIter) Error() error {
    2359            1 :         return i.err
    2360            1 : }
    2361              : 
    2362            1 : func (i *flushableBatchIter) Close() error {
    2363            1 :         return i.err
    2364            1 : }
    2365              : 
    2366            1 : func (i *flushableBatchIter) SetBounds(lower, upper []byte) {
    2367            1 :         i.lower = lower
    2368            1 :         i.upper = upper
    2369            1 : }
    2370              : 
    2371            0 : func (i *flushableBatchIter) SetContext(_ context.Context) {}
    2372              : 
    2373              : // DebugTree is part of the InternalIterator interface.
    2374            0 : func (i *flushableBatchIter) DebugTree(tp treeprinter.Node) {
    2375            0 :         tp.Childf("%T(%p)", i, i)
    2376            0 : }
    2377              : 
    2378              : // flushFlushableBatchIter is a variant of flushableBatchIter used when flushing
    2379              : // a batch; it supports only forward iteration (First and Next).
    2380              : type flushFlushableBatchIter struct {
    2381              :         flushableBatchIter
    2382              : }
    2383              : 
    2384              : // flushFlushableBatchIter implements the base.InternalIterator interface.
    2385              : var _ base.InternalIterator = (*flushFlushableBatchIter)(nil)
    2386              : 
    2387            0 : func (i *flushFlushableBatchIter) String() string {
    2388            0 :         return "flushable-batch"
    2389            0 : }
    2390              : 
    2391            0 : func (i *flushFlushableBatchIter) SeekGE(key []byte, flags base.SeekGEFlags) *base.InternalKV {
    2392            0 :         panic("pebble: SeekGE unimplemented")
    2393              : }
    2394              : 
    2395              : func (i *flushFlushableBatchIter) SeekPrefixGE(
    2396              :         prefix, key []byte, flags base.SeekGEFlags,
    2397            0 : ) *base.InternalKV {
    2398            0 :         panic("pebble: SeekPrefixGE unimplemented")
    2399              : }
    2400              : 
    2401            0 : func (i *flushFlushableBatchIter) SeekLT(key []byte, flags base.SeekLTFlags) *base.InternalKV {
    2402            0 :         panic("pebble: SeekLT unimplemented")
    2403              : }
    2404              : 
    2405            1 : func (i *flushFlushableBatchIter) First() *base.InternalKV {
    2406            1 :         i.err = nil // clear cached iteration error
    2407            1 :         return i.flushableBatchIter.First()
    2408            1 : }
    2409              : 
    2410            0 : func (i *flushFlushableBatchIter) NextPrefix(succKey []byte) *base.InternalKV {
    2411            0 :         panic("pebble: NextPrefix unimplemented")
    2412              : }
    2413              : 
    2414              : // Note: flushFlushableBatchIter.Next mirrors the implementation of
    2415              : // flushableBatchIter.Next due to performance. Keep the two in sync.
    2416            1 : func (i *flushFlushableBatchIter) Next() *base.InternalKV {
    2417            1 :         if i.index == len(i.offsets) {
    2418            0 :                 return nil
    2419            0 :         }
    2420            1 :         i.index++
    2421            1 :         if i.index == len(i.offsets) {
    2422            1 :                 return nil
    2423            1 :         }
    2424            1 :         return i.getKV(i.index)
    2425              : }
    2426              : 
    2427            0 : func (i *flushFlushableBatchIter) Prev() *base.InternalKV {
    2428            0 :         panic("pebble: Prev unimplemented")
    2429              : }
    2430              : 
    2431              : // batchOptions holds the parameters to configure batch.
    2432              : type batchOptions struct {
    2433              :         initialSizeBytes     int
    2434              :         maxRetainedSizeBytes int
    2435              : }
    2436              : 
    2437              : // ensureDefaults fills in default values for any batch options left unset.
    2438            1 : func (o *batchOptions) ensureDefaults() {
    2439            1 :         if o.initialSizeBytes <= 0 {
    2440            1 :                 o.initialSizeBytes = defaultBatchInitialSize
    2441            1 :         }
    2442            1 :         if o.maxRetainedSizeBytes <= 0 {
    2443            1 :                 o.maxRetainedSizeBytes = defaultBatchMaxRetainedSize
    2444            1 :         }
    2445              : }
    2446              : 
    2447              : // BatchOption allows customizing the batch.
    2448              : type BatchOption func(*batchOptions)
    2449              : 
    2450              : // WithInitialSizeBytes sets a custom initial size for the batch. Defaults
    2451              : // to 1KB.
    2452            0 : func WithInitialSizeBytes(s int) BatchOption {
    2453            0 :         return func(opts *batchOptions) {
    2454            0 :                 opts.initialSizeBytes = s
    2455            0 :         }
    2456              : }
    2457              : 
    2458              : // WithMaxRetainedSizeBytes sets a custom maximum size above which the batch's
    2459              : // buffer is not re-used: any buffer exceeding the max retained size is left to
    2460              : // be GC-ed when the batch is reset. Defaults to 1MB.
    2461            0 : func WithMaxRetainedSizeBytes(s int) BatchOption {
    2462            0 :         return func(opts *batchOptions) {
    2463            0 :                 opts.maxRetainedSizeBytes = s
    2464            0 :         }
    2465              : }
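
A hedged usage sketch, assuming DB.NewBatch accepts BatchOption arguments (the helper name and sizes are illustrative): start the batch with a 64 KB buffer and release, rather than retain, any buffer that grows past 4 MB when the batch is reset.

package example

import "github.com/cockroachdb/pebble"

// newLargeBatch is a hypothetical helper showing how the options compose.
func newLargeBatch(db *pebble.DB) *pebble.Batch {
	return db.NewBatch(
		pebble.WithInitialSizeBytes(64<<10),    // start with a 64 KB buffer
		pebble.WithMaxRetainedSizeBytes(4<<20), // don't retain buffers > 4 MB
	)
}
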
    2466              : 
    2467              : // batchSort returns iterators for the sorted contents of the batch. It is
    2468              : // intended for testing use only. The batch.Sort dance is done to prevent
    2469              : // exposing this method in the public pebble interface.
    2470              : func batchSort(
    2471              :         i interface{},
    2472              : ) (
    2473              :         points internalIterator,
    2474              :         rangeDels keyspan.FragmentIterator,
    2475              :         rangeKeys keyspan.FragmentIterator,
    2476            1 : ) {
    2477            1 :         b := i.(*Batch)
    2478            1 :         if b.Indexed() {
    2479            1 :                 pointIter := b.newInternalIter(nil)
    2480            1 :                 rangeDelIter := b.newRangeDelIter(nil, math.MaxUint64)
    2481            1 :                 rangeKeyIter := b.newRangeKeyIter(nil, math.MaxUint64)
    2482            1 :                 return pointIter, rangeDelIter, rangeKeyIter
    2483            1 :         }
    2484            1 :         f, err := newFlushableBatch(b, b.db.opts.Comparer)
    2485            1 :         if err != nil {
    2486            0 :                 panic(err)
    2487              :         }
    2488            1 :         return f.newIter(nil), f.newRangeDelIter(nil), f.newRangeKeyIter(nil)
    2489              : }
    2490              : 
    2491            1 : func init() {
    2492            1 :         private.BatchSort = batchSort
    2493            1 : }
        

Generated by: LCOV version 2.0-1