LCOV - code coverage report
Current view: top level - pebble - batch.go (source / functions)
Test: 2024-01-10 08:16Z d50db878 - meta test only.lcov
Date: 2024-01-10 08:16:55
Lines:     1046 / 1326  (78.9 %)
Functions:    0 / 0     (-)

          Line data    Source code
       1             : // Copyright 2012 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package pebble
       6             : 
       7             : import (
       8             :         "context"
       9             :         "encoding/binary"
      10             :         "fmt"
      11             :         "io"
      12             :         "math"
      13             :         "sort"
      14             :         "sync"
      15             :         "sync/atomic"
      16             :         "time"
      17             :         "unsafe"
      18             : 
      19             :         "github.com/cockroachdb/errors"
      20             :         "github.com/cockroachdb/pebble/internal/base"
      21             :         "github.com/cockroachdb/pebble/internal/batchskl"
      22             :         "github.com/cockroachdb/pebble/internal/humanize"
      23             :         "github.com/cockroachdb/pebble/internal/invariants"
      24             :         "github.com/cockroachdb/pebble/internal/keyspan"
      25             :         "github.com/cockroachdb/pebble/internal/private"
      26             :         "github.com/cockroachdb/pebble/internal/rangedel"
      27             :         "github.com/cockroachdb/pebble/internal/rangekey"
      28             :         "github.com/cockroachdb/pebble/internal/rawalloc"
      29             : )
      30             : 
      31             : const (
      32             :         batchCountOffset     = 8
      33             :         batchHeaderLen       = 12
      34             :         batchInitialSize     = 1 << 10 // 1 KB
      35             :         batchMaxRetainedSize = 1 << 20 // 1 MB
      36             :         invalidBatchCount    = 1<<32 - 1
      37             :         maxVarintLen32       = 5
      38             : )
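
These constants pin down the wire layout described under "Internal representation" below: an 8-byte little-endian sequence number followed by a 4-byte count. A minimal, package-internal sketch of decoding that header from a batch's Repr() (the variable b is an assumed, already-populated *Batch):

        repr := b.Repr() // 12-byte header followed by records
        if len(repr) >= batchHeaderLen {
                seqNum := binary.LittleEndian.Uint64(repr[:batchCountOffset])              // bytes 0-7
                count := binary.LittleEndian.Uint32(repr[batchCountOffset:batchHeaderLen]) // bytes 8-11
                fmt.Printf("seqnum=%d count=%d\n", seqNum, count)
        }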
      39             : 
      40             : // ErrNotIndexed means that a read operation on a batch failed because the
      41             : // batch is not indexed and thus doesn't support reads.
      42             : var ErrNotIndexed = errors.New("pebble: batch not indexed")
      43             : 
      44             : // ErrInvalidBatch indicates that a batch is invalid or otherwise corrupted.
      45             : var ErrInvalidBatch = base.MarkCorruptionError(errors.New("pebble: invalid batch"))
      46             : 
      47             : // ErrBatchTooLarge indicates that a batch is too large to be committed.
      48             : var ErrBatchTooLarge = base.MarkCorruptionError(errors.Newf("pebble: batch too large: >= %s", humanize.Bytes.Uint64(maxBatchSize)))
      49             : 
      50             : // DeferredBatchOp represents a batch operation (e.g. set, merge, delete) that is
      51             : // being inserted into the batch. Indexing is not performed on the specified key
      52             : // until Finish is called, hence the name deferred. This struct lets the caller
      53             : // copy or encode keys/values directly into the batch representation instead of
      54             : // copying into an intermediate buffer and then having pebble.Batch copy off of it.
      55             : type DeferredBatchOp struct {
      56             :         index *batchskl.Skiplist
      57             : 
      58             :         // Key and Value point to parts of the binary batch representation where
      59             :         // keys and values should be encoded/copied into. len(Key) and len(Value)
      60             :         // bytes must be copied into these slices respectively before calling
      61             :         // Finish(). Changing where these slices point to is not allowed.
      62             :         Key, Value []byte
      63             :         offset     uint32
      64             : }
      65             : 
      66             : // Finish completes the addition of this batch operation, and adds it to the
      67             : // index if necessary. Must be called once (and exactly once) after keys/values
      68             : // have been copied into Key and Value. Not calling Finish or not
      69             : // copying/encoding keys will result in an incomplete index, and calling Finish
      70             : // twice may result in a panic.
      71           0 : func (d DeferredBatchOp) Finish() error {
      72           0 :         if d.index != nil {
      73           0 :                 if err := d.index.Add(d.offset); err != nil {
      74           0 :                         return err
      75           0 :                 }
      76             :         }
      77           0 :         return nil
      78             : }
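
A minimal sketch of the deferred-op workflow (SetDeferred is defined later in this file); db is an assumed open *pebble.DB, and k, v are caller-owned slices:

        func deferredSetSketch(db *pebble.DB, k, v []byte) error {
                b := db.NewBatch()
                op := b.SetDeferred(len(k), len(v)) // reserves space inside the batch buffer
                copy(op.Key, k)                     // encode the key directly in place
                copy(op.Value, v)                   // encode the value directly in place
                if err := op.Finish(); err != nil { // updates the index, if b is indexed
                        return err
                }
                return b.Commit(pebble.Sync)
        }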
      79             : 
      80             : // A Batch is a sequence of Sets, Merges, Deletes, DeleteRanges, RangeKeySets,
      81             : // RangeKeyUnsets, and/or RangeKeyDeletes that are applied atomically. Batch
      82             : // implements the Reader interface, but only an indexed batch supports reading
      83             : // (without error) via Get or NewIter. A non-indexed batch will return
      84             : // ErrNotIndexed when read from. A batch is not safe for concurrent use, and
      85             : // consumers should use a batch per goroutine or provide their own
      86             : // synchronization.
      87             : //
      88             : // # Indexing
      89             : //
      90             : // Batches can be optionally indexed (see DB.NewIndexedBatch). An indexed batch
      91             : // allows iteration via an Iterator (see Batch.NewIter). The iterator provides
      92             : // a merged view of the operations in the batch and the underlying
      93             : // database. This is implemented by treating the batch as an additional layer
      94             : // in the LSM where every entry in the batch is considered newer than any entry
      95             : // in the underlying database (batch entries have the InternalKeySeqNumBatch
      96             : // bit set). By treating the batch as an additional layer in the LSM, iteration
      97             : // supports all batch operations (i.e. Set, Merge, Delete, DeleteRange,
      98             : // RangeKeySet, RangeKeyUnset, RangeKeyDelete) with minimal effort.
      99             : //
     100             : // The same key can be operated on multiple times in a batch, though only the
     101             : // latest operation will be visible. For example, Put("a", "b"), Delete("a")
     102             : // will cause the key "a" to not be visible in the batch. Put("a", "b"),
     103             : // Put("a", "c") will cause a read of "a" to return the value "c".
     104             : //
     105             : // The batch index is implemented via a skiplist (internal/batchskl). While
     106             : // the skiplist implementation is very fast, inserting into an indexed batch is
     107             : // significantly slower than inserting into a non-indexed batch. Only use an
     108             : // indexed batch if you require reading from it.
     109             : //
     110             : // # Atomic commit
     111             : //
     112             : // The operations in a batch are persisted by calling Batch.Commit which is
     113             : // equivalent to calling DB.Apply(batch). A batch is committed atomically by
     114             : // writing the internal batch representation to the WAL, adding all of the
     115             : // batch operations to the memtable associated with the WAL, and then
     116             : // incrementing the visible sequence number so that subsequent reads can see
     117             : // the effects of the batch operations. If WriteOptions.Sync is true, a call to
     118             : // Batch.Commit will guarantee that the batch is persisted to disk before
     119             : // returning. See commitPipeline for more on the implementation details.
     120             : //
     121             : // # Large batches
     122             : //
     123             : // The size of a batch is limited by maxBatchSize (just under 4 GiB) and by
     124             : // available memory (indexed batches require considerable extra memory for the skiplist
     125             : // structure). A given WAL file has a single memtable associated with it (this
     126             : // restriction could be removed, but doing so is onerous and complex). And a
     127             : // memtable has a fixed size due to the underlying fixed size arena. Note that
     128             : // this differs from RocksDB where a memtable can grow arbitrarily large using
     129             : // a list of arena chunks. In RocksDB this is accomplished by storing pointers
     130             : // in the arena memory, but that isn't possible in Go.
     131             : //
     132             : // During Batch.Commit, a batch which is larger than a threshold (>
     133             : // MemTableSize/2) is wrapped in a flushableBatch and inserted into the queue
     134             : // of memtables. A flushableBatch forces the WAL to be rotated, but that happens
     135             : // anyway when the memtable becomes full, so this does not cause significant
     136             : // WAL churn. Because the flushableBatch is readable as another layer in the
     137             : // LSM, Batch.Commit returns as soon as the flushableBatch has been added to
     138             : // the queue of memtables.
     139             : //
     140             : // Internally, a flushableBatch provides Iterator support by sorting the batch
     141             : // contents (the batch is sorted once, when it is added to the memtable
     142             : // queue). Sorting the batch contents and insertion of the contents into a
     143             : // memtable have the same big-O time, but the constant factor dominates
     144             : // here. Sorting is significantly faster and uses significantly less memory.
     145             : //
     146             : // # Internal representation
     147             : //
     148             : // The internal batch representation is a contiguous byte buffer with a fixed
     149             : // 12-byte header, followed by a series of records.
     150             : //
     151             : //      +-------------+------------+--- ... ---+
     152             : //      | SeqNum (8B) | Count (4B) |  Entries  |
     153             : //      +-------------+------------+--- ... ---+
     154             : //
     155             : // Each record has a 1-byte kind tag prefix, followed by 1 or 2 length-prefixed
     156             : // strings (varstring):
     157             : //
     158             : //      +-----------+-----------------+-------------------+
     159             : //      | Kind (1B) | Key (varstring) | Value (varstring) |
     160             : //      +-----------+-----------------+-------------------+
     161             : //
     162             : // A varstring is a varint32 followed by N bytes of data. The Kind tags are
     163             : // exactly those specified by InternalKeyKind. The following table shows the
     164             : // format for records of each kind:
     165             : //
     166             : //      InternalKeyKindDelete         varstring
     167             : //      InternalKeyKindLogData        varstring
     168             : //      InternalKeyKindIngestSST      varstring
     169             : //      InternalKeyKindSet            varstring varstring
     170             : //      InternalKeyKindMerge          varstring varstring
     171             : //      InternalKeyKindRangeDelete    varstring varstring
     172             : //      InternalKeyKindRangeKeySet    varstring varstring
     173             : //      InternalKeyKindRangeKeyUnset  varstring varstring
     174             : //      InternalKeyKindRangeKeyDelete varstring varstring
     175             : //
     176             : // The intuitive understanding here is that the arguments to Delete, Set,
     177             : // Merge, DeleteRange and RangeKeyDelete are encoded into the batch. The
     178             : // RangeKeySet and RangeKeyUnset operations are slightly more complicated,
     179             : // encoding their end key, suffix and value [in the case of RangeKeySet] within
     180             : // the Value varstring. For more information on the value encoding for
     181             : // RangeKeySet and RangeKeyUnset, see the internal/rangekey package.
     182             : //
     183             : // The internal batch representation is the on-disk format for a batch in the
     184             : // WAL, and thus stable. New record kinds may be added, but the existing ones
     185             : // will not be modified.
     186             : type Batch struct {
     187             :         batchInternal
     188             :         applied atomic.Bool
     189             : }
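
A hedged, end-to-end sketch of the contract above (indexing, last-write-wins within a batch, and atomic commit); assumes an already-open *pebble.DB and imports of fmt and github.com/cockroachdb/pebble:

        func batchSketch(db *pebble.DB) error {
                b := db.NewIndexedBatch() // indexed: readable via Get and NewIter
                if err := b.Set([]byte("a"), []byte("b"), nil); err != nil {
                        return err
                }
                if err := b.Set([]byte("a"), []byte("c"), nil); err != nil {
                        return err
                }
                v, closer, err := b.Get([]byte("a")) // returns "c": the latest op wins
                if err != nil {
                        return err
                }
                fmt.Printf("a=%s\n", v)
                if err := closer.Close(); err != nil { // required after a successful Get
                        return err
                }
                return b.Commit(pebble.Sync) // WAL append, memtable apply, seqnum publish
        }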
     190             : 
     191             : // batchInternal contains the set of fields within Batch that are non-atomic and
     192             : // capable of being reset using a *b = batchInternal{} struct copy.
     193             : type batchInternal struct {
     194             :         // Data is the wire format of a batch's log entry:
     195             :         //   - 8 bytes for a sequence number of the first batch element,
     196             :         //     or zeroes if the batch has not yet been applied,
     197             :         //   - 4 bytes for the count: the number of elements in the batch,
     198             :         //     or "\xff\xff\xff\xff" if the batch is invalid,
     199             :         //   - count elements, being:
     200             :         //     - one byte for the kind
     201             :         //     - the varint-string user key,
     202             :         //     - the varint-string value (if kind != delete).
     203             :         // The sequence number and count are stored in little-endian order.
     204             :         //
     205             :         // The data field can be (but is not guaranteed to be) nil for new
     206             :         // batches. Large batches will set the data field to nil when committed as
     207             :         // the data has been moved to a flushableBatch and inserted into the queue of
     208             :         // memtables.
     209             :         data           []byte
     210             :         cmp            Compare
     211             :         formatKey      base.FormatKey
     212             :         abbreviatedKey AbbreviatedKey
     213             : 
     214             :         // An upper bound on required space to add this batch to a memtable.
     215             :         // Note that although batches are limited to 4 GiB in size, that limit
     216             :         // applies to len(data), not the memtable size. The upper bound on the
     217             :         // size of a memtable node is larger than the overhead of the batch's log
     218             :         // encoding, so memTableSize is larger than len(data) and may overflow a
     219             :         // uint32.
     220             :         memTableSize uint64
     221             : 
     222             :         // The db to which the batch will be committed. Do not change this field
     223             :         // after the batch has been created as it might invalidate internal state.
     224             :         // Batch.memTableSize is only refreshed if Batch.db is set. Setting db to
     225             :         // nil once it has been set implies that the Batch has encountered an error.
     226             :         db *DB
     227             : 
     228             :         // The count of records in the batch. This count will be stored in the batch
     229             :         // data whenever Repr() is called.
     230             :         count uint64
     231             : 
     232             :         // The count of range deletions in the batch. Updated every time a range
     233             :         // deletion is added.
     234             :         countRangeDels uint64
     235             : 
     236             :         // The count of range key sets, unsets and deletes in the batch. Updated
     237             :         // every time a RANGEKEYSET, RANGEKEYUNSET or RANGEKEYDEL key is added.
     238             :         countRangeKeys uint64
     239             : 
     240             :         // A deferredOp struct, stored in the Batch so that a pointer can be returned
     241             :         // from the *Deferred() methods rather than a value.
     242             :         deferredOp DeferredBatchOp
     243             : 
     244             :         // An optional skiplist keyed by offset into data of the entry.
     245             :         index         *batchskl.Skiplist
     246             :         rangeDelIndex *batchskl.Skiplist
     247             :         rangeKeyIndex *batchskl.Skiplist
     248             : 
     249             :         // Fragmented range deletion tombstones. Cached the first time a range
     250             :         // deletion iterator is requested. The cache is invalidated whenever a new
     251             :         // range deletion is added to the batch. This cache can only be used when
     252             :         // opening an iterator to read at a batch sequence number >=
     253             :         // tombstonesSeqNum. This is the case for all new iterators created over a
     254             :         // batch but it's not the case for all cloned iterators.
     255             :         tombstones       []keyspan.Span
     256             :         tombstonesSeqNum uint64
     257             : 
     258             :         // Fragmented range key spans. Cached the first time a range key iterator is
     259             :         // requested. The cache is invalidated whenever a new range key
     260             :         // (RangeKey{Set,Unset,Del}) is added to the batch. This cache can only be
     261             :         // used when opening an iterator to read at a batch sequence number >=
      262             :         // rangeKeysSeqNum. This is the case for all new iterators created over a
     263             :         // batch but it's not the case for all cloned iterators.
     264             :         rangeKeys       []keyspan.Span
     265             :         rangeKeysSeqNum uint64
     266             : 
     267             :         // The flushableBatch wrapper if the batch is too large to fit in the
     268             :         // memtable.
     269             :         flushable *flushableBatch
     270             : 
     271             :         // minimumFormatMajorVersion indicates the format major version required in
     272             :         // order to commit this batch. If an operation requires a particular format
     273             :         // major version, it ratchets the batch's minimumFormatMajorVersion. When
     274             :         // the batch is committed, this is validated against the database's current
     275             :         // format major version.
     276             :         minimumFormatMajorVersion FormatMajorVersion
     277             : 
     278             :         // Synchronous Apply uses the commit WaitGroup for both publishing the
     279             :         // seqnum and waiting for the WAL fsync (if needed). Asynchronous
     280             :         // ApplyNoSyncWait, which implies WriteOptions.Sync is true, uses the commit
     281             :         // WaitGroup for publishing the seqnum and the fsyncWait WaitGroup for
     282             :         // waiting for the WAL fsync.
     283             :         //
     284             :         // TODO(sumeer): if we find that ApplyNoSyncWait in conjunction with
     285             :         // SyncWait is causing higher memory usage because of the time duration
     286             :         // between when the sync is already done, and a goroutine calls SyncWait
     287             :         // (followed by Batch.Close), we could separate out {fsyncWait, commitErr}
     288             :         // into a separate struct that is allocated separately (using another
     289             :         // sync.Pool), and only that struct needs to outlive Batch.Close (which
     290             :         // could then be called immediately after ApplyNoSyncWait). commitStats
     291             :         // will also need to be in this separate struct.
     292             :         commit    sync.WaitGroup
     293             :         fsyncWait sync.WaitGroup
     294             : 
     295             :         commitStats BatchCommitStats
     296             : 
     297             :         commitErr error
     298             : 
      299             :         // Position bools together to reduce the size of the struct.
     300             : 
     301             :         // ingestedSSTBatch indicates that the batch contains one or more key kinds
     302             :         // of InternalKeyKindIngestSST. If the batch contains key kinds of IngestSST
     303             :         // then it will only contain key kinds of IngestSST.
     304             :         ingestedSSTBatch bool
     305             : 
     306             :         // committing is set to true when a batch begins to commit. It's used to
      307             :         // ensure the batch is not mutated concurrently. It is deliberately not
      308             :         // an atomic, to avoid the overhead on batch mutations. This is
     309             :         // okay, because under correct usage this field will never be accessed
     310             :         // concurrently. It's only under incorrect usage the memory accesses of this
     311             :         // variable may violate memory safety. Since we don't use atomics here,
     312             :         // false negatives are possible.
     313             :         committing bool
     314             : }
     315             : 
     316             : // BatchCommitStats exposes stats related to committing a batch.
     317             : //
     318             : // NB: there is no Pebble internal tracing (using LoggerAndTracer) of slow
     319             : // batch commits. The caller can use these stats to do their own tracing as
     320             : // needed.
     321             : type BatchCommitStats struct {
     322             :         // TotalDuration is the time spent in DB.{Apply,ApplyNoSyncWait} or
     323             :         // Batch.Commit, plus the time waiting in Batch.SyncWait. If there is a gap
     324             :         // between calling ApplyNoSyncWait and calling SyncWait, that gap could
     325             :         // include some duration in which real work was being done for the commit
     326             :         // and will not be included here. This missing time is considered acceptable
     327             :         // since the goal of these stats is to understand user-facing latency.
     328             :         //
     329             :         // TotalDuration includes time spent in various queues both inside Pebble
     330             :         // and outside Pebble (I/O queues, goroutine scheduler queue, mutex wait
     331             :         // etc.). For some of these queues (which we consider important) the wait
     332             :         // times are included below -- these expose low-level implementation detail
     333             :         // and are meant for expert diagnosis and subject to change. There may be
     334             :         // unaccounted time after subtracting those values from TotalDuration.
     335             :         TotalDuration time.Duration
     336             :         // SemaphoreWaitDuration is the wait time for semaphores in
     337             :         // commitPipeline.Commit.
     338             :         SemaphoreWaitDuration time.Duration
     339             :         // WALQueueWaitDuration is the wait time for allocating memory blocks in the
     340             :         // LogWriter (due to the LogWriter not writing fast enough). At the moment
      341             :         // this duration is always zero because a single WAL will allow
     342             :         // allocating memory blocks up to the entire memtable size. In the future,
     343             :         // we may pipeline WALs and bound the WAL queued blocks separately, so this
     344             :         // field is preserved for that possibility.
     345             :         WALQueueWaitDuration time.Duration
     346             :         // MemTableWriteStallDuration is the wait caused by a write stall due to too
     347             :         // many memtables (due to not flushing fast enough).
     348             :         MemTableWriteStallDuration time.Duration
     349             :         // L0ReadAmpWriteStallDuration is the wait caused by a write stall due to
     350             :         // high read amplification in L0 (due to not compacting fast enough out of
     351             :         // L0).
     352             :         L0ReadAmpWriteStallDuration time.Duration
     353             :         // WALRotationDuration is the wait time for WAL rotation, which includes
     354             :         // syncing and closing the old WAL and creating (or reusing) a new one.
     355             :         WALRotationDuration time.Duration
     356             :         // CommitWaitDuration is the wait for publishing the seqnum plus the
     357             :         // duration for the WAL sync (if requested). The former should be tiny and
     358             :         // one can assume that this is all due to the WAL sync.
     359             :         CommitWaitDuration time.Duration
     360             : }
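
A sketch of consuming these stats after a synchronous commit. It assumes Batch.CommitStats(), the accessor recent Pebble versions expose for this struct; treat the method name as an assumption if targeting another version:

        func commitStatsSketch(db *pebble.DB) error {
                b := db.NewBatch()
                if err := b.Set([]byte("k"), []byte("v"), nil); err != nil {
                        return err
                }
                if err := b.Commit(pebble.Sync); err != nil {
                        return err
                }
                stats := b.CommitStats() // assumed accessor for BatchCommitStats
                log.Printf("commit: total=%s semWait=%s walRotation=%s commitWait=%s",
                        stats.TotalDuration, stats.SemaphoreWaitDuration,
                        stats.WALRotationDuration, stats.CommitWaitDuration)
                return b.Close()
        }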
     361             : 
     362             : var _ Reader = (*Batch)(nil)
     363             : var _ Writer = (*Batch)(nil)
     364             : 
     365             : var batchPool = sync.Pool{
     366           1 :         New: func() interface{} {
     367           1 :                 return &Batch{}
     368           1 :         },
     369             : }
     370             : 
     371             : type indexedBatch struct {
     372             :         batch Batch
     373             :         index batchskl.Skiplist
     374             : }
     375             : 
     376             : var indexedBatchPool = sync.Pool{
     377           1 :         New: func() interface{} {
     378           1 :                 return &indexedBatch{}
     379           1 :         },
     380             : }
     381             : 
     382           1 : func newBatch(db *DB) *Batch {
     383           1 :         b := batchPool.Get().(*Batch)
     384           1 :         b.db = db
     385           1 :         return b
     386           1 : }
     387             : 
     388           0 : func newBatchWithSize(db *DB, size int) *Batch {
     389           0 :         b := newBatch(db)
     390           0 :         if cap(b.data) < size {
     391           0 :                 b.data = rawalloc.New(0, size)
     392           0 :         }
     393           0 :         return b
     394             : }
     395             : 
     396           1 : func newIndexedBatch(db *DB, comparer *Comparer) *Batch {
     397           1 :         i := indexedBatchPool.Get().(*indexedBatch)
     398           1 :         i.batch.cmp = comparer.Compare
     399           1 :         i.batch.formatKey = comparer.FormatKey
     400           1 :         i.batch.abbreviatedKey = comparer.AbbreviatedKey
     401           1 :         i.batch.db = db
     402           1 :         i.batch.index = &i.index
     403           1 :         i.batch.index.Init(&i.batch.data, i.batch.cmp, i.batch.abbreviatedKey)
     404           1 :         return &i.batch
     405           1 : }
     406             : 
     407           0 : func newIndexedBatchWithSize(db *DB, comparer *Comparer, size int) *Batch {
     408           0 :         b := newIndexedBatch(db, comparer)
     409           0 :         if cap(b.data) < size {
     410           0 :                 b.data = rawalloc.New(0, size)
     411           0 :         }
     412           0 :         return b
     413             : }
     414             : 
     415             : // nextSeqNum returns the batch "sequence number" that will be given to the next
     416             : // key written to the batch. During iteration keys within an indexed batch are
     417             : // given a sequence number consisting of their offset within the batch combined
     418             : // with the base.InternalKeySeqNumBatch bit. These sequence numbers are only
     419             : // used during iteration, and the keys are assigned ordinary sequence numbers
     420             : // when the batch is committed.
     421           1 : func (b *Batch) nextSeqNum() uint64 {
     422           1 :         return uint64(len(b.data)) | base.InternalKeySeqNumBatch
     423           1 : }
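
Illustratively (assuming base.InternalKeySeqNumBatch is uint64(1)<<55, per internal/base), an entry encoded at byte offset 42 of the batch data is read during iteration at:

        seqNum := uint64(42) | base.InternalKeySeqNumBatch // == 1<<55 + 42, newer than any committed key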
     424             : 
     425           1 : func (b *Batch) release() {
     426           1 :         if b.db == nil {
     427           1 :                 // The batch was not created using newBatch or newIndexedBatch, or an error
     428           1 :                 // was encountered. We don't try to reuse batches that encountered an error
     429           1 :                 // because they might be stuck somewhere in the system and attempting to
     430           1 :                 // reuse such batches is a recipe for onerous debugging sessions. Instead,
     431           1 :                 // let the GC do its job.
     432           1 :                 return
     433           1 :         }
     434           1 :         b.db = nil
     435           1 : 
     436           1 :         // NB: This is ugly (it would be cleaner if we could just assign a Batch{}),
      437           1 :         // but necessary so that we can atomically clear the Batch.applied
      438           1 :         // field. Without using an atomic to clear that field the Go race detector
     439           1 :         // complains.
     440           1 :         b.Reset()
     441           1 :         b.cmp = nil
     442           1 :         b.formatKey = nil
     443           1 :         b.abbreviatedKey = nil
     444           1 : 
     445           1 :         if b.index == nil {
     446           1 :                 batchPool.Put(b)
     447           1 :         } else {
     448           1 :                 b.index, b.rangeDelIndex, b.rangeKeyIndex = nil, nil, nil
     449           1 :                 indexedBatchPool.Put((*indexedBatch)(unsafe.Pointer(b)))
     450           1 :         }
     451             : }
     452             : 
     453           1 : func (b *Batch) refreshMemTableSize() error {
     454           1 :         b.memTableSize = 0
     455           1 :         if len(b.data) < batchHeaderLen {
     456           0 :                 return nil
     457           0 :         }
     458             : 
     459           1 :         b.countRangeDels = 0
     460           1 :         b.countRangeKeys = 0
     461           1 :         b.minimumFormatMajorVersion = 0
     462           1 :         for r := b.Reader(); ; {
     463           1 :                 kind, key, value, ok, err := r.Next()
     464           1 :                 if !ok {
     465           1 :                         if err != nil {
     466           0 :                                 return err
     467           0 :                         }
     468           1 :                         break
     469             :                 }
     470           1 :                 switch kind {
     471           1 :                 case InternalKeyKindRangeDelete:
     472           1 :                         b.countRangeDels++
     473           1 :                 case InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete:
     474           1 :                         b.countRangeKeys++
     475           1 :                 case InternalKeyKindDeleteSized:
     476           1 :                         if b.minimumFormatMajorVersion < FormatDeleteSizedAndObsolete {
     477           1 :                                 b.minimumFormatMajorVersion = FormatDeleteSizedAndObsolete
     478           1 :                         }
     479           1 :                 case InternalKeyKindIngestSST:
     480           1 :                         if b.minimumFormatMajorVersion < FormatFlushableIngest {
     481           1 :                                 b.minimumFormatMajorVersion = FormatFlushableIngest
     482           1 :                         }
     483             :                         // This key kind doesn't contribute to the memtable size.
     484           1 :                         continue
     485             :                 }
     486           1 :                 b.memTableSize += memTableEntrySize(len(key), len(value))
     487             :         }
     488           1 :         return nil
     489             : }
     490             : 
     491             : // Apply the operations contained in the batch to the receiver batch.
     492             : //
     493             : // It is safe to modify the contents of the arguments after Apply returns.
     494           1 : func (b *Batch) Apply(batch *Batch, _ *WriteOptions) error {
     495           1 :         if b.ingestedSSTBatch {
     496           0 :                 panic("pebble: invalid batch application")
     497             :         }
     498           1 :         if len(batch.data) == 0 {
     499           1 :                 return nil
     500           1 :         }
     501           1 :         if len(batch.data) < batchHeaderLen {
     502           0 :                 return ErrInvalidBatch
     503           0 :         }
     504             : 
     505           1 :         offset := len(b.data)
     506           1 :         if offset == 0 {
     507           1 :                 b.init(offset)
     508           1 :                 offset = batchHeaderLen
     509           1 :         }
     510           1 :         b.data = append(b.data, batch.data[batchHeaderLen:]...)
     511           1 : 
     512           1 :         b.setCount(b.Count() + batch.Count())
     513           1 : 
     514           1 :         if b.db != nil || b.index != nil {
     515           1 :                 // Only iterate over the new entries if we need to track memTableSize or in
     516           1 :                 // order to update the index.
     517           1 :                 for iter := BatchReader(b.data[offset:]); len(iter) > 0; {
     518           1 :                         offset := uintptr(unsafe.Pointer(&iter[0])) - uintptr(unsafe.Pointer(&b.data[0]))
     519           1 :                         kind, key, value, ok, err := iter.Next()
     520           1 :                         if !ok {
     521           0 :                                 if err != nil {
     522           0 :                                         return err
     523           0 :                                 }
     524           0 :                                 break
     525             :                         }
     526           1 :                         switch kind {
     527           1 :                         case InternalKeyKindRangeDelete:
     528           1 :                                 b.countRangeDels++
     529           1 :                         case InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete:
     530           1 :                                 b.countRangeKeys++
     531           0 :                         case InternalKeyKindIngestSST:
     532           0 :                                 panic("pebble: invalid key kind for batch")
     533             :                         }
     534           1 :                         if b.index != nil {
     535           1 :                                 var err error
     536           1 :                                 switch kind {
     537           0 :                                 case InternalKeyKindRangeDelete:
     538           0 :                                         b.tombstones = nil
     539           0 :                                         b.tombstonesSeqNum = 0
     540           0 :                                         if b.rangeDelIndex == nil {
     541           0 :                                                 b.rangeDelIndex = batchskl.NewSkiplist(&b.data, b.cmp, b.abbreviatedKey)
     542           0 :                                         }
     543           0 :                                         err = b.rangeDelIndex.Add(uint32(offset))
     544           0 :                                 case InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete:
     545           0 :                                         b.rangeKeys = nil
     546           0 :                                         b.rangeKeysSeqNum = 0
     547           0 :                                         if b.rangeKeyIndex == nil {
     548           0 :                                                 b.rangeKeyIndex = batchskl.NewSkiplist(&b.data, b.cmp, b.abbreviatedKey)
     549           0 :                                         }
     550           0 :                                         err = b.rangeKeyIndex.Add(uint32(offset))
     551           1 :                                 default:
     552           1 :                                         err = b.index.Add(uint32(offset))
     553             :                                 }
     554           1 :                                 if err != nil {
     555           0 :                                         return err
     556           0 :                                 }
     557             :                         }
     558           1 :                         b.memTableSize += memTableEntrySize(len(key), len(value))
     559             :                 }
     560             :         }
     561           1 :         return nil
     562             : }
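
A minimal sketch of merging one batch into another with Apply; per the loop above, the source records are appended, counted, and (for indexed batches) re-indexed in the destination:

        func applySketch(db *pebble.DB) error {
                dst, src := db.NewBatch(), db.NewBatch()
                if err := src.Set([]byte("k1"), []byte("v1"), nil); err != nil {
                        return err
                }
                if err := src.Delete([]byte("k2"), nil); err != nil {
                        return err
                }
                if err := dst.Apply(src, nil); err != nil { // appends src's records to dst
                        return err
                }
                _ = src.Close()                // safe: dst owns its own copy of the data
                return dst.Commit(pebble.Sync) // commits the set and the delete atomically
        }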
     563             : 
     564             : // Get gets the value for the given key. It returns ErrNotFound if the Batch
     565             : // does not contain the key.
     566             : //
     567             : // The caller should not modify the contents of the returned slice, but it is
     568             : // safe to modify the contents of the argument after Get returns. The returned
     569             : // slice will remain valid until the returned Closer is closed. On success, the
     570             : // caller MUST call closer.Close() or a memory leak will occur.
     571           1 : func (b *Batch) Get(key []byte) ([]byte, io.Closer, error) {
     572           1 :         if b.index == nil {
     573           0 :                 return nil, nil, ErrNotIndexed
     574           0 :         }
     575           1 :         return b.db.getInternal(key, b, nil /* snapshot */)
     576             : }
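
A minimal sketch of the Get contract: only indexed batches support reads, and the returned Closer must be closed once the value is no longer needed:

        func getSketch(db *pebble.DB) error {
                b := db.NewIndexedBatch() // a non-indexed batch would return ErrNotIndexed
                if err := b.Set([]byte("k"), []byte("v"), nil); err != nil {
                        return err
                }
                v, closer, err := b.Get([]byte("k"))
                if err == pebble.ErrNotFound {
                        return nil // present in neither the batch nor the database
                }
                if err != nil {
                        return err
                }
                fmt.Printf("k=%s\n", v) // v is valid only until closer.Close()
                return closer.Close()   // required on success, or memory is leaked
        }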
     577             : 
     578           1 : func (b *Batch) prepareDeferredKeyValueRecord(keyLen, valueLen int, kind InternalKeyKind) {
     579           1 :         if b.committing {
     580           0 :                 panic("pebble: batch already committing")
     581             :         }
     582           1 :         if len(b.data) == 0 {
     583           1 :                 b.init(keyLen + valueLen + 2*binary.MaxVarintLen64 + batchHeaderLen)
     584           1 :         }
     585           1 :         b.count++
     586           1 :         b.memTableSize += memTableEntrySize(keyLen, valueLen)
     587           1 : 
     588           1 :         pos := len(b.data)
     589           1 :         b.deferredOp.offset = uint32(pos)
     590           1 :         b.grow(1 + 2*maxVarintLen32 + keyLen + valueLen)
     591           1 :         b.data[pos] = byte(kind)
     592           1 :         pos++
     593           1 : 
     594           1 :         {
     595           1 :                 // TODO(peter): Manually inlined version binary.PutUvarint(). This is 20%
     596           1 :                 // faster on BenchmarkBatchSet on go1.13. Remove if go1.14 or future
     597           1 :                 // versions show this to not be a performance win.
     598           1 :                 x := uint32(keyLen)
     599           1 :                 for x >= 0x80 {
     600           0 :                         b.data[pos] = byte(x) | 0x80
     601           0 :                         x >>= 7
     602           0 :                         pos++
     603           0 :                 }
     604           1 :                 b.data[pos] = byte(x)
     605           1 :                 pos++
     606             :         }
     607             : 
     608           1 :         b.deferredOp.Key = b.data[pos : pos+keyLen]
     609           1 :         pos += keyLen
     610           1 : 
     611           1 :         {
     612           1 :                 // TODO(peter): Manually inlined version binary.PutUvarint(). This is 20%
     613           1 :                 // faster on BenchmarkBatchSet on go1.13. Remove if go1.14 or future
     614           1 :                 // versions show this to not be a performance win.
     615           1 :                 x := uint32(valueLen)
     616           1 :                 for x >= 0x80 {
     617           0 :                         b.data[pos] = byte(x) | 0x80
     618           0 :                         x >>= 7
     619           0 :                         pos++
     620           0 :                 }
     621           1 :                 b.data[pos] = byte(x)
     622           1 :                 pos++
     623             :         }
     624             : 
     625           1 :         b.deferredOp.Value = b.data[pos : pos+valueLen]
     626           1 :         // Shrink data since varints may be shorter than the upper bound.
     627           1 :         b.data = b.data[:pos+valueLen]
     628             : }
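
For reference, the manually inlined loops above emit the same bytes as the standard library's encoder; a hedged equivalence sketch for some keyLen int:

        var buf [binary.MaxVarintLen32]byte
        n := binary.PutUvarint(buf[:], uint64(keyLen)) // 7 data bits per byte, MSB = continuation
        _ = buf[:n] // identical to the bytes written in place by the inlined loop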
     629             : 
     630           1 : func (b *Batch) prepareDeferredKeyRecord(keyLen int, kind InternalKeyKind) {
     631           1 :         if b.committing {
     632           0 :                 panic("pebble: batch already committing")
     633             :         }
     634           1 :         if len(b.data) == 0 {
     635           1 :                 b.init(keyLen + binary.MaxVarintLen64 + batchHeaderLen)
     636           1 :         }
     637           1 :         b.count++
     638           1 :         b.memTableSize += memTableEntrySize(keyLen, 0)
     639           1 : 
     640           1 :         pos := len(b.data)
     641           1 :         b.deferredOp.offset = uint32(pos)
     642           1 :         b.grow(1 + maxVarintLen32 + keyLen)
     643           1 :         b.data[pos] = byte(kind)
     644           1 :         pos++
     645           1 : 
     646           1 :         {
     647           1 :                 // TODO(peter): Manually inlined version binary.PutUvarint(). Remove if
     648           1 :                 // go1.13 or future versions show this to not be a performance win. See
     649           1 :                 // BenchmarkBatchSet.
     650           1 :                 x := uint32(keyLen)
     651           1 :                 for x >= 0x80 {
     652           0 :                         b.data[pos] = byte(x) | 0x80
     653           0 :                         x >>= 7
     654           0 :                         pos++
     655           0 :                 }
     656           1 :                 b.data[pos] = byte(x)
     657           1 :                 pos++
     658             :         }
     659             : 
     660           1 :         b.deferredOp.Key = b.data[pos : pos+keyLen]
     661           1 :         b.deferredOp.Value = nil
     662           1 : 
     663           1 :         // Shrink data since varint may be shorter than the upper bound.
     664           1 :         b.data = b.data[:pos+keyLen]
     665             : }
     666             : 
     667             : // AddInternalKey allows the caller to add an internal key of point key or range
     668             : // key kinds (but not RangeDelete) to a batch. Passing in an internal key of
     669             : // kind RangeDelete will result in a panic. Note that the seqnum in the internal
     670             : // key is effectively ignored, even though the Kind is preserved. This is
     671             : // because the batch format does not allow for a per-key seqnum to be specified,
     672             : // only a batch-wide one.
     673             : //
      674             : // Note that non-indexed keys (InternalKeyKind{LogData,IngestSST}) are not
     675             : // supported with this method as they require specialized logic.
     676           1 : func (b *Batch) AddInternalKey(key *base.InternalKey, value []byte, _ *WriteOptions) error {
     677           1 :         keyLen := len(key.UserKey)
     678           1 :         hasValue := false
     679           1 :         switch kind := key.Kind(); kind {
     680           0 :         case InternalKeyKindRangeDelete:
     681           0 :                 panic("unexpected range delete in AddInternalKey")
     682           0 :         case InternalKeyKindSingleDelete, InternalKeyKindDelete:
     683           0 :                 b.prepareDeferredKeyRecord(keyLen, kind)
     684           0 :                 b.deferredOp.index = b.index
     685           1 :         case InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete:
     686           1 :                 b.prepareDeferredKeyValueRecord(keyLen, len(value), kind)
     687           1 :                 hasValue = true
     688           1 :                 b.incrementRangeKeysCount()
     689           0 :         default:
     690           0 :                 b.prepareDeferredKeyValueRecord(keyLen, len(value), kind)
     691           0 :                 hasValue = true
     692           0 :                 b.deferredOp.index = b.index
     693             :         }
     694           1 :         copy(b.deferredOp.Key, key.UserKey)
     695           1 :         if hasValue {
     696           1 :                 copy(b.deferredOp.Value, value)
     697           1 :         }
     698             : 
     699             :         // TODO(peter): Manually inline DeferredBatchOp.Finish(). Mid-stack inlining
     700             :         // in go1.13 will remove the need for this.
     701           1 :         if b.index != nil {
     702           0 :                 if err := b.index.Add(b.deferredOp.offset); err != nil {
     703           0 :                         return err
     704           0 :                 }
     705             :         }
     706           1 :         return nil
     707             : }
     708             : 
     709             : // Set adds an action to the batch that sets the key to map to the value.
     710             : //
     711             : // It is safe to modify the contents of the arguments after Set returns.
     712           1 : func (b *Batch) Set(key, value []byte, _ *WriteOptions) error {
     713           1 :         deferredOp := b.SetDeferred(len(key), len(value))
     714           1 :         copy(deferredOp.Key, key)
     715           1 :         copy(deferredOp.Value, value)
     716           1 :         // TODO(peter): Manually inline DeferredBatchOp.Finish(). Mid-stack inlining
     717           1 :         // in go1.13 will remove the need for this.
     718           1 :         if b.index != nil {
     719           1 :                 if err := b.index.Add(deferredOp.offset); err != nil {
     720           0 :                         return err
     721           0 :                 }
     722             :         }
     723           1 :         return nil
     724             : }
     725             : 
     726             : // SetDeferred is similar to Set in that it adds a set operation to the batch,
     727             : // except it only takes in key/value lengths instead of complete slices,
     728             : // letting the caller encode into those objects and then call Finish() on the
     729             : // returned object.
     730           1 : func (b *Batch) SetDeferred(keyLen, valueLen int) *DeferredBatchOp {
     731           1 :         b.prepareDeferredKeyValueRecord(keyLen, valueLen, InternalKeyKindSet)
     732           1 :         b.deferredOp.index = b.index
     733           1 :         return &b.deferredOp
     734           1 : }
     735             : 
     736             : // Merge adds an action to the batch that merges the value at key with the new
     737             : // value. The details of the merge are dependent upon the configured merge
     738             : // operator.
     739             : //
     740             : // It is safe to modify the contents of the arguments after Merge returns.
     741           1 : func (b *Batch) Merge(key, value []byte, _ *WriteOptions) error {
     742           1 :         deferredOp := b.MergeDeferred(len(key), len(value))
     743           1 :         copy(deferredOp.Key, key)
     744           1 :         copy(deferredOp.Value, value)
     745           1 :         // TODO(peter): Manually inline DeferredBatchOp.Finish(). Mid-stack inlining
     746           1 :         // in go1.13 will remove the need for this.
     747           1 :         if b.index != nil {
     748           1 :                 if err := b.index.Add(deferredOp.offset); err != nil {
     749           0 :                         return err
     750           0 :                 }
     751             :         }
     752           1 :         return nil
     753             : }
     754             : 
     755             : // MergeDeferred is similar to Merge in that it adds a merge operation to the
     756             : // batch, except it only takes in key/value lengths instead of complete slices,
     757             : // letting the caller encode into those objects and then call Finish() on the
     758             : // returned object.
     759           1 : func (b *Batch) MergeDeferred(keyLen, valueLen int) *DeferredBatchOp {
     760           1 :         b.prepareDeferredKeyValueRecord(keyLen, valueLen, InternalKeyKindMerge)
     761           1 :         b.deferredOp.index = b.index
     762           1 :         return &b.deferredOp
     763           1 : }
     764             : 
     765             : // Delete adds an action to the batch that deletes the entry for key.
     766             : //
     767             : // It is safe to modify the contents of the arguments after Delete returns.
     768           1 : func (b *Batch) Delete(key []byte, _ *WriteOptions) error {
     769           1 :         deferredOp := b.DeleteDeferred(len(key))
     770           1 :         copy(deferredOp.Key, key)
     771           1 :         // TODO(peter): Manually inline DeferredBatchOp.Finish(). Mid-stack inlining
     772           1 :         // in go1.13 will remove the need for this.
     773           1 :         if b.index != nil {
     774           1 :                 if err := b.index.Add(deferredOp.offset); err != nil {
     775           0 :                         return err
     776           0 :                 }
     777             :         }
     778           1 :         return nil
     779             : }
     780             : 
     781             : // DeleteDeferred is similar to Delete in that it adds a delete operation to
     782             : // the batch, except it only takes in key/value lengths instead of complete
     783             : // slices, letting the caller encode into those objects and then call Finish()
     784             : // on the returned object.
     785           1 : func (b *Batch) DeleteDeferred(keyLen int) *DeferredBatchOp {
     786           1 :         b.prepareDeferredKeyRecord(keyLen, InternalKeyKindDelete)
     787           1 :         b.deferredOp.index = b.index
     788           1 :         return &b.deferredOp
     789           1 : }
     790             : 
     791             : // DeleteSized behaves identically to Delete, but takes an additional
     792             : // argument indicating the size of the value being deleted. DeleteSized
     793             : // should be preferred when the caller has the expectation that there exists
     794             : // a single internal KV pair for the key (eg, the key has not been
     795             : // overwritten recently), and the caller knows the size of its value.
     796             : //
     797             : // DeleteSized will record the value size within the tombstone and use it to
     798             : // inform compaction-picking heuristics which strive to reduce space
     799             : // amplification in the LSM. This "calling your shot" mechanic allows the
     800             : // storage engine to more accurately estimate and reduce space amplification.
     801             : //
     802             : // It is safe to modify the contents of the arguments after DeleteSized
     803             : // returns.
     804           1 : func (b *Batch) DeleteSized(key []byte, deletedValueSize uint32, _ *WriteOptions) error {
     805           1 :         deferredOp := b.DeleteSizedDeferred(len(key), deletedValueSize)
     806           1 :         copy(b.deferredOp.Key, key)
     807           1 :         // TODO(peter): Manually inline DeferredBatchOp.Finish(). Check if in a
     808           1 :         // later Go release this is unnecessary.
     809           1 :         if b.index != nil {
     810           1 :                 if err := b.index.Add(deferredOp.offset); err != nil {
     811           0 :                         return err
     812           0 :                 }
     813             :         }
     814           1 :         return nil
     815             : }
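
A minimal sketch of "calling your shot": the caller passes the size of the value it believes the tombstone will delete (k and valueSize are assumed inputs):

        func deleteSizedSketch(db *pebble.DB, k []byte, valueSize uint32) error {
                b := db.NewBatch()
                if err := b.DeleteSized(k, valueSize, nil); err != nil {
                        return err
                }
                return b.Commit(pebble.Sync) // the tombstone carries the expected value size
        }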
     816             : 
     817             : // DeleteSizedDeferred is similar to DeleteSized in that it adds a sized delete
     818             : // operation to the batch, except it only takes in key length instead of a
     819             : // complete key slice, letting the caller encode into the DeferredBatchOp.Key
     820             : // slice and then call Finish() on the returned object.
     821           1 : func (b *Batch) DeleteSizedDeferred(keyLen int, deletedValueSize uint32) *DeferredBatchOp {
     822           1 :         if b.minimumFormatMajorVersion < FormatDeleteSizedAndObsolete {
     823           1 :                 b.minimumFormatMajorVersion = FormatDeleteSizedAndObsolete
     824           1 :         }
     825             : 
      826             :         // Encode the sum of the key length and the deleted value size in the value.
     827           1 :         v := uint64(deletedValueSize) + uint64(keyLen)
     828           1 : 
     829           1 :         // Encode `v` as a varint.
     830           1 :         var buf [binary.MaxVarintLen64]byte
     831           1 :         n := 0
     832           1 :         {
     833           1 :                 x := v
     834           1 :                 for x >= 0x80 {
     835           0 :                         buf[n] = byte(x) | 0x80
     836           0 :                         x >>= 7
     837           0 :                         n++
     838           0 :                 }
     839           1 :                 buf[n] = byte(x)
     840           1 :                 n++
     841             :         }
     842             : 
     843             :         // NB: In batch entries and sstable entries, values are stored as
     844             :         // varstrings. Here, the value is itself a simple varint. This results in an
     845             :         // unnecessary double layer of encoding:
     846             :         //     varint(n) varint(deletedValueSize)
      847             :         // The first varint will always be 1-byte, since a varint-encoded uint64 is
      848             :         // at most 10 bytes (binary.MaxVarintLen64), well below 128. This extra byte
      849             :         // and wrapping are preserved to avoid special casing across the database,
      850             :         // and in particular in sstable block decoding which is performance sensitive.
     851           1 :         b.prepareDeferredKeyValueRecord(keyLen, n, InternalKeyKindDeleteSized)
     852           1 :         b.deferredOp.index = b.index
     853           1 :         copy(b.deferredOp.Value, buf[:n])
     854           1 :         return &b.deferredOp
     855             : }
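                      :
                      : // Illustrative sketch (not part of batch.go): the pattern shared by the
                      : // *Deferred methods: reserve space in the batch, encode the key directly
                      : // into the returned op, then call Finish to index the record. keyLen and
                      : // valueSize are assumed caller-provided values.
                      : //
                      : //      op := b.DeleteSizedDeferred(keyLen, valueSize)
                      : //      copy(op.Key, key) // encode directly into the batch repr
                      : //      if err := op.Finish(); err != nil {
                      : //              return err
                      : //      }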
     856             : 
     857             : // SingleDelete adds an action to the batch that single deletes the entry for key.
     858             : // See Writer.SingleDelete for more details on the semantics of SingleDelete.
     859             : //
     860             : // It is safe to modify the contents of the arguments after SingleDelete returns.
     861           1 : func (b *Batch) SingleDelete(key []byte, _ *WriteOptions) error {
     862           1 :         deferredOp := b.SingleDeleteDeferred(len(key))
     863           1 :         copy(deferredOp.Key, key)
     864           1 :         // TODO(peter): Manually inline DeferredBatchOp.Finish(). Mid-stack inlining
     865           1 :         // in go1.13 will remove the need for this.
     866           1 :         if b.index != nil {
     867           1 :                 if err := b.index.Add(deferredOp.offset); err != nil {
     868           0 :                         return err
     869           0 :                 }
     870             :         }
     871           1 :         return nil
     872             : }
     873             : 
     874             : // SingleDeleteDeferred is similar to SingleDelete in that it adds a single delete
      875             : // operation to the batch, except it only takes in a key length instead of a
      876             : // complete key slice, letting the caller encode into the DeferredBatchOp.Key
      877             : // slice and then call Finish() on the returned object.
     878           1 : func (b *Batch) SingleDeleteDeferred(keyLen int) *DeferredBatchOp {
     879           1 :         b.prepareDeferredKeyRecord(keyLen, InternalKeyKindSingleDelete)
     880           1 :         b.deferredOp.index = b.index
     881           1 :         return &b.deferredOp
     882           1 : }
     883             : 
     884             : // DeleteRange deletes all of the point keys (and values) in the range
     885             : // [start,end) (inclusive on start, exclusive on end). DeleteRange does NOT
     886             : // delete overlapping range keys (eg, keys set via RangeKeySet).
     887             : //
     888             : // It is safe to modify the contents of the arguments after DeleteRange
     889             : // returns.
     890           1 : func (b *Batch) DeleteRange(start, end []byte, _ *WriteOptions) error {
     891           1 :         deferredOp := b.DeleteRangeDeferred(len(start), len(end))
     892           1 :         copy(deferredOp.Key, start)
     893           1 :         copy(deferredOp.Value, end)
     894           1 :         // TODO(peter): Manually inline DeferredBatchOp.Finish(). Mid-stack inlining
     895           1 :         // in go1.13 will remove the need for this.
     896           1 :         if deferredOp.index != nil {
     897           1 :                 if err := deferredOp.index.Add(deferredOp.offset); err != nil {
     898           0 :                         return err
     899           0 :                 }
     900             :         }
     901           1 :         return nil
     902             : }
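                      :
                      : // Illustrative sketch (not part of batch.go): drop every point key in
                      : // ["a", "m"). Range keys in the same span are unaffected; removing them
                      : // requires RangeKeyDelete.
                      : //
                      : //      if err := b.DeleteRange([]byte("a"), []byte("m"), nil); err != nil {
                      : //              return err
                      : //      }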
     903             : 
     904             : // DeleteRangeDeferred is similar to DeleteRange in that it adds a delete range
     905             : // operation to the batch, except it only takes in key lengths instead of
     906             : // complete slices, letting the caller encode into those objects and then call
     907             : // Finish() on the returned object. Note that DeferredBatchOp.Key should be
     908             : // populated with the start key, and DeferredBatchOp.Value should be populated
     909             : // with the end key.
     910           1 : func (b *Batch) DeleteRangeDeferred(startLen, endLen int) *DeferredBatchOp {
     911           1 :         b.prepareDeferredKeyValueRecord(startLen, endLen, InternalKeyKindRangeDelete)
     912           1 :         b.countRangeDels++
     913           1 :         if b.index != nil {
     914           1 :                 b.tombstones = nil
     915           1 :                 b.tombstonesSeqNum = 0
     916           1 :                 // Range deletions are rare, so we lazily allocate the index for them.
     917           1 :                 if b.rangeDelIndex == nil {
     918           1 :                         b.rangeDelIndex = batchskl.NewSkiplist(&b.data, b.cmp, b.abbreviatedKey)
     919           1 :                 }
     920           1 :                 b.deferredOp.index = b.rangeDelIndex
     921             :         }
     922           1 :         return &b.deferredOp
     923             : }
     924             : 
     925             : // RangeKeySet sets a range key mapping the key range [start, end) at the MVCC
     926             : // timestamp suffix to value. The suffix is optional. If any portion of the key
     927             : // range [start, end) is already set by a range key with the same suffix value,
     928             : // RangeKeySet overrides it.
     929             : //
     930             : // It is safe to modify the contents of the arguments after RangeKeySet returns.
     931           1 : func (b *Batch) RangeKeySet(start, end, suffix, value []byte, _ *WriteOptions) error {
     932           1 :         if invariants.Enabled && b.db != nil && b.db.opts.Comparer.Split != nil {
     933           1 :                 // RangeKeySet is only supported on prefix keys.
     934           1 :                 if b.db.opts.Comparer.Split(start) != len(start) {
     935           0 :                         panic("RangeKeySet called with suffixed start key")
     936             :                 }
     937           1 :                 if b.db.opts.Comparer.Split(end) != len(end) {
     938           0 :                         panic("RangeKeySet called with suffixed end key")
     939             :                 }
     940             :         }
     941           1 :         suffixValues := [1]rangekey.SuffixValue{{Suffix: suffix, Value: value}}
     942           1 :         internalValueLen := rangekey.EncodedSetValueLen(end, suffixValues[:])
     943           1 : 
     944           1 :         deferredOp := b.rangeKeySetDeferred(len(start), internalValueLen)
     945           1 :         copy(deferredOp.Key, start)
     946           1 :         n := rangekey.EncodeSetValue(deferredOp.Value, end, suffixValues[:])
     947           1 :         if n != internalValueLen {
     948           0 :                 panic("unexpected internal value length mismatch")
     949             :         }
     950             : 
     951             :         // Manually inline DeferredBatchOp.Finish().
     952           1 :         if deferredOp.index != nil {
     953           1 :                 if err := deferredOp.index.Add(deferredOp.offset); err != nil {
     954           0 :                         return err
     955           0 :                 }
     956             :         }
     957           1 :         return nil
     958             : }
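                      :
                      : // Illustrative sketch (not part of batch.go): set an MVCC range key over
                      : // ["a", "z") at timestamp suffix "@5". The suffix encoding is
                      : // Comparer-specific; "@5" assumes a comparer whose Split understands such
                      : // suffixes.
                      : //
                      : //      err := b.RangeKeySet([]byte("a"), []byte("z"), []byte("@5"), nil, nil)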
     959             : 
     960           1 : func (b *Batch) rangeKeySetDeferred(startLen, internalValueLen int) *DeferredBatchOp {
     961           1 :         b.prepareDeferredKeyValueRecord(startLen, internalValueLen, InternalKeyKindRangeKeySet)
     962           1 :         b.incrementRangeKeysCount()
     963           1 :         return &b.deferredOp
     964           1 : }
     965             : 
     966           1 : func (b *Batch) incrementRangeKeysCount() {
     967           1 :         b.countRangeKeys++
     968           1 :         if b.index != nil {
     969           1 :                 b.rangeKeys = nil
     970           1 :                 b.rangeKeysSeqNum = 0
     971           1 :                 // Range keys are rare, so we lazily allocate the index for them.
     972           1 :                 if b.rangeKeyIndex == nil {
     973           1 :                         b.rangeKeyIndex = batchskl.NewSkiplist(&b.data, b.cmp, b.abbreviatedKey)
     974           1 :                 }
     975           1 :                 b.deferredOp.index = b.rangeKeyIndex
     976             :         }
     977             : }
     978             : 
     979             : // RangeKeyUnset removes a range key mapping the key range [start, end) at the
     980             : // MVCC timestamp suffix. The suffix may be omitted to remove an unsuffixed
     981             : // range key. RangeKeyUnset only removes portions of range keys that fall within
     982             : // the [start, end) key span, and only range keys with suffixes that exactly
     983             : // match the unset suffix.
     984             : //
     985             : // It is safe to modify the contents of the arguments after RangeKeyUnset
     986             : // returns.
     987           1 : func (b *Batch) RangeKeyUnset(start, end, suffix []byte, _ *WriteOptions) error {
     988           1 :         if invariants.Enabled && b.db != nil && b.db.opts.Comparer.Split != nil {
     989           1 :                 // RangeKeyUnset is only supported on prefix keys.
     990           1 :                 if b.db.opts.Comparer.Split(start) != len(start) {
     991           0 :                         panic("RangeKeyUnset called with suffixed start key")
     992             :                 }
     993           1 :                 if b.db.opts.Comparer.Split(end) != len(end) {
     994           0 :                         panic("RangeKeyUnset called with suffixed end key")
     995             :                 }
     996             :         }
     997           1 :         suffixes := [1][]byte{suffix}
     998           1 :         internalValueLen := rangekey.EncodedUnsetValueLen(end, suffixes[:])
     999           1 : 
    1000           1 :         deferredOp := b.rangeKeyUnsetDeferred(len(start), internalValueLen)
    1001           1 :         copy(deferredOp.Key, start)
    1002           1 :         n := rangekey.EncodeUnsetValue(deferredOp.Value, end, suffixes[:])
    1003           1 :         if n != internalValueLen {
    1004           0 :                 panic("unexpected internal value length mismatch")
    1005             :         }
    1006             : 
    1007             :         // Manually inline DeferredBatchOp.Finish()
    1008           1 :         if deferredOp.index != nil {
    1009           1 :                 if err := deferredOp.index.Add(deferredOp.offset); err != nil {
    1010           0 :                         return err
    1011           0 :                 }
    1012             :         }
    1013           1 :         return nil
    1014             : }
    1015             : 
    1016           1 : func (b *Batch) rangeKeyUnsetDeferred(startLen, internalValueLen int) *DeferredBatchOp {
    1017           1 :         b.prepareDeferredKeyValueRecord(startLen, internalValueLen, InternalKeyKindRangeKeyUnset)
    1018           1 :         b.incrementRangeKeysCount()
    1019           1 :         return &b.deferredOp
    1020           1 : }
    1021             : 
    1022             : // RangeKeyDelete deletes all of the range keys in the range [start,end)
    1023             : // (inclusive on start, exclusive on end). It does not delete point keys (for
    1024             : // that use DeleteRange). RangeKeyDelete removes all range keys within the
    1025             : // bounds, including those with or without suffixes.
    1026             : //
    1027             : // It is safe to modify the contents of the arguments after RangeKeyDelete
    1028             : // returns.
    1029           1 : func (b *Batch) RangeKeyDelete(start, end []byte, _ *WriteOptions) error {
    1030           1 :         if invariants.Enabled && b.db != nil && b.db.opts.Comparer.Split != nil {
    1031           1 :                 // RangeKeyDelete is only supported on prefix keys.
    1032           1 :                 if b.db.opts.Comparer.Split(start) != len(start) {
    1033           0 :                         panic("RangeKeyDelete called with suffixed start key")
    1034             :                 }
    1035           1 :                 if b.db.opts.Comparer.Split(end) != len(end) {
    1036           0 :                         panic("RangeKeyDelete called with suffixed end key")
    1037             :                 }
    1038             :         }
    1039           1 :         deferredOp := b.RangeKeyDeleteDeferred(len(start), len(end))
    1040           1 :         copy(deferredOp.Key, start)
    1041           1 :         copy(deferredOp.Value, end)
    1042           1 :         // Manually inline DeferredBatchOp.Finish().
    1043           1 :         if deferredOp.index != nil {
    1044           1 :                 if err := deferredOp.index.Add(deferredOp.offset); err != nil {
    1045           0 :                         return err
    1046           0 :                 }
    1047             :         }
    1048           1 :         return nil
    1049             : }
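                      :
                      : // Illustrative sketch (not part of batch.go) contrasting the two removal
                      : // operations: RangeKeyUnset is suffix-exact, while RangeKeyDelete removes
                      : // range keys with any suffix in the span.
                      : //
                      : //      // Removes only the range key portion set at suffix "@5".
                      : //      _ = b.RangeKeyUnset([]byte("a"), []byte("z"), []byte("@5"), nil)
                      : //      // Removes all range keys within ["a", "z"), regardless of suffix.
                      : //      _ = b.RangeKeyDelete([]byte("a"), []byte("z"), nil)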
    1050             : 
    1051             : // RangeKeyDeleteDeferred is similar to RangeKeyDelete in that it adds an
    1052             : // operation to delete range keys to the batch, except it only takes in key
    1053             : // lengths instead of complete slices, letting the caller encode into those
    1054             : // objects and then call Finish() on the returned object. Note that
    1055             : // DeferredBatchOp.Key should be populated with the start key, and
    1056             : // DeferredBatchOp.Value should be populated with the end key.
    1057           1 : func (b *Batch) RangeKeyDeleteDeferred(startLen, endLen int) *DeferredBatchOp {
    1058           1 :         b.prepareDeferredKeyValueRecord(startLen, endLen, InternalKeyKindRangeKeyDelete)
    1059           1 :         b.incrementRangeKeysCount()
    1060           1 :         return &b.deferredOp
    1061           1 : }
    1062             : 
     1063             : // LogData adds the specified data to the batch. The data will be written to the
    1064             : // WAL, but not added to memtables or sstables. Log data is never indexed,
    1065             : // which makes it useful for testing WAL performance.
    1066             : //
    1067             : // It is safe to modify the contents of the argument after LogData returns.
    1068           0 : func (b *Batch) LogData(data []byte, _ *WriteOptions) error {
    1069           0 :         origCount, origMemTableSize := b.count, b.memTableSize
    1070           0 :         b.prepareDeferredKeyRecord(len(data), InternalKeyKindLogData)
    1071           0 :         copy(b.deferredOp.Key, data)
    1072           0 :         // Since LogData only writes to the WAL and does not affect the memtable, we
    1073           0 :         // restore b.count and b.memTableSize to their origin values. Note that
    1074           0 :         // Batch.count only refers to records that are added to the memtable.
    1075           0 :         b.count, b.memTableSize = origCount, origMemTableSize
    1076           0 :         return nil
    1077           0 : }
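                      :
                      : // Illustrative sketch (not part of batch.go): attach WAL-only metadata to
                      : // a commit. The payload is an assumed application-level tag.
                      : //
                      : //      _ = b.LogData([]byte("txn-trace:1234"), nil)
                      : //      // On commit the data reaches the WAL alongside the batch's other
                      : //      // records, but it is never replayed into memtables or sstables.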
    1078             : 
     1079             : // ingestSST adds the FileNum for an sstable to the batch. The data will only be
    1080             : // written to the WAL (not added to memtables or sstables).
    1081           1 : func (b *Batch) ingestSST(fileNum base.FileNum) {
    1082           1 :         if b.Empty() {
    1083           1 :                 b.ingestedSSTBatch = true
    1084           1 :         } else if !b.ingestedSSTBatch {
    1085           0 :                 // Batch contains other key kinds.
    1086           0 :                 panic("pebble: invalid call to ingestSST")
    1087             :         }
    1088             : 
    1089           1 :         origMemTableSize := b.memTableSize
    1090           1 :         var buf [binary.MaxVarintLen64]byte
    1091           1 :         length := binary.PutUvarint(buf[:], uint64(fileNum))
    1092           1 :         b.prepareDeferredKeyRecord(length, InternalKeyKindIngestSST)
    1093           1 :         copy(b.deferredOp.Key, buf[:length])
    1094           1 :         // Since IngestSST writes only to the WAL and does not affect the memtable,
    1095           1 :         // we restore b.memTableSize to its original value. Note that Batch.count
     1096           1 :         // is not reset because for InternalKeyKindIngestSST the count is the
     1097           1 :         // number of sstables which have been added to the batch.
    1098           1 :         b.memTableSize = origMemTableSize
    1099           1 :         b.minimumFormatMajorVersion = FormatFlushableIngest
    1100             : }
    1101             : 
    1102             : // Empty returns true if the batch is empty, and false otherwise.
    1103           1 : func (b *Batch) Empty() bool {
    1104           1 :         return len(b.data) <= batchHeaderLen
    1105           1 : }
    1106             : 
    1107             : // Len returns the current size of the batch in bytes.
    1108           0 : func (b *Batch) Len() int {
    1109           0 :         if len(b.data) <= batchHeaderLen {
    1110           0 :                 return batchHeaderLen
    1111           0 :         }
    1112           0 :         return len(b.data)
    1113             : }
    1114             : 
    1115             : // Repr returns the underlying batch representation. It is not safe to modify
    1116             : // the contents. Reset() will not change the contents of the returned value,
    1117             : // though any other mutation operation may do so.
    1118           1 : func (b *Batch) Repr() []byte {
    1119           1 :         if len(b.data) == 0 {
    1120           0 :                 b.init(batchHeaderLen)
    1121           0 :         }
    1122           1 :         binary.LittleEndian.PutUint32(b.countData(), b.Count())
    1123           1 :         return b.data
    1124             : }
    1125             : 
    1126             : // SetRepr sets the underlying batch representation. The batch takes ownership
    1127             : // of the supplied slice. It is not safe to modify it afterwards until the
    1128             : // Batch is no longer in use.
    1129           1 : func (b *Batch) SetRepr(data []byte) error {
    1130           1 :         if len(data) < batchHeaderLen {
    1131           0 :                 return base.CorruptionErrorf("invalid batch")
    1132           0 :         }
    1133           1 :         b.data = data
    1134           1 :         b.count = uint64(binary.LittleEndian.Uint32(b.countData()))
    1135           1 :         var err error
    1136           1 :         if b.db != nil {
    1137           1 :                 // Only track memTableSize for batches that will be committed to the DB.
    1138           1 :                 err = b.refreshMemTableSize()
    1139           1 :         }
    1140           1 :         return err
    1141             : }
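                      :
                      : // Illustrative sketch (not part of batch.go): shipping a batch
                      : // representation between processes. src and dst are assumed *Batch
                      : // values; the copy matters because Repr aliases src's internal buffer.
                      : //
                      : //      repr := append([]byte(nil), src.Repr()...)
                      : //      if err := dst.SetRepr(repr); err != nil {
                      : //              return err
                      : //      }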
    1142             : 
    1143             : // NewIter returns an iterator that is unpositioned (Iterator.Valid() will
    1144             : // return false). The iterator can be positioned via a call to SeekGE,
    1145             : // SeekPrefixGE, SeekLT, First or Last. Only indexed batches support iterators.
    1146             : //
    1147             : // The returned Iterator observes all of the Batch's existing mutations, but no
    1148             : // later mutations. Its view can be refreshed via RefreshBatchSnapshot or
    1149             : // SetOptions().
    1150           1 : func (b *Batch) NewIter(o *IterOptions) (*Iterator, error) {
    1151           1 :         return b.NewIterWithContext(context.Background(), o)
    1152           1 : }
    1153             : 
    1154             : // NewIterWithContext is like NewIter, and additionally accepts a context for
    1155             : // tracing.
    1156           1 : func (b *Batch) NewIterWithContext(ctx context.Context, o *IterOptions) (*Iterator, error) {
    1157           1 :         if b.index == nil {
    1158           0 :                 return nil, ErrNotIndexed
    1159           0 :         }
    1160           1 :         return b.db.newIter(ctx, b, newIterOpts{}, o), nil
    1161             : }
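                      :
                      : // Illustrative sketch (not part of batch.go): reading through an indexed
                      : // batch, with the batch's mutations overlaid on the DB state.
                      : //
                      : //      b := db.NewIndexedBatch()
                      : //      _ = b.Set([]byte("a"), []byte("v"), nil)
                      : //      iter, err := b.NewIter(nil)
                      : //      if err != nil {
                      : //              return err
                      : //      }
                      : //      for iter.First(); iter.Valid(); iter.Next() {
                      : //              fmt.Printf("%s=%s\n", iter.Key(), iter.Value())
                      : //      }
                      : //      return iter.Close()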
    1162             : 
    1163             : // NewBatchOnlyIter constructs an iterator that only reads the contents of the
    1164             : // batch, and does not overlay the batch mutations on top of the DB state.
    1165             : //
    1166             : // The returned Iterator observes all of the Batch's existing mutations, but
    1167             : // no later mutations. Its view can be refreshed via RefreshBatchSnapshot or
    1168             : // SetOptions().
    1169           0 : func (b *Batch) NewBatchOnlyIter(ctx context.Context, o *IterOptions) (*Iterator, error) {
    1170           0 :         if b.index == nil {
    1171           0 :                 return nil, ErrNotIndexed
    1172           0 :         }
    1173           0 :         return b.db.newIter(ctx, b, newIterOpts{batch: batchIterOpts{batchOnly: true}}, o), nil
    1174             : }
    1175             : 
    1176             : // newInternalIter creates a new internalIterator that iterates over the
    1177             : // contents of the batch.
    1178           1 : func (b *Batch) newInternalIter(o *IterOptions) *batchIter {
    1179           1 :         iter := &batchIter{}
    1180           1 :         b.initInternalIter(o, iter)
    1181           1 :         return iter
    1182           1 : }
    1183             : 
    1184           1 : func (b *Batch) initInternalIter(o *IterOptions, iter *batchIter) {
    1185           1 :         *iter = batchIter{
    1186           1 :                 cmp:   b.cmp,
    1187           1 :                 batch: b,
    1188           1 :                 iter:  b.index.NewIter(o.GetLowerBound(), o.GetUpperBound()),
    1189           1 :                 // NB: We explicitly do not propagate the batch snapshot to the point
    1190           1 :                 // key iterator. Filtering point keys within the batch iterator can
    1191           1 :                 // cause pathological behavior where a batch iterator advances
    1192           1 :                 // significantly farther than necessary filtering many batch keys that
    1193           1 :                 // are not visible at the batch sequence number. Instead, the merging
    1194           1 :                 // iterator enforces bounds.
    1195           1 :                 //
    1196           1 :                 // For example, consider an engine that contains the committed keys
    1197           1 :                 // 'bar' and 'bax', with no keys between them. Consider a batch
     1198           1 :                 // containing 1,000 keys within the range [a,z]. All of the
    1199           1 :                 // batch keys were added to the batch after the iterator was
    1200           1 :                 // constructed, so they are not visible to the iterator. A call to
    1201           1 :                 // SeekGE('bax') would seek the LSM iterators and discover the key
    1202           1 :                 // 'bax'. It would also seek the batch iterator, landing on the key
     1203           1 :                 // 'baz' but discover that it's not visible. The batch iterator would
    1204           1 :                 // next through the rest of the batch's keys, only to discover there are
    1205           1 :                 // no visible keys greater than or equal to 'bax'.
    1206           1 :                 //
    1207           1 :                 // Filtering these batch points within the merging iterator ensures that
    1208           1 :                 // the batch iterator never needs to iterate beyond 'baz', because it
    1209           1 :                 // already found a smaller, visible key 'bax'.
    1210           1 :                 snapshot: base.InternalKeySeqNumMax,
    1211           1 :         }
    1212           1 : }
    1213             : 
    1214           1 : func (b *Batch) newRangeDelIter(o *IterOptions, batchSnapshot uint64) *keyspan.Iter {
     1215           1 :         // Construct an iterator even if rangeDelIndex is nil; the iterator is
     1216           1 :         // allowed to be refreshed later, so the container needs to exist.
    1217           1 :         iter := new(keyspan.Iter)
    1218           1 :         b.initRangeDelIter(o, iter, batchSnapshot)
    1219           1 :         return iter
    1220           1 : }
    1221             : 
    1222           1 : func (b *Batch) initRangeDelIter(_ *IterOptions, iter *keyspan.Iter, batchSnapshot uint64) {
    1223           1 :         if b.rangeDelIndex == nil {
    1224           1 :                 iter.Init(b.cmp, nil)
    1225           1 :                 return
    1226           1 :         }
    1227             : 
    1228             :         // Fragment the range tombstones the first time a range deletion iterator is
    1229             :         // requested. The cached tombstones are invalidated if another range
    1230             :         // deletion tombstone is added to the batch. This cache is only guaranteed
    1231             :         // to be correct if we're opening an iterator to read at a batch sequence
    1232             :         // number at least as high as tombstonesSeqNum. The cache is guaranteed to
    1233             :         // include all tombstones up to tombstonesSeqNum, and if any additional
    1234             :         // tombstones were added after that sequence number the cache would've been
    1235             :         // cleared.
    1236           1 :         nextSeqNum := b.nextSeqNum()
    1237           1 :         if b.tombstones != nil && b.tombstonesSeqNum <= batchSnapshot {
    1238           1 :                 iter.Init(b.cmp, b.tombstones)
    1239           1 :                 return
    1240           1 :         }
    1241             : 
    1242           1 :         tombstones := make([]keyspan.Span, 0, b.countRangeDels)
    1243           1 :         frag := &keyspan.Fragmenter{
    1244           1 :                 Cmp:    b.cmp,
    1245           1 :                 Format: b.formatKey,
    1246           1 :                 Emit: func(s keyspan.Span) {
    1247           1 :                         tombstones = append(tombstones, s)
    1248           1 :                 },
    1249             :         }
    1250           1 :         it := &batchIter{
    1251           1 :                 cmp:      b.cmp,
    1252           1 :                 batch:    b,
    1253           1 :                 iter:     b.rangeDelIndex.NewIter(nil, nil),
    1254           1 :                 snapshot: batchSnapshot,
    1255           1 :         }
    1256           1 :         fragmentRangeDels(frag, it, int(b.countRangeDels))
    1257           1 :         iter.Init(b.cmp, tombstones)
    1258           1 : 
    1259           1 :         // If we just read all the tombstones in the batch (eg, batchSnapshot was
    1260           1 :         // set to b.nextSeqNum()), then cache the tombstones so that a subsequent
    1261           1 :         // call to initRangeDelIter may use them without refragmenting.
    1262           1 :         if nextSeqNum == batchSnapshot {
    1263           1 :                 b.tombstones = tombstones
    1264           1 :                 b.tombstonesSeqNum = nextSeqNum
    1265           1 :         }
    1266             : }
    1267             : 
    1268           1 : func fragmentRangeDels(frag *keyspan.Fragmenter, it internalIterator, count int) {
    1269           1 :         // The memory management here is a bit subtle. The keys and values returned
    1270           1 :         // by the iterator are slices in Batch.data. Thus the fragmented tombstones
    1271           1 :         // are slices within Batch.data. If additional entries are added to the
    1272           1 :         // Batch, Batch.data may be reallocated. The references in the fragmented
    1273           1 :         // tombstones will remain valid, pointing into the old Batch.data. GC for
    1274           1 :         // the win.
    1275           1 : 
    1276           1 :         // Use a single []keyspan.Key buffer to avoid allocating many
    1277           1 :         // individual []keyspan.Key slices with a single element each.
    1278           1 :         keyBuf := make([]keyspan.Key, 0, count)
    1279           1 :         for key, val := it.First(); key != nil; key, val = it.Next() {
    1280           1 :                 s := rangedel.Decode(*key, val.InPlaceValue(), keyBuf)
    1281           1 :                 keyBuf = s.Keys[len(s.Keys):]
    1282           1 : 
    1283           1 :                 // Set a fixed capacity to avoid accidental overwriting.
    1284           1 :                 s.Keys = s.Keys[:len(s.Keys):len(s.Keys)]
    1285           1 :                 frag.Add(s)
    1286           1 :         }
    1287           1 :         frag.Finish()
    1288             : }
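                      :
                      : // The full slice expression above pins each span's key slice capacity so
                      : // that later appends through keyBuf cannot clobber it. A minimal
                      : // illustration of the idiom (hypothetical sizes):
                      : //
                      : //      buf := make([]keyspan.Key, 0, 8)
                      : //      s := append(buf, keyspan.Key{}) // decode writes into buf's array
                      : //      buf = s[len(s):]                // next decode continues after s
                      : //      s = s[:len(s):len(s)]           // cap(s) == len(s): appends copy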
    1289             : 
    1290           1 : func (b *Batch) newRangeKeyIter(o *IterOptions, batchSnapshot uint64) *keyspan.Iter {
     1291           1 :         // Construct an iterator even if rangeKeyIndex is nil; the iterator is
     1292           1 :         // allowed to be refreshed later, so the container needs to exist.
    1293           1 :         iter := new(keyspan.Iter)
    1294           1 :         b.initRangeKeyIter(o, iter, batchSnapshot)
    1295           1 :         return iter
    1296           1 : }
    1297             : 
    1298           1 : func (b *Batch) initRangeKeyIter(_ *IterOptions, iter *keyspan.Iter, batchSnapshot uint64) {
    1299           1 :         if b.rangeKeyIndex == nil {
    1300           1 :                 iter.Init(b.cmp, nil)
    1301           1 :                 return
    1302           1 :         }
    1303             : 
    1304             :         // Fragment the range keys the first time a range key iterator is requested.
    1305             :         // The cached spans are invalidated if another range key is added to the
    1306             :         // batch. This cache is only guaranteed to be correct if we're opening an
    1307             :         // iterator to read at a batch sequence number at least as high as
    1308             :         // rangeKeysSeqNum. The cache is guaranteed to include all range keys up to
    1309             :         // rangeKeysSeqNum, and if any additional range keys were added after that
    1310             :         // sequence number the cache would've been cleared.
    1311           1 :         nextSeqNum := b.nextSeqNum()
    1312           1 :         if b.rangeKeys != nil && b.rangeKeysSeqNum <= batchSnapshot {
    1313           1 :                 iter.Init(b.cmp, b.rangeKeys)
    1314           1 :                 return
    1315           1 :         }
    1316             : 
    1317           1 :         rangeKeys := make([]keyspan.Span, 0, b.countRangeKeys)
    1318           1 :         frag := &keyspan.Fragmenter{
    1319           1 :                 Cmp:    b.cmp,
    1320           1 :                 Format: b.formatKey,
    1321           1 :                 Emit: func(s keyspan.Span) {
    1322           1 :                         rangeKeys = append(rangeKeys, s)
    1323           1 :                 },
    1324             :         }
    1325           1 :         it := &batchIter{
    1326           1 :                 cmp:      b.cmp,
    1327           1 :                 batch:    b,
    1328           1 :                 iter:     b.rangeKeyIndex.NewIter(nil, nil),
    1329           1 :                 snapshot: batchSnapshot,
    1330           1 :         }
    1331           1 :         fragmentRangeKeys(frag, it, int(b.countRangeKeys))
    1332           1 :         iter.Init(b.cmp, rangeKeys)
    1333           1 : 
    1334           1 :         // If we just read all the range keys in the batch (eg, batchSnapshot was
    1335           1 :         // set to b.nextSeqNum()), then cache the range keys so that a subsequent
    1336           1 :         // call to initRangeKeyIter may use them without refragmenting.
    1337           1 :         if nextSeqNum == batchSnapshot {
    1338           1 :                 b.rangeKeys = rangeKeys
    1339           1 :                 b.rangeKeysSeqNum = nextSeqNum
    1340           1 :         }
    1341             : }
    1342             : 
    1343           1 : func fragmentRangeKeys(frag *keyspan.Fragmenter, it internalIterator, count int) error {
    1344           1 :         // The memory management here is a bit subtle. The keys and values
    1345           1 :         // returned by the iterator are slices in Batch.data. Thus the
    1346           1 :         // fragmented key spans are slices within Batch.data. If additional
    1347           1 :         // entries are added to the Batch, Batch.data may be reallocated. The
    1348           1 :         // references in the fragmented keys will remain valid, pointing into
    1349           1 :         // the old Batch.data. GC for the win.
    1350           1 : 
    1351           1 :         // Use a single []keyspan.Key buffer to avoid allocating many
    1352           1 :         // individual []keyspan.Key slices with a single element each.
    1353           1 :         keyBuf := make([]keyspan.Key, 0, count)
    1354           1 :         for ik, val := it.First(); ik != nil; ik, val = it.Next() {
    1355           1 :                 s, err := rangekey.Decode(*ik, val.InPlaceValue(), keyBuf)
    1356           1 :                 if err != nil {
    1357           0 :                         return err
    1358           0 :                 }
    1359           1 :                 keyBuf = s.Keys[len(s.Keys):]
    1360           1 : 
    1361           1 :                 // Set a fixed capacity to avoid accidental overwriting.
    1362           1 :                 s.Keys = s.Keys[:len(s.Keys):len(s.Keys)]
    1363           1 :                 frag.Add(s)
    1364             :         }
    1365           1 :         frag.Finish()
    1366           1 :         return nil
    1367             : }
    1368             : 
    1369             : // Commit applies the batch to its parent writer.
    1370           1 : func (b *Batch) Commit(o *WriteOptions) error {
    1371           1 :         return b.db.Apply(b, o)
    1372           1 : }
    1373             : 
    1374             : // Close closes the batch without committing it.
    1375           1 : func (b *Batch) Close() error {
    1376           1 :         b.release()
    1377           1 :         return nil
    1378           1 : }
    1379             : 
    1380             : // Indexed returns true if the batch is indexed (i.e. supports read
    1381             : // operations).
    1382           1 : func (b *Batch) Indexed() bool {
    1383           1 :         return b.index != nil
    1384           1 : }
    1385             : 
    1386             : // init ensures that the batch data slice is initialized to meet the
    1387             : // minimum required size and allocates space for the batch header.
    1388           1 : func (b *Batch) init(size int) {
    1389           1 :         n := batchInitialSize
    1390           1 :         for n < size {
    1391           0 :                 n *= 2
    1392           0 :         }
    1393           1 :         if cap(b.data) < n {
    1394           1 :                 b.data = rawalloc.New(batchHeaderLen, n)
    1395           1 :         }
    1396           1 :         b.data = b.data[:batchHeaderLen]
     1397           1 :         clear(b.data) // Zero the sequence number and count in the header
    1398             : }
    1399             : 
    1400             : // Reset resets the batch for reuse. The underlying byte slice (that is
    1401             : // returned by Repr()) may not be modified. It is only necessary to call this
     1402             : // method if a batch is explicitly being reused. Close automatically takes care
    1403             : // of releasing resources when appropriate for batches that are internally
    1404             : // being reused.
    1405           1 : func (b *Batch) Reset() {
    1406           1 :         // Zero out the struct, retaining only the fields necessary for manual
    1407           1 :         // reuse.
    1408           1 :         b.batchInternal = batchInternal{
    1409           1 :                 data:           b.data,
    1410           1 :                 cmp:            b.cmp,
    1411           1 :                 formatKey:      b.formatKey,
    1412           1 :                 abbreviatedKey: b.abbreviatedKey,
    1413           1 :                 index:          b.index,
    1414           1 :                 db:             b.db,
    1415           1 :         }
    1416           1 :         b.applied.Store(false)
    1417           1 :         if b.data != nil {
    1418           1 :                 if cap(b.data) > batchMaxRetainedSize {
    1419           0 :                         // If the capacity of the buffer is larger than our maximum
    1420           0 :                         // retention size, don't re-use it. Let it be GC-ed instead.
    1421           0 :                         // This prevents the memory from an unusually large batch from
    1422           0 :                         // being held on to indefinitely.
    1423           0 :                         b.data = nil
    1424           1 :                 } else {
    1425           1 :                         // Otherwise, reset the buffer for re-use.
    1426           1 :                         b.data = b.data[:batchHeaderLen]
    1427           1 :                         clear(b.data)
    1428           1 :                 }
    1429             :         }
    1430           1 :         if b.index != nil {
    1431           1 :                 b.index.Init(&b.data, b.cmp, b.abbreviatedKey)
    1432           1 :         }
    1433             : }
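                      :
                      : // Illustrative sketch (not part of batch.go): the manual-reuse pattern
                      : // that Reset enables. work is an assumed slice of key/value pairs.
                      : //
                      : //      b := db.NewBatch()
                      : //      defer b.Close()
                      : //      for _, kv := range work {
                      : //              b.Reset()
                      : //              _ = b.Set(kv.key, kv.value, nil)
                      : //              if err := b.Commit(Sync); err != nil {
                      : //                      return err
                      : //              }
                      : //      }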
    1434             : 
    1435             : // seqNumData returns the 8 byte little-endian sequence number. Zero means that
    1436             : // the batch has not yet been applied.
    1437           1 : func (b *Batch) seqNumData() []byte {
    1438           1 :         return b.data[:8]
    1439           1 : }
    1440             : 
    1441             : // countData returns the 4 byte little-endian count data. "\xff\xff\xff\xff"
    1442             : // means that the batch is invalid.
    1443           1 : func (b *Batch) countData() []byte {
    1444           1 :         return b.data[8:12]
    1445           1 : }
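                      :
                      : // For reference, the 12-byte header that seqNumData and countData slice
                      : // into (see batchHeaderLen):
                      : //
                      : //      +-------------------------------+-----------------+
                      : //      | seqnum (8 bytes, little end.) | count (4 bytes) |
                      : //      +-------------------------------+-----------------+
                      : //
                      : // A sketch of decoding it directly from a repr, assuming len(repr) >=
                      : // batchHeaderLen:
                      : //
                      : //      seqNum := binary.LittleEndian.Uint64(repr[:8])
                      : //      count := binary.LittleEndian.Uint32(repr[8:12])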
    1446             : 
    1447           1 : func (b *Batch) grow(n int) {
    1448           1 :         newSize := len(b.data) + n
    1449           1 :         if uint64(newSize) >= maxBatchSize {
    1450           0 :                 panic(ErrBatchTooLarge)
    1451             :         }
    1452           1 :         if newSize > cap(b.data) {
    1453           0 :                 newCap := 2 * cap(b.data)
    1454           0 :                 for newCap < newSize {
    1455           0 :                         newCap *= 2
    1456           0 :                 }
    1457           0 :                 newData := rawalloc.New(len(b.data), newCap)
    1458           0 :                 copy(newData, b.data)
    1459           0 :                 b.data = newData
    1460             :         }
    1461           1 :         b.data = b.data[:newSize]
    1462             : }
    1463             : 
    1464           1 : func (b *Batch) setSeqNum(seqNum uint64) {
    1465           1 :         binary.LittleEndian.PutUint64(b.seqNumData(), seqNum)
    1466           1 : }
    1467             : 
    1468             : // SeqNum returns the batch sequence number which is applied to the first
    1469             : // record in the batch. The sequence number is incremented for each subsequent
    1470             : // record. It returns zero if the batch is empty.
    1471           1 : func (b *Batch) SeqNum() uint64 {
    1472           1 :         if len(b.data) == 0 {
    1473           0 :                 b.init(batchHeaderLen)
    1474           0 :         }
    1475           1 :         return binary.LittleEndian.Uint64(b.seqNumData())
    1476             : }
    1477             : 
    1478           1 : func (b *Batch) setCount(v uint32) {
    1479           1 :         b.count = uint64(v)
    1480           1 : }
    1481             : 
    1482             : // Count returns the count of memtable-modifying operations in this batch. All
     1483             : // operations except LogData increment this count. For IngestSSTs, count is
     1484             : // only used to indicate the number of sstables ingested in the record; the
     1485             : // batch isn't applied to the memtable.
    1486           1 : func (b *Batch) Count() uint32 {
    1487           1 :         if b.count > math.MaxUint32 {
    1488           0 :                 panic(ErrInvalidBatch)
    1489             :         }
    1490           1 :         return uint32(b.count)
    1491             : }
    1492             : 
    1493             : // Reader returns a BatchReader for the current batch contents. If the batch is
    1494             : // mutated, the new entries will not be visible to the reader.
    1495           1 : func (b *Batch) Reader() BatchReader {
    1496           1 :         if len(b.data) == 0 {
    1497           1 :                 b.init(batchHeaderLen)
    1498           1 :         }
    1499           1 :         return b.data[batchHeaderLen:]
    1500             : }
    1501             : 
    1502           1 : func batchDecodeStr(data []byte) (odata []byte, s []byte, ok bool) {
    1503           1 :         // TODO(jackson): This will index out of bounds if there's no varint or an
    1504           1 :         // invalid varint (eg, a single 0xff byte). Correcting will add a bit of
    1505           1 :         // overhead. We could avoid that overhead whenever len(data) >=
     1506           1 :         // binary.MaxVarintLen32?
    1507           1 : 
    1508           1 :         var v uint32
    1509           1 :         var n int
    1510           1 :         ptr := unsafe.Pointer(&data[0])
    1511           1 :         if a := *((*uint8)(ptr)); a < 128 {
    1512           1 :                 v = uint32(a)
    1513           1 :                 n = 1
    1514           1 :         } else if a, b := a&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 1))); b < 128 {
    1515           0 :                 v = uint32(b)<<7 | uint32(a)
    1516           0 :                 n = 2
    1517           0 :         } else if b, c := b&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 2))); c < 128 {
    1518           0 :                 v = uint32(c)<<14 | uint32(b)<<7 | uint32(a)
    1519           0 :                 n = 3
    1520           0 :         } else if c, d := c&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 3))); d < 128 {
    1521           0 :                 v = uint32(d)<<21 | uint32(c)<<14 | uint32(b)<<7 | uint32(a)
    1522           0 :                 n = 4
    1523           0 :         } else {
    1524           0 :                 d, e := d&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 4)))
    1525           0 :                 v = uint32(e)<<28 | uint32(d)<<21 | uint32(c)<<14 | uint32(b)<<7 | uint32(a)
    1526           0 :                 n = 5
    1527           0 :         }
    1528             : 
    1529           1 :         data = data[n:]
    1530           1 :         if v > uint32(len(data)) {
    1531           0 :                 return nil, nil, false
    1532           0 :         }
    1533           1 :         return data[v:], data[:v], true
    1534             : }
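                      :
                      : // A bounds-checked sketch of the same decode built on encoding/binary
                      : // (hypothetical helper, trading the unsafe fast path above for safety on
                      : // the truncated or invalid varints the TODO describes):
                      : //
                      : //      func safeBatchDecodeStr(data []byte) (odata, s []byte, ok bool) {
                      : //              v, n := binary.Uvarint(data)
                      : //              if n <= 0 || v > uint64(len(data)-n) {
                      : //                      return nil, nil, false
                      : //              }
                      : //              return data[n+int(v):], data[n : n+int(v)], true
                      : //      }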
    1535             : 
    1536             : // SyncWait is to be used in conjunction with DB.ApplyNoSyncWait.
    1537           1 : func (b *Batch) SyncWait() error {
    1538           1 :         now := time.Now()
    1539           1 :         b.fsyncWait.Wait()
    1540           1 :         if b.commitErr != nil {
    1541           0 :                 b.db = nil // prevent batch reuse on error
    1542           0 :         }
    1543           1 :         waitDuration := time.Since(now)
    1544           1 :         b.commitStats.CommitWaitDuration += waitDuration
    1545           1 :         b.commitStats.TotalDuration += waitDuration
    1546           1 :         return b.commitErr
    1547             : }
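                      :
                      : // Illustrative sketch (not part of batch.go): the split commit/wait
                      : // pattern, assuming DB.ApplyNoSyncWait queues the batch without waiting
                      : // for the WAL fsync. doOtherWork is an assumed caller-provided function.
                      : //
                      : //      if err := db.ApplyNoSyncWait(b, Sync); err != nil {
                      : //              return err
                      : //      }
                      : //      doOtherWork() // overlaps with the WAL fsync
                      : //      if err := b.SyncWait(); err != nil {
                      : //              return err
                      : //      }
                      : //      stats := b.CommitStats()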
    1548             : 
    1549             : // CommitStats returns stats related to committing the batch. Should be called
     1550             : // after Batch.Commit or DB.Apply. If DB.ApplyNoSyncWait is used, it should be
    1551             : // called after Batch.SyncWait.
    1552           0 : func (b *Batch) CommitStats() BatchCommitStats {
    1553           0 :         return b.commitStats
    1554           0 : }
    1555             : 
    1556             : // BatchReader iterates over the entries contained in a batch.
    1557             : type BatchReader []byte
    1558             : 
     1559             : // ReadBatch constructs a BatchReader from a batch representation. The
    1560             : // header is not validated. ReadBatch returns a new batch reader and the
    1561             : // count of entries contained within the batch.
    1562           0 : func ReadBatch(repr []byte) (r BatchReader, count uint32) {
    1563           0 :         if len(repr) <= batchHeaderLen {
    1564           0 :                 return nil, count
    1565           0 :         }
    1566           0 :         count = binary.LittleEndian.Uint32(repr[batchCountOffset:batchHeaderLen])
    1567           0 :         return repr[batchHeaderLen:], count
    1568             : }
    1569             : 
    1570             : // Next returns the next entry in this batch, if there is one. If the reader has
    1571             : // reached the end of the batch, Next returns ok=false and a nil error. If the
    1572             : // batch is corrupt and the next entry is illegible, Next returns ok=false and a
    1573             : // non-nil error.
    1574           1 : func (r *BatchReader) Next() (kind InternalKeyKind, ukey []byte, value []byte, ok bool, err error) {
    1575           1 :         if len(*r) == 0 {
    1576           1 :                 return 0, nil, nil, false, nil
    1577           1 :         }
    1578           1 :         kind = InternalKeyKind((*r)[0])
    1579           1 :         if kind > InternalKeyKindMax {
    1580           0 :                 return 0, nil, nil, false, errors.Wrapf(ErrInvalidBatch, "invalid key kind 0x%x", (*r)[0])
    1581           0 :         }
    1582           1 :         *r, ukey, ok = batchDecodeStr((*r)[1:])
    1583           1 :         if !ok {
    1584           0 :                 return 0, nil, nil, false, errors.Wrapf(ErrInvalidBatch, "decoding user key")
    1585           0 :         }
    1586           1 :         switch kind {
    1587             :         case InternalKeyKindSet, InternalKeyKindMerge, InternalKeyKindRangeDelete,
    1588             :                 InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete,
    1589           1 :                 InternalKeyKindDeleteSized:
    1590           1 :                 *r, value, ok = batchDecodeStr(*r)
    1591           1 :                 if !ok {
    1592           0 :                         return 0, nil, nil, false, errors.Wrapf(ErrInvalidBatch, "decoding %s value", kind)
    1593           0 :                 }
    1594             :         }
    1595           1 :         return kind, ukey, value, true, nil
    1596             : }
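                      :
                      : // Illustrative sketch (not part of batch.go): scanning a batch
                      : // representation entry-by-entry without applying it. repr is an assumed
                      : // []byte obtained from Batch.Repr.
                      : //
                      : //      r, _ := ReadBatch(repr)
                      : //      for {
                      : //              kind, ukey, value, ok, err := r.Next()
                      : //              if err != nil {
                      : //                      return err
                      : //              }
                      : //              if !ok {
                      : //                      break
                      : //              }
                      : //              fmt.Println(kind, string(ukey), string(value))
                      : //      }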
    1597             : 
    1598             : // Note: batchIter mirrors the implementation of flushableBatchIter. Keep the
    1599             : // two in sync.
    1600             : type batchIter struct {
    1601             :         cmp   Compare
    1602             :         batch *Batch
    1603             :         iter  batchskl.Iterator
    1604             :         err   error
    1605             :         // snapshot holds a batch "sequence number" at which the batch is being
    1606             :         // read. This sequence number has the InternalKeySeqNumBatch bit set, so it
    1607             :         // encodes an offset within the batch. Only batch entries earlier than the
    1608             :         // offset are visible during iteration.
    1609             :         snapshot uint64
    1610             : }
    1611             : 
    1612             : // batchIter implements the base.InternalIterator interface.
    1613             : var _ base.InternalIterator = (*batchIter)(nil)
    1614             : 
    1615           0 : func (i *batchIter) String() string {
    1616           0 :         return "batch"
    1617           0 : }
    1618             : 
    1619           1 : func (i *batchIter) SeekGE(key []byte, flags base.SeekGEFlags) (*InternalKey, base.LazyValue) {
    1620           1 :         // Ignore TrySeekUsingNext if the view of the batch changed.
    1621           1 :         if flags.TrySeekUsingNext() && flags.BatchJustRefreshed() {
    1622           0 :                 flags = flags.DisableTrySeekUsingNext()
    1623           0 :         }
    1624             : 
    1625           1 :         i.err = nil // clear cached iteration error
    1626           1 :         ikey := i.iter.SeekGE(key, flags)
    1627           1 :         for ikey != nil && ikey.SeqNum() >= i.snapshot {
    1628           0 :                 ikey = i.iter.Next()
    1629           0 :         }
    1630           1 :         if ikey == nil {
    1631           1 :                 return nil, base.LazyValue{}
    1632           1 :         }
    1633           1 :         return ikey, base.MakeInPlaceValue(i.value())
    1634             : }
    1635             : 
    1636             : func (i *batchIter) SeekPrefixGE(
    1637             :         prefix, key []byte, flags base.SeekGEFlags,
    1638           1 : ) (*base.InternalKey, base.LazyValue) {
    1639           1 :         i.err = nil // clear cached iteration error
    1640           1 :         return i.SeekGE(key, flags)
    1641           1 : }
    1642             : 
    1643           1 : func (i *batchIter) SeekLT(key []byte, flags base.SeekLTFlags) (*InternalKey, base.LazyValue) {
    1644           1 :         i.err = nil // clear cached iteration error
    1645           1 :         ikey := i.iter.SeekLT(key)
    1646           1 :         for ikey != nil && ikey.SeqNum() >= i.snapshot {
    1647           0 :                 ikey = i.iter.Prev()
    1648           0 :         }
    1649           1 :         if ikey == nil {
    1650           1 :                 return nil, base.LazyValue{}
    1651           1 :         }
    1652           1 :         return ikey, base.MakeInPlaceValue(i.value())
    1653             : }
    1654             : 
    1655           1 : func (i *batchIter) First() (*InternalKey, base.LazyValue) {
    1656           1 :         i.err = nil // clear cached iteration error
    1657           1 :         ikey := i.iter.First()
    1658           1 :         for ikey != nil && ikey.SeqNum() >= i.snapshot {
    1659           0 :                 ikey = i.iter.Next()
    1660           0 :         }
    1661           1 :         if ikey == nil {
    1662           1 :                 return nil, base.LazyValue{}
    1663           1 :         }
    1664           1 :         return ikey, base.MakeInPlaceValue(i.value())
    1665             : }
    1666             : 
    1667           1 : func (i *batchIter) Last() (*InternalKey, base.LazyValue) {
    1668           1 :         i.err = nil // clear cached iteration error
    1669           1 :         ikey := i.iter.Last()
    1670           1 :         for ikey != nil && ikey.SeqNum() >= i.snapshot {
    1671           0 :                 ikey = i.iter.Prev()
    1672           0 :         }
    1673           1 :         if ikey == nil {
    1674           1 :                 return nil, base.LazyValue{}
    1675           1 :         }
    1676           1 :         return ikey, base.MakeInPlaceValue(i.value())
    1677             : }
    1678             : 
    1679           1 : func (i *batchIter) Next() (*InternalKey, base.LazyValue) {
    1680           1 :         ikey := i.iter.Next()
    1681           1 :         for ikey != nil && ikey.SeqNum() >= i.snapshot {
    1682           0 :                 ikey = i.iter.Next()
    1683           0 :         }
    1684           1 :         if ikey == nil {
    1685           1 :                 return nil, base.LazyValue{}
    1686           1 :         }
    1687           1 :         return ikey, base.MakeInPlaceValue(i.value())
    1688             : }
    1689             : 
    1690           0 : func (i *batchIter) NextPrefix(succKey []byte) (*InternalKey, LazyValue) {
     1691           0 :         // Because NextPrefix was invoked, `succKey` must be ≥ the key at i's current
    1692           0 :         // position. Seek the arena iterator using TrySeekUsingNext.
    1693           0 :         ikey := i.iter.SeekGE(succKey, base.SeekGEFlagsNone.EnableTrySeekUsingNext())
    1694           0 :         for ikey != nil && ikey.SeqNum() >= i.snapshot {
    1695           0 :                 ikey = i.iter.Next()
    1696           0 :         }
    1697           0 :         if ikey == nil {
    1698           0 :                 return nil, base.LazyValue{}
    1699           0 :         }
    1700           0 :         return ikey, base.MakeInPlaceValue(i.value())
    1701             : }
    1702             : 
    1703           1 : func (i *batchIter) Prev() (*InternalKey, base.LazyValue) {
    1704           1 :         ikey := i.iter.Prev()
    1705           1 :         for ikey != nil && ikey.SeqNum() >= i.snapshot {
    1706           0 :                 ikey = i.iter.Prev()
    1707           0 :         }
    1708           1 :         if ikey == nil {
    1709           1 :                 return nil, base.LazyValue{}
    1710           1 :         }
    1711           1 :         return ikey, base.MakeInPlaceValue(i.value())
    1712             : }
    1713             : 
    1714           1 : func (i *batchIter) value() []byte {
    1715           1 :         offset, _, keyEnd := i.iter.KeyInfo()
    1716           1 :         data := i.batch.data
    1717           1 :         if len(data[offset:]) == 0 {
    1718           0 :                 i.err = base.CorruptionErrorf("corrupted batch")
    1719           0 :                 return nil
    1720           0 :         }
    1721             : 
    1722           1 :         switch InternalKeyKind(data[offset]) {
    1723             :         case InternalKeyKindSet, InternalKeyKindMerge, InternalKeyKindRangeDelete,
    1724             :                 InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete,
    1725           1 :                 InternalKeyKindDeleteSized:
    1726           1 :                 _, value, ok := batchDecodeStr(data[keyEnd:])
    1727           1 :                 if !ok {
    1728           0 :                         i.err = base.CorruptionErrorf("corrupted batch")
    1728           0 :                         return nil
    1729           0 :                 }
    1730           1 :                 return value
    1731           1 :         default:
    1732           1 :                 return nil
    1733             :         }
    1734             : }
    1735             : 
    1736           1 : func (i *batchIter) Error() error {
    1737           1 :         return i.err
    1738           1 : }
    1739             : 
    1740           1 : func (i *batchIter) Close() error {
    1741           1 :         _ = i.iter.Close()
    1742           1 :         return i.err
    1743           1 : }
    1744             : 
    1745           1 : func (i *batchIter) SetBounds(lower, upper []byte) {
    1746           1 :         i.iter.SetBounds(lower, upper)
    1747           1 : }
    1748             : 
    1749           0 : func (i *batchIter) SetContext(_ context.Context) {}
    1750             : 
    1751             : type flushableBatchEntry struct {
    1752             :         // offset is the byte offset of the record within the batch repr.
    1753             :         offset uint32
    1754             :         // index is the 0-based ordinal number of the record within the batch. Used
    1755             :         // to compute the seqnum for the record.
    1756             :         index uint32
    1757             :         // key{Start,End} are the start and end byte offsets of the key within the
    1758             :         // batch repr. Cached to avoid decoding the key length on every
    1759             :         // comparison. The value is stored starting at keyEnd.
    1760             :         keyStart uint32
    1761             :         keyEnd   uint32
    1762             : }
    1763             : 
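// Illustrative sketch (editorial): each batch record is encoded as a one-byte
// kind, a uvarint key length, the key bytes, and, for kinds that carry a
// value, a uvarint value length plus the value bytes. keyStart/keyEnd cache
// the key's position so comparisons never re-decode the length. A
// hypothetical decoder for these offsets, using encoding/binary:

func decodeKeyOffsets(data []byte, offset uint32) (keyStart, keyEnd uint32) {
	keyLen, n := binary.Uvarint(data[offset+1:]) // skip the kind byte
	keyStart = offset + 1 + uint32(n)
	keyEnd = keyStart + uint32(keyLen) // the value, if any, starts at keyEnd
	return keyStart, keyEnd
}
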
    1764             : // flushableBatch wraps an existing batch and provides the interfaces needed
    1765             : // for making the batch flushable (i.e. able to mimic a memtable).
    1766             : type flushableBatch struct {
    1767             :         cmp       Compare
    1768             :         formatKey base.FormatKey
    1769             :         data      []byte
    1770             : 
    1771             :         // The base sequence number for the entries in the batch. This is the same
    1772             :         // value as Batch.seqNum() and is cached here for performance.
    1773             :         seqNum uint64
    1774             : 
    1775             :         // A slice of offsets and indices for the entries in the batch. Used to
    1776             :         // implement flushableBatchIter. Unlike the indexing on a normal batch, a
    1777             :         // flushable batch is indexed such that batch entry i will be given the
    1778             :         // sequence number flushableBatch.seqNum+i.
    1779             :         //
    1780             :         // Sorted in increasing order of key and decreasing order of offset (since
    1781             :         // higher offsets correspond to higher sequence numbers).
    1782             :         //
    1783             :         // Does not include range deletion entries or range key entries.
    1784             :         offsets []flushableBatchEntry
    1785             : 
    1786             :         // Fragmented range deletion tombstones.
    1787             :         tombstones []keyspan.Span
    1788             : 
    1789             :         // Fragmented range keys.
    1790             :         rangeKeys []keyspan.Span
    1791             : }
    1792             : 
    1793             : var _ flushable = (*flushableBatch)(nil)
    1794             : 
    1795             : // newFlushableBatch creates a new batch that implements the flushable
    1796             : // interface. This allows the batch to act like a memtable and be placed in the
    1797             : // queue of flushable memtables. Note that the flushable batch takes ownership
    1798             : // of the batch data.
    1799           1 : func newFlushableBatch(batch *Batch, comparer *Comparer) (*flushableBatch, error) {
    1800           1 :         b := &flushableBatch{
    1801           1 :                 data:      batch.data,
    1802           1 :                 cmp:       comparer.Compare,
    1803           1 :                 formatKey: comparer.FormatKey,
    1804           1 :                 offsets:   make([]flushableBatchEntry, 0, batch.Count()),
    1805           1 :         }
    1806           1 :         if b.data != nil {
    1807           1 :                 // Note that this sequence number is not correct when the batch has not
    1808           1 :                 // yet been applied, since no sequence number has been assigned; the
    1809           1 :                 // correct sequence number will be set later. It is correct, however,
    1810           1 :                 // when the batch is being replayed from the WAL.
    1811           1 :                 b.seqNum = batch.SeqNum()
    1812           1 :         }
    1813           1 :         var rangeDelOffsets []flushableBatchEntry
    1814           1 :         var rangeKeyOffsets []flushableBatchEntry
    1815           1 :         if len(b.data) > batchHeaderLen {
    1816           1 :                 // Non-empty batch.
    1817           1 :                 var index uint32
    1818           1 :                 for iter := BatchReader(b.data[batchHeaderLen:]); len(iter) > 0; index++ {
    1819           1 :                         offset := uintptr(unsafe.Pointer(&iter[0])) - uintptr(unsafe.Pointer(&b.data[0]))
    1820           1 :                         kind, key, _, ok, err := iter.Next()
    1821           1 :                         if !ok {
    1822           0 :                                 if err != nil {
    1823           0 :                                         return nil, err
    1824           0 :                                 }
    1825           0 :                                 break
    1826             :                         }
    1827           1 :                         entry := flushableBatchEntry{
    1828           1 :                                 offset: uint32(offset),
    1829           1 :                                 index:  uint32(index),
    1830           1 :                         }
    1831           1 :                         if keySize := uint32(len(key)); keySize == 0 {
    1832           0 :                                 // Must add 2 to the offset. One byte encodes `kind` and the next
    1833           0 :                                 // byte encodes `0`, which is the length of the key.
    1834           0 :                                 entry.keyStart = uint32(offset) + 2
    1835           0 :                                 entry.keyEnd = entry.keyStart
    1836           1 :                         } else {
    1837           1 :                                 entry.keyStart = uint32(uintptr(unsafe.Pointer(&key[0])) -
    1838           1 :                                         uintptr(unsafe.Pointer(&b.data[0])))
    1839           1 :                                 entry.keyEnd = entry.keyStart + keySize
    1840           1 :                         }
    1841           1 :                         switch kind {
    1842           1 :                         case InternalKeyKindRangeDelete:
    1843           1 :                                 rangeDelOffsets = append(rangeDelOffsets, entry)
    1844           1 :                         case InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete:
    1845           1 :                                 rangeKeyOffsets = append(rangeKeyOffsets, entry)
    1846           1 :                         default:
    1847           1 :                                 b.offsets = append(b.offsets, entry)
    1848             :                         }
    1849             :                 }
    1850             :         }
    1851             : 
    1852             :         // Sort offsets, rangeDelOffsets, and rangeKeyOffsets using *flushableBatch's
    1853             :         // sort.Interface implementation, temporarily re-pointing b.offsets at each slice.
    1854           1 :         pointOffsets := b.offsets
    1855           1 :         sort.Sort(b)
    1856           1 :         b.offsets = rangeDelOffsets
    1857           1 :         sort.Sort(b)
    1858           1 :         b.offsets = rangeKeyOffsets
    1859           1 :         sort.Sort(b)
    1860           1 :         b.offsets = pointOffsets
    1861           1 : 
    1862           1 :         if len(rangeDelOffsets) > 0 {
    1863           1 :                 frag := &keyspan.Fragmenter{
    1864           1 :                         Cmp:    b.cmp,
    1865           1 :                         Format: b.formatKey,
    1866           1 :                         Emit: func(s keyspan.Span) {
    1867           1 :                                 b.tombstones = append(b.tombstones, s)
    1868           1 :                         },
    1869             :                 }
    1870           1 :                 it := &flushableBatchIter{
    1871           1 :                         batch:   b,
    1872           1 :                         data:    b.data,
    1873           1 :                         offsets: rangeDelOffsets,
    1874           1 :                         cmp:     b.cmp,
    1875           1 :                         index:   -1,
    1876           1 :                 }
    1877           1 :                 fragmentRangeDels(frag, it, len(rangeDelOffsets))
    1878             :         }
    1879           1 :         if len(rangeKeyOffsets) > 0 {
    1880           1 :                 frag := &keyspan.Fragmenter{
    1881           1 :                         Cmp:    b.cmp,
    1882           1 :                         Format: b.formatKey,
    1883           1 :                         Emit: func(s keyspan.Span) {
    1884           1 :                                 b.rangeKeys = append(b.rangeKeys, s)
    1885           1 :                         },
    1886             :                 }
    1887           1 :                 it := &flushableBatchIter{
    1888           1 :                         batch:   b,
    1889           1 :                         data:    b.data,
    1890           1 :                         offsets: rangeKeyOffsets,
    1891           1 :                         cmp:     b.cmp,
    1892           1 :                         index:   -1,
    1893           1 :                 }
    1894           1 :                 fragmentRangeKeys(frag, it, len(rangeKeyOffsets))
    1895             :         }
    1896           1 :         return b, nil
    1897             : }
    1898             : 
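// Illustrative sketch (editorial): the three sort.Sort calls above reuse a
// single sort.Interface implementation (Len/Less/Swap below) by temporarily
// re-pointing b.offsets at each slice. The same idiom on a simpler type:

type sortableKeys struct{ keys []string }

func (s *sortableKeys) Len() int           { return len(s.keys) }
func (s *sortableKeys) Less(i, j int) bool { return s.keys[i] < s.keys[j] }
func (s *sortableKeys) Swap(i, j int)      { s.keys[i], s.keys[j] = s.keys[j], s.keys[i] }

func sortAll(slices ...[]string) {
	s := &sortableKeys{}
	for _, sl := range slices {
		s.keys = sl // re-point, mirroring the b.offsets swaps above
		sort.Sort(s)
	}
}
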
    1899           1 : func (b *flushableBatch) setSeqNum(seqNum uint64) {
    1900           1 :         if b.seqNum != 0 {
    1901           0 :                 panic(fmt.Sprintf("pebble: flushableBatch.seqNum already set: %d", b.seqNum))
    1902             :         }
    1903           1 :         b.seqNum = seqNum
    1904           1 :         for i := range b.tombstones {
    1905           1 :                 for j := range b.tombstones[i].Keys {
    1906           1 :                         b.tombstones[i].Keys[j].Trailer = base.MakeTrailer(
    1907           1 :                                 b.tombstones[i].Keys[j].SeqNum()+seqNum,
    1908           1 :                                 b.tombstones[i].Keys[j].Kind(),
    1909           1 :                         )
    1910           1 :                 }
    1911             :         }
    1912           1 :         for i := range b.rangeKeys {
    1913           1 :                 for j := range b.rangeKeys[i].Keys {
    1914           1 :                         b.rangeKeys[i].Keys[j].Trailer = base.MakeTrailer(
    1915           1 :                                 b.rangeKeys[i].Keys[j].SeqNum()+seqNum,
    1916           1 :                                 b.rangeKeys[i].Keys[j].Kind(),
    1917           1 :                         )
    1918           1 :                 }
    1919             :         }
    1920             : }
    1921             : 
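// Illustrative sketch (editorial): an internal key's trailer packs the
// sequence number and kind into a single uint64 (seqNum<<8 | kind, the
// layout base.MakeTrailer produces). Before setSeqNum runs, the keys above
// hold *relative* seqnums -- each entry's index within the batch -- and
// setSeqNum rebases them onto the batch's assigned base seqnum:

func rebaseTrailer(trailer, base uint64) uint64 {
	seqNum := trailer >> 8 // relative seqnum
	kind := trailer & 0xff
	return (seqNum+base)<<8 | kind
}
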
    1922           1 : func (b *flushableBatch) Len() int {
    1923           1 :         return len(b.offsets)
    1924           1 : }
    1925             : 
    1926           1 : func (b *flushableBatch) Less(i, j int) bool {
    1927           1 :         ei := &b.offsets[i]
    1928           1 :         ej := &b.offsets[j]
    1929           1 :         ki := b.data[ei.keyStart:ei.keyEnd]
    1930           1 :         kj := b.data[ej.keyStart:ej.keyEnd]
    1931           1 :         switch c := b.cmp(ki, kj); {
    1932           1 :         case c < 0:
    1933           1 :                 return true
    1934           1 :         case c > 0:
    1935           1 :                 return false
    1936           1 :         default:
    1937           1 :                 return ei.offset > ej.offset
    1938             :         }
    1939             : }
    1940             : 
    1941           1 : func (b *flushableBatch) Swap(i, j int) {
    1942           1 :         b.offsets[i], b.offsets[j] = b.offsets[j], b.offsets[i]
    1943           1 : }
    1944             : 
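// Illustrative sketch (editorial): Less orders entries by increasing user key
// and, for duplicate keys, by decreasing offset -- a later write to the same
// key sits at a higher offset, carries a higher seqnum, and must sort first.
// Given entries ("a", offset 12), ("a", offset 40), ("b", offset 25), the
// sorted order is ("a", 40), ("a", 12), ("b", 25). The comparison in
// miniature:

func lessEntry(ki, kj string, oi, oj uint32) bool {
	if ki != kj {
		return ki < kj
	}
	return oi > oj // newer (higher-offset) entries sort first
}
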
    1945             : // newIter is part of the flushable interface.
    1946           1 : func (b *flushableBatch) newIter(o *IterOptions) internalIterator {
    1947           1 :         return &flushableBatchIter{
    1948           1 :                 batch:   b,
    1949           1 :                 data:    b.data,
    1950           1 :                 offsets: b.offsets,
    1951           1 :                 cmp:     b.cmp,
    1952           1 :                 index:   -1,
    1953           1 :                 lower:   o.GetLowerBound(),
    1954           1 :                 upper:   o.GetUpperBound(),
    1955           1 :         }
    1956           1 : }
    1957             : 
    1958             : // newFlushIter is part of the flushable interface.
    1959           1 : func (b *flushableBatch) newFlushIter(o *IterOptions, bytesFlushed *uint64) internalIterator {
    1960           1 :         return &flushFlushableBatchIter{
    1961           1 :                 flushableBatchIter: flushableBatchIter{
    1962           1 :                         batch:   b,
    1963           1 :                         data:    b.data,
    1964           1 :                         offsets: b.offsets,
    1965           1 :                         cmp:     b.cmp,
    1966           1 :                         index:   -1,
    1967           1 :                 },
    1968           1 :                 bytesIterated: bytesFlushed,
    1969           1 :         }
    1970           1 : }
    1971             : 
    1972             : // newRangeDelIter is part of the flushable interface.
    1973           1 : func (b *flushableBatch) newRangeDelIter(o *IterOptions) keyspan.FragmentIterator {
    1974           1 :         if len(b.tombstones) == 0 {
    1975           1 :                 return nil
    1976           1 :         }
    1977           1 :         return keyspan.NewIter(b.cmp, b.tombstones)
    1978             : }
    1979             : 
    1980             : // newRangeKeyIter is part of the flushable interface.
    1981           1 : func (b *flushableBatch) newRangeKeyIter(o *IterOptions) keyspan.FragmentIterator {
    1982           1 :         if len(b.rangeKeys) == 0 {
    1983           1 :                 return nil
    1984           1 :         }
    1985           1 :         return keyspan.NewIter(b.cmp, b.rangeKeys)
    1986             : }
    1987             : 
    1988             : // containsRangeKeys is part of the flushable interface.
    1989           1 : func (b *flushableBatch) containsRangeKeys() bool { return len(b.rangeKeys) > 0 }
    1990             : 
    1991             : // inuseBytes is part of the flushable interface.
    1992           1 : func (b *flushableBatch) inuseBytes() uint64 {
    1993           1 :         return uint64(len(b.data) - batchHeaderLen)
    1994           1 : }
    1995             : 
    1996             : // totalBytes is part of the flushable interface.
    1997           1 : func (b *flushableBatch) totalBytes() uint64 {
    1998           1 :         return uint64(cap(b.data))
    1999           1 : }
    2000             : 
    2001             : // readyForFlush is part of the flushable interface.
    2002           1 : func (b *flushableBatch) readyForFlush() bool {
    2003           1 :         // A flushable batch is always ready for flush; it must be flushed together
    2004           1 :         // with the previous memtable.
    2005           1 :         return true
    2006           1 : }
    2007             : 
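// Illustrative sketch (editorial): inuseBytes counts only the encoded records
// (the repr length minus the 12-byte batch header), while totalBytes reports
// the full capacity of the backing slice, including slack left over from
// slice growth. A hypothetical helper for the difference:

func batchSlackBytes(data []byte) uint64 {
	return uint64(cap(data) - len(data)) // capacity not yet holding records
}
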
    2008             : // computePossibleOverlaps is part of the flushable interface.
    2009             : func (b *flushableBatch) computePossibleOverlaps(
    2010             :         fn func(bounded) shouldContinue, bounded ...bounded,
    2011           1 : ) {
    2012           1 :         computePossibleOverlapsGenericImpl[*flushableBatch](b, b.cmp, fn, bounded)
    2013           1 : }
    2014             : 
    2015             : // Note: flushableBatchIter mirrors the implementation of batchIter. Keep the
    2016             : // two in sync.
    2017             : type flushableBatchIter struct {
    2018             :         // Members to be initialized by creator.
    2019             :         batch *flushableBatch
    2020             :         // The bytes backing the batch; the iterator constructors in this
    2021             :         // file always initialize this to batch.data.
    2021             :         data []byte
    2022             :         // The sorted entries. This is not always equal to batch.offsets.
    2023             :         offsets []flushableBatchEntry
    2024             :         cmp     Compare
    2025             :         // Must be initialized to -1. It is the index into offsets that represents
    2026             :         // the current iterator position.
    2027             :         index int
    2028             : 
    2029             :         // For internal use by the implementation.
    2030             :         key InternalKey
    2031             :         err error
    2032             : 
    2033             :         // Optionally initialize to bounds of iteration, if any.
    2034             :         lower []byte
    2035             :         upper []byte
    2036             : }
    2037             : 
    2038             : // flushableBatchIter implements the base.InternalIterator interface.
    2039             : var _ base.InternalIterator = (*flushableBatchIter)(nil)
    2040             : 
    2041           1 : func (i *flushableBatchIter) String() string {
    2042           1 :         return "flushable-batch"
    2043           1 : }
    2044             : 
    2045             : // SeekGE implements internalIterator.SeekGE, as documented in the pebble
    2046             : // package. Ignore flags.TrySeekUsingNext() since we don't expect this
    2047             : // optimization to provide much benefit here at the moment.
    2048             : func (i *flushableBatchIter) SeekGE(
    2049             :         key []byte, flags base.SeekGEFlags,
    2050           1 : ) (*InternalKey, base.LazyValue) {
    2051           1 :         i.err = nil // clear cached iteration error
    2052           1 :         ikey := base.MakeSearchKey(key)
    2053           1 :         i.index = sort.Search(len(i.offsets), func(j int) bool {
    2054           1 :                 return base.InternalCompare(i.cmp, ikey, i.getKey(j)) <= 0
    2055           1 :         })
    2056           1 :         if i.index >= len(i.offsets) {
    2057           1 :                 return nil, base.LazyValue{}
    2058           1 :         }
    2059           1 :         i.key = i.getKey(i.index)
    2060           1 :         if i.upper != nil && i.cmp(i.key.UserKey, i.upper) >= 0 {
    2061           1 :                 i.index = len(i.offsets)
    2062           1 :                 return nil, base.LazyValue{}
    2063           1 :         }
    2064           1 :         return &i.key, i.value()
    2065             : }
    2066             : 
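// Illustrative sketch (editorial): SeekGE above is a binary search via
// sort.Search, which returns the smallest index in [0, n) at which the
// predicate is true -- here, the first entry whose internal key is >= the
// search key -- or n if there is none. The same search over plain strings:

func seekGE(sorted []string, key string) int {
	return sort.Search(len(sorted), func(j int) bool {
		return sorted[j] >= key
	})
}
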
    2067             : // SeekPrefixGE implements internalIterator.SeekPrefixGE, as documented in the
    2068             : // pebble package.
    2069             : func (i *flushableBatchIter) SeekPrefixGE(
    2070             :         prefix, key []byte, flags base.SeekGEFlags,
    2071           1 : ) (*base.InternalKey, base.LazyValue) {
    2072           1 :         return i.SeekGE(key, flags)
    2073           1 : }
    2074             : 
    2075             : // SeekLT implements internalIterator.SeekLT, as documented in the pebble
    2076             : // package.
    2077             : func (i *flushableBatchIter) SeekLT(
    2078             :         key []byte, flags base.SeekLTFlags,
    2079           1 : ) (*InternalKey, base.LazyValue) {
    2080           1 :         i.err = nil // clear cached iteration error
    2081           1 :         ikey := base.MakeSearchKey(key)
    2082           1 :         i.index = sort.Search(len(i.offsets), func(j int) bool {
    2083           1 :                 return base.InternalCompare(i.cmp, ikey, i.getKey(j)) <= 0
    2084           1 :         })
    2085           1 :         i.index--
    2086           1 :         if i.index < 0 {
    2087           1 :                 return nil, base.LazyValue{}
    2088           1 :         }
    2089           1 :         i.key = i.getKey(i.index)
    2090           1 :         if i.lower != nil && i.cmp(i.key.UserKey, i.lower) < 0 {
    2091           1 :                 i.index = -1
    2092           1 :                 return nil, base.LazyValue{}
    2093           1 :         }
    2094           1 :         return &i.key, i.value()
    2095             : }
    2096             : 
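// Illustrative sketch (editorial): SeekLT runs the same search as SeekGE and
// then steps back one position, yielding the last entry strictly less than
// the search key, or nothing when the search lands at index 0:

func seekLT(sorted []string, key string) (idx int, ok bool) {
	j := sort.Search(len(sorted), func(k int) bool { return sorted[k] >= key })
	return j - 1, j > 0
}
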
    2097             : // First implements internalIterator.First, as documented in the pebble
    2098             : // package.
    2099           1 : func (i *flushableBatchIter) First() (*InternalKey, base.LazyValue) {
    2100           1 :         i.err = nil // clear cached iteration error
    2101           1 :         if len(i.offsets) == 0 {
    2102           1 :                 return nil, base.LazyValue{}
    2103           1 :         }
    2104           1 :         i.index = 0
    2105           1 :         i.key = i.getKey(i.index)
    2106           1 :         if i.upper != nil && i.cmp(i.key.UserKey, i.upper) >= 0 {
    2107           1 :                 i.index = len(i.offsets)
    2108           1 :                 return nil, base.LazyValue{}
    2109           1 :         }
    2110           1 :         return &i.key, i.value()
    2111             : }
    2112             : 
    2113             : // Last implements internalIterator.Last, as documented in the pebble
    2114             : // package.
    2115           1 : func (i *flushableBatchIter) Last() (*InternalKey, base.LazyValue) {
    2116           1 :         i.err = nil // clear cached iteration error
    2117           1 :         if len(i.offsets) == 0 {
    2118           0 :                 return nil, base.LazyValue{}
    2119           0 :         }
    2120           1 :         i.index = len(i.offsets) - 1
    2121           1 :         i.key = i.getKey(i.index)
    2122           1 :         if i.lower != nil && i.cmp(i.key.UserKey, i.lower) < 0 {
    2123           0 :                 i.index = -1
    2124           0 :                 return nil, base.LazyValue{}
    2125           0 :         }
    2126           1 :         return &i.key, i.value()
    2127             : }
    2128             : 
    2129             : // Note: flushFlushableBatchIter.Next mirrors the implementation of
    2130             : // flushableBatchIter.Next due to performance. Keep the two in sync.
    2131           1 : func (i *flushableBatchIter) Next() (*InternalKey, base.LazyValue) {
    2132           1 :         if i.index == len(i.offsets) {
    2133           0 :                 return nil, base.LazyValue{}
    2134           0 :         }
    2135           1 :         i.index++
    2136           1 :         if i.index == len(i.offsets) {
    2137           1 :                 return nil, base.LazyValue{}
    2138           1 :         }
    2139           1 :         i.key = i.getKey(i.index)
    2140           1 :         if i.upper != nil && i.cmp(i.key.UserKey, i.upper) >= 0 {
    2141           1 :                 i.index = len(i.offsets)
    2142           1 :                 return nil, base.LazyValue{}
    2143           1 :         }
    2144           1 :         return &i.key, i.value()
    2145             : }
    2146             : 
    2147           1 : func (i *flushableBatchIter) Prev() (*InternalKey, base.LazyValue) {
    2148           1 :         if i.index < 0 {
    2149           0 :                 return nil, base.LazyValue{}
    2150           0 :         }
    2151           1 :         i.index--
    2152           1 :         if i.index < 0 {
    2153           1 :                 return nil, base.LazyValue{}
    2154           1 :         }
    2155           1 :         i.key = i.getKey(i.index)
    2156           1 :         if i.lower != nil && i.cmp(i.key.UserKey, i.lower) < 0 {
    2157           1 :                 i.index = -1
    2158           1 :                 return nil, base.LazyValue{}
    2159           1 :         }
    2160           1 :         return &i.key, i.value()
    2161             : }
    2162             : 
    2163             : // NextPrefix is implemented as a SeekGE with the TrySeekUsingNext optimization
    2164             : // enabled. Note: flushFlushableBatchIter.NextPrefix is unimplemented and panics.
    2165           0 : func (i *flushableBatchIter) NextPrefix(succKey []byte) (*InternalKey, base.LazyValue) {
    2166           0 :         return i.SeekGE(succKey, base.SeekGEFlagsNone.EnableTrySeekUsingNext())
    2167           0 : }
    2168             : 
    2169           1 : func (i *flushableBatchIter) getKey(index int) InternalKey {
    2170           1 :         e := &i.offsets[index]
    2171           1 :         kind := InternalKeyKind(i.data[e.offset])
    2172           1 :         key := i.data[e.keyStart:e.keyEnd]
    2173           1 :         return base.MakeInternalKey(key, i.batch.seqNum+uint64(e.index), kind)
    2174           1 : }
    2175             : 
    2176           1 : func (i *flushableBatchIter) value() base.LazyValue {
    2177           1 :         p := i.data[i.offsets[i.index].offset:]
    2178           1 :         if len(p) == 0 {
    2179           0 :                 i.err = base.CorruptionErrorf("corrupted batch")
    2180           0 :                 return base.LazyValue{}
    2181           0 :         }
    2182           1 :         kind := InternalKeyKind(p[0])
    2183           1 :         if kind > InternalKeyKindMax {
    2184           0 :                 i.err = base.CorruptionErrorf("corrupted batch")
    2185           0 :                 return base.LazyValue{}
    2186           0 :         }
    2187           1 :         var value []byte
    2188           1 :         var ok bool
    2189           1 :         switch kind {
    2190             :         case InternalKeyKindSet, InternalKeyKindMerge, InternalKeyKindRangeDelete,
    2191             :                 InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete,
    2192           1 :                 InternalKeyKindDeleteSized:
    2193           1 :                 keyEnd := i.offsets[i.index].keyEnd
    2194           1 :                 _, value, ok = batchDecodeStr(i.data[keyEnd:])
    2195           1 :                 if !ok {
    2196           0 :                         i.err = base.CorruptionErrorf("corrupted batch")
    2197           0 :                         return base.LazyValue{}
    2198           0 :                 }
    2199             :         }
    2200           1 :         return base.MakeInPlaceValue(value)
    2201             : }
    2202             : 
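// Illustrative sketch (editorial): batchDecodeStr, defined earlier in this
// file, reads a uvarint length prefix followed by that many bytes. A minimal
// reimplementation of that contract, assuming the same (rest, s, ok) result
// shape and using encoding/binary:

func decodeStr(data []byte) (rest, s []byte, ok bool) {
	v, n := binary.Uvarint(data)
	if n <= 0 || uint64(len(data)-n) < v {
		return nil, nil, false // truncated or malformed length prefix
	}
	return data[n+int(v):], data[n : n+int(v)], true
}
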
    2203           0 : func (i *flushableBatchIter) Valid() bool {
    2204           0 :         return i.index >= 0 && i.index < len(i.offsets)
    2205           0 : }
    2206             : 
    2207           1 : func (i *flushableBatchIter) Error() error {
    2208           1 :         return i.err
    2209           1 : }
    2210             : 
    2211           1 : func (i *flushableBatchIter) Close() error {
    2212           1 :         return i.err
    2213           1 : }
    2214             : 
    2215           1 : func (i *flushableBatchIter) SetBounds(lower, upper []byte) {
    2216           1 :         i.lower = lower
    2217           1 :         i.upper = upper
    2218           1 : }
    2219             : 
    2220           0 : func (i *flushableBatchIter) SetContext(_ context.Context) {}
    2221             : 
    2222             : // flushFlushableBatchIter is similar to flushableBatchIter but it keeps track
    2223             : // of number of bytes iterated.
    2224             : type flushFlushableBatchIter struct {
    2225             :         flushableBatchIter
    2226             :         bytesIterated *uint64
    2227             : }
    2228             : 
    2229             : // flushFlushableBatchIter implements the base.InternalIterator interface.
    2230             : var _ base.InternalIterator = (*flushFlushableBatchIter)(nil)
    2231             : 
    2232           0 : func (i *flushFlushableBatchIter) String() string {
    2233           0 :         return "flushable-batch"
    2234           0 : }
    2235             : 
    2236             : func (i *flushFlushableBatchIter) SeekGE(
    2237             :         key []byte, flags base.SeekGEFlags,
    2238           0 : ) (*InternalKey, base.LazyValue) {
    2239           0 :         panic("pebble: SeekGE unimplemented")
    2240             : }
    2241             : 
    2242             : func (i *flushFlushableBatchIter) SeekPrefixGE(
    2243             :         prefix, key []byte, flags base.SeekGEFlags,
    2244           0 : ) (*base.InternalKey, base.LazyValue) {
    2245           0 :         panic("pebble: SeekPrefixGE unimplemented")
    2246             : }
    2247             : 
    2248             : func (i *flushFlushableBatchIter) SeekLT(
    2249             :         key []byte, flags base.SeekLTFlags,
    2250           0 : ) (*InternalKey, base.LazyValue) {
    2251           0 :         panic("pebble: SeekLT unimplemented")
    2252             : }
    2253             : 
    2254           1 : func (i *flushFlushableBatchIter) First() (*InternalKey, base.LazyValue) {
    2255           1 :         i.err = nil // clear cached iteration error
    2256           1 :         key, val := i.flushableBatchIter.First()
    2257           1 :         if key == nil {
    2258           0 :                 return nil, base.LazyValue{}
    2259           0 :         }
    2260           1 :         entryBytes := i.offsets[i.index].keyEnd - i.offsets[i.index].offset
    2261           1 :         *i.bytesIterated += uint64(entryBytes) + i.valueSize()
    2262           1 :         return key, val
    2263             : }
    2264             : 
    2265           0 : func (i *flushFlushableBatchIter) NextPrefix(succKey []byte) (*InternalKey, base.LazyValue) {
    2266           0 :         panic("pebble: NextPrefix unimplemented")
    2267             : }
    2268             : 
    2269             : // Note: flushFlushableBatchIter.Next mirrors the implementation of
    2270             : // flushableBatchIter.Next due to performance. Keep the two in sync.
    2271           1 : func (i *flushFlushableBatchIter) Next() (*InternalKey, base.LazyValue) {
    2272           1 :         if i.index == len(i.offsets) {
    2273           0 :                 return nil, base.LazyValue{}
    2274           0 :         }
    2275           1 :         i.index++
    2276           1 :         if i.index == len(i.offsets) {
    2277           1 :                 return nil, base.LazyValue{}
    2278           1 :         }
    2279           1 :         i.key = i.getKey(i.index)
    2280           1 :         entryBytes := i.offsets[i.index].keyEnd - i.offsets[i.index].offset
    2281           1 :         *i.bytesIterated += uint64(entryBytes) + i.valueSize()
    2282           1 :         return &i.key, i.value()
    2283             : }
    2284             : 
    2285           0 : func (i *flushFlushableBatchIter) Prev() (*InternalKey, base.LazyValue) {
    2286           0 :         panic("pebble: Prev unimplemented")
    2287             : }
    2288             : 
    2289           1 : func (i *flushFlushableBatchIter) valueSize() uint64 {
    2290           1 :         p := i.data[i.offsets[i.index].offset:]
    2291           1 :         if len(p) == 0 {
    2292           0 :                 i.err = base.CorruptionErrorf("corrupted batch")
    2293           0 :                 return 0
    2294           0 :         }
    2295           1 :         kind := InternalKeyKind(p[0])
    2296           1 :         if kind > InternalKeyKindMax {
    2297           0 :                 i.err = base.CorruptionErrorf("corrupted batch")
    2298           0 :                 return 0
    2299           0 :         }
    2300           1 :         var length uint64
    2301           1 :         switch kind {
    2302           1 :         case InternalKeyKindSet, InternalKeyKindMerge, InternalKeyKindRangeDelete:
    2303           1 :                 keyEnd := i.offsets[i.index].keyEnd
    2304           1 :                 v, n := binary.Uvarint(i.data[keyEnd:])
    2305           1 :                 if n <= 0 {
    2306           0 :                         i.err = base.CorruptionErrorf("corrupted batch")
    2307           0 :                         return 0
    2308           0 :                 }
    2309           1 :                 length = v + uint64(n)
    2310             :         }
    2311           1 :         return length
    2312             : }
    2313             : 
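// Illustrative sketch (editorial): valueSize returns the value's payload
// length plus the width of its uvarint length prefix. A 300-byte value, for
// example, takes a two-byte prefix (0xAC 0x02), so valueSize reports 302.
// Computed directly with encoding/binary:

func encodedValueSize(valueLen uint64) uint64 {
	var buf [binary.MaxVarintLen64]byte
	n := binary.PutUvarint(buf[:], valueLen)
	return valueLen + uint64(n)
}
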
    2314             : // batchSort returns iterators for the sorted contents of the batch. It is
    2315             : // intended for testing use only. The batch.Sort dance is done to prevent
    2316             : // exposing this method in the public pebble interface.
    2317             : func batchSort(
    2318             :         i interface{},
    2319             : ) (
    2320             :         points internalIterator,
    2321             :         rangeDels keyspan.FragmentIterator,
    2322             :         rangeKeys keyspan.FragmentIterator,
    2323           1 : ) {
    2324           1 :         b := i.(*Batch)
    2325           1 :         if b.Indexed() {
    2326           1 :                 pointIter := b.newInternalIter(nil)
    2327           1 :                 rangeDelIter := b.newRangeDelIter(nil, math.MaxUint64)
    2328           1 :                 rangeKeyIter := b.newRangeKeyIter(nil, math.MaxUint64)
    2329           1 :                 return pointIter, rangeDelIter, rangeKeyIter
    2330           1 :         }
    2331           1 :         f, err := newFlushableBatch(b, b.db.opts.Comparer)
    2332           1 :         if err != nil {
    2333           0 :                 panic(err)
    2334             :         }
    2335           1 :         return f.newIter(nil), f.newRangeDelIter(nil), f.newRangeKeyIter(nil)
    2336             : }
    2337             : 
    2338           1 : func init() {
    2339           1 :         private.BatchSort = batchSort
    2340           1 : }

Generated by: LCOV version 1.14