Line data Source code
1 : // Copyright 2012 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "bytes"
9 : "context"
10 : "encoding/binary"
11 : "fmt"
12 : "io"
13 : "math"
14 : "sort"
15 : "sync"
16 : "sync/atomic"
17 : "time"
18 : "unsafe"
19 :
20 : "github.com/cockroachdb/crlib/crtime"
21 : "github.com/cockroachdb/errors"
22 : "github.com/cockroachdb/pebble/batchrepr"
23 : "github.com/cockroachdb/pebble/internal/base"
24 : "github.com/cockroachdb/pebble/internal/batchskl"
25 : "github.com/cockroachdb/pebble/internal/humanize"
26 : "github.com/cockroachdb/pebble/internal/invariants"
27 : "github.com/cockroachdb/pebble/internal/keyspan"
28 : "github.com/cockroachdb/pebble/internal/private"
29 : "github.com/cockroachdb/pebble/internal/rangedel"
30 : "github.com/cockroachdb/pebble/internal/rangekey"
31 : "github.com/cockroachdb/pebble/internal/rawalloc"
32 : "github.com/cockroachdb/pebble/internal/treeprinter"
33 : )
34 :
35 : const (
36 : invalidBatchCount = 1<<32 - 1
37 : maxVarintLen32 = 5
38 :
39 : defaultBatchInitialSize = 1 << 10 // 1 KB
40 : defaultBatchMaxRetainedSize = 1 << 20 // 1 MB
41 : )
42 :
43 : // ErrNotIndexed means that a read operation on a batch failed because the
44 : // batch is not indexed and thus doesn't support reads.
45 : var ErrNotIndexed = errors.New("pebble: batch not indexed")
46 :
47 : // ErrInvalidBatch indicates that a batch is invalid or otherwise corrupted.
48 : var ErrInvalidBatch = batchrepr.ErrInvalidBatch
49 :
50 : // ErrBatchTooLarge indicates that a batch is invalid or otherwise corrupted.
51 : var ErrBatchTooLarge = base.MarkCorruptionError(errors.Newf("pebble: batch too large: >= %s", humanize.Bytes.Uint64(maxBatchSize)))
52 :
53 : // DeferredBatchOp represents a batch operation (eg. set, merge, delete) that is
54 : // being inserted into the batch. Indexing is not performed on the specified key
55 : // until Finish is called, hence the name deferred. This struct lets the caller
56 : // copy or encode keys/values directly into the batch representation instead of
57 : // copying into an intermediary buffer then having pebble.Batch copy off of it.
58 : type DeferredBatchOp struct {
59 : index *batchskl.Skiplist
60 :
61 : // Key and Value point to parts of the binary batch representation where
62 : // keys and values should be encoded/copied into. len(Key) and len(Value)
63 : // bytes must be copied into these slices respectively before calling
64 : // Finish(). Changing where these slices point to is not allowed.
65 : Key, Value []byte
66 : offset uint32
67 : }
68 :
69 : // Finish completes the addition of this batch operation, and adds it to the
70 : // index if necessary. Must be called once (and exactly once) keys/values
71 : // have been filled into Key and Value. Not calling Finish or not
72 : // copying/encoding keys will result in an incomplete index, and calling Finish
73 : // twice may result in a panic.
74 1 : func (d DeferredBatchOp) Finish() error {
75 1 : if d.index != nil {
76 0 : if err := d.index.Add(d.offset); err != nil {
77 0 : return err
78 0 : }
79 : }
80 1 : return nil
81 : }
82 :
83 : // A Batch is a sequence of Sets, Merges, Deletes, DeleteRanges, RangeKeySets,
84 : // RangeKeyUnsets, and/or RangeKeyDeletes that are applied atomically. Batch
85 : // implements the Reader interface, but only an indexed batch supports reading
86 : // (without error) via Get or NewIter. A non-indexed batch will return
87 : // ErrNotIndexed when read from. A batch is not safe for concurrent use, and
88 : // consumers should use a batch per goroutine or provide their own
89 : // synchronization.
90 : //
91 : // # Indexing
92 : //
93 : // Batches can be optionally indexed (see DB.NewIndexedBatch). An indexed batch
94 : // allows iteration via an Iterator (see Batch.NewIter). The iterator provides
95 : // a merged view of the operations in the batch and the underlying
96 : // database. This is implemented by treating the batch as an additional layer
97 : // in the LSM where every entry in the batch is considered newer than any entry
98 : // in the underlying database (batch entries have the InternalKeySeqNumBatch
99 : // bit set). By treating the batch as an additional layer in the LSM, iteration
100 : // supports all batch operations (i.e. Set, Merge, Delete, DeleteRange,
101 : // RangeKeySet, RangeKeyUnset, RangeKeyDelete) with minimal effort.
102 : //
103 : // The same key can be operated on multiple times in a batch, though only the
104 : // latest operation will be visible. For example, Put("a", "b"), Delete("a")
105 : // will cause the key "a" to not be visible in the batch. Put("a", "b"),
106 : // Put("a", "c") will cause a read of "a" to return the value "c".
107 : //
108 : // The batch index is implemented via an skiplist (internal/batchskl). While
109 : // the skiplist implementation is very fast, inserting into an indexed batch is
110 : // significantly slower than inserting into a non-indexed batch. Only use an
111 : // indexed batch if you require reading from it.
112 : //
113 : // # Atomic commit
114 : //
115 : // The operations in a batch are persisted by calling Batch.Commit which is
116 : // equivalent to calling DB.Apply(batch). A batch is committed atomically by
117 : // writing the internal batch representation to the WAL, adding all of the
118 : // batch operations to the memtable associated with the WAL, and then
119 : // incrementing the visible sequence number so that subsequent reads can see
120 : // the effects of the batch operations. If WriteOptions.Sync is true, a call to
121 : // Batch.Commit will guarantee that the batch is persisted to disk before
122 : // returning. See commitPipeline for more on the implementation details.
123 : //
124 : // # Large batches
125 : //
126 : // The size of a batch is limited only by available memory (be aware that
127 : // indexed batches require considerably additional memory for the skiplist
128 : // structure). A given WAL file has a single memtable associated with it (this
129 : // restriction could be removed, but doing so is onerous and complex). And a
130 : // memtable has a fixed size due to the underlying fixed size arena. Note that
131 : // this differs from RocksDB where a memtable can grow arbitrarily large using
132 : // a list of arena chunks. In RocksDB this is accomplished by storing pointers
133 : // in the arena memory, but that isn't possible in Go.
134 : //
135 : // During Batch.Commit, a batch which is larger than a threshold (>
136 : // MemTableSize/2) is wrapped in a flushableBatch and inserted into the queue
137 : // of memtables. A flushableBatch forces WAL to be rotated, but that happens
138 : // anyways when the memtable becomes full so this does not cause significant
139 : // WAL churn. Because the flushableBatch is readable as another layer in the
140 : // LSM, Batch.Commit returns as soon as the flushableBatch has been added to
141 : // the queue of memtables.
142 : //
143 : // Internally, a flushableBatch provides Iterator support by sorting the batch
144 : // contents (the batch is sorted once, when it is added to the memtable
145 : // queue). Sorting the batch contents and insertion of the contents into a
146 : // memtable have the same big-O time, but the constant factor dominates
147 : // here. Sorting is significantly faster and uses significantly less memory.
148 : //
149 : // # Internal representation
150 : //
151 : // The internal batch representation is a contiguous byte buffer with a fixed
152 : // 12-byte header, followed by a series of records.
153 : //
154 : // +-------------+------------+--- ... ---+
155 : // | SeqNum (8B) | Count (4B) | Entries |
156 : // +-------------+------------+--- ... ---+
157 : //
158 : // Each record has a 1-byte kind tag prefix, followed by 1 or 2 length prefixed
159 : // strings (varstring):
160 : //
161 : // +-----------+-----------------+-------------------+
162 : // | Kind (1B) | Key (varstring) | Value (varstring) |
163 : // +-----------+-----------------+-------------------+
164 : //
165 : // A varstring is a varint32 followed by N bytes of data. The Kind tags are
166 : // exactly those specified by InternalKeyKind. The following table shows the
167 : // format for records of each kind:
168 : //
169 : // InternalKeyKindDelete varstring
170 : // InternalKeyKindLogData varstring
171 : // InternalKeyKindIngestSST varstring
172 : // InternalKeyKindSet varstring varstring
173 : // InternalKeyKindMerge varstring varstring
174 : // InternalKeyKindRangeDelete varstring varstring
175 : // InternalKeyKindRangeKeySet varstring varstring
176 : // InternalKeyKindRangeKeyUnset varstring varstring
177 : // InternalKeyKindRangeKeyDelete varstring varstring
178 : //
179 : // The intuitive understanding here are that the arguments to Delete, Set,
180 : // Merge, DeleteRange and RangeKeyDelete are encoded into the batch. The
181 : // RangeKeySet and RangeKeyUnset operations are slightly more complicated,
182 : // encoding their end key, suffix and value [in the case of RangeKeySet] within
183 : // the Value varstring. For more information on the value encoding for
184 : // RangeKeySet and RangeKeyUnset, see the internal/rangekey package.
185 : //
186 : // The internal batch representation is the on disk format for a batch in the
187 : // WAL, and thus stable. New record kinds may be added, but the existing ones
188 : // will not be modified.
189 : type Batch struct {
190 : batchInternal
191 : applied atomic.Bool
192 : // lifecycle is used to negotiate the lifecycle of a Batch. A Batch and its
193 : // underlying batchInternal.data byte slice may be reused. There are two
194 : // mechanisms for reuse:
195 : //
196 : // 1. The caller may explicitly call [Batch.Reset] to reset the batch to be
197 : // empty (while retaining the underlying repr's buffer).
198 : // 2. The caller may call [Batch.Close], passing ownership off to Pebble,
199 : // which may reuse the batch's memory to service new callers to
200 : // [DB.NewBatch].
201 : //
202 : // There's a complication to reuse: When WAL failover is configured, the
203 : // Pebble commit pipeline may retain a pointer to the batch.data beyond the
204 : // return of [Batch.Commit]. The user of the Batch may commit their batch
205 : // and call Close or Reset before the commit pipeline is finished reading
206 : // the data slice. Recycling immediately would cause a data race.
207 : //
208 : // To resolve this data race, this [lifecycle] atomic is used to determine
209 : // safety and responsibility of reusing a batch. The low bits of the atomic
210 : // are used as a reference count (really just the lowest bit—in practice
211 : // there's only 1 code path that references). The [Batch] is passed into
212 : // [wal.Writer]'s WriteRecord method as a [RefCount] implementation. The
213 : // wal.Writer guarantees that if it will read [Batch.data] after the call to
214 : // WriteRecord returns, it will increment the reference count. When it's
215 : // complete, it'll unreference through invoking [Batch.Unref].
216 : //
217 : // When the committer of a batch indicates intent to recycle a Batch through
218 : // calling [Batch.Reset] or [Batch.Close], the lifecycle atomic is read. If
219 : // an outstanding reference remains, it's unsafe to reuse Batch.data yet. In
220 : // [Batch.Reset] the caller wants to reuse the [Batch] immediately, so we
221 : // discard b.data to recycle the struct but not the underlying byte slice.
222 : // In [Batch.Close], we set a special high bit [batchClosedBit] on lifecycle
223 : // that indicates that the user will not use [Batch] again and we're free to
224 : // recycle it when safe. When the commit pipeline eventually calls
225 : // [Batch.Unref], the [batchClosedBit] is noticed and the batch is
226 : // recycled.
227 : lifecycle atomic.Int32
228 : }
229 :
230 : // batchClosedBit is a bit stored on Batch.lifecycle to indicate that the user
231 : // called [Batch.Close] to release a Batch, but an open reference count
232 : // prevented immediate recycling.
233 : const batchClosedBit = 1 << 30
234 :
235 : // TODO(jackson): Hide the wal.RefCount implementation from the public Batch interface.
236 :
237 : // Ref implements wal.RefCount. If the WAL writer may need to read b.data after
238 : // it returns, it invokes Ref to increment the lifecycle's reference count. When
239 : // it's finished, it invokes Unref.
240 2 : func (b *Batch) Ref() {
241 2 : b.lifecycle.Add(+1)
242 2 : }
243 :
244 : // Unref implemets wal.RefCount.
245 2 : func (b *Batch) Unref() {
246 2 : if v := b.lifecycle.Add(-1); (v ^ batchClosedBit) == 0 {
247 2 : // The [batchClosedBit] high bit is set, and there are no outstanding
248 2 : // references. The user of the Batch called [Batch.Close], expecting the
249 2 : // batch to be recycled. However, our outstanding reference count
250 2 : // prevented recycling. As the last to dereference, we're now
251 2 : // responsible for releasing the batch.
252 2 : b.lifecycle.Store(0)
253 2 : b.release()
254 2 : }
255 : }
256 :
257 : // batchInternal contains the set of fields within Batch that are non-atomic and
258 : // capable of being reset using a *b = batchInternal{} struct copy.
259 : type batchInternal struct {
260 : // Data is the wire format of a batch's log entry:
261 : // - 8 bytes for a sequence number of the first batch element,
262 : // or zeroes if the batch has not yet been applied,
263 : // - 4 bytes for the count: the number of elements in the batch,
264 : // or "\xff\xff\xff\xff" if the batch is invalid,
265 : // - count elements, being:
266 : // - one byte for the kind
267 : // - the varint-string user key,
268 : // - the varint-string value (if kind != delete).
269 : // The sequence number and count are stored in little-endian order.
270 : //
271 : // The data field can be (but is not guaranteed to be) nil for new
272 : // batches. Large batches will set the data field to nil when committed as
273 : // the data has been moved to a flushableBatch and inserted into the queue of
274 : // memtables.
275 : data []byte
276 : comparer *base.Comparer
277 : opts batchOptions
278 :
279 : // An upper bound on required space to add this batch to a memtable.
280 : // Note that although batches are limited to 4 GiB in size, that limit
281 : // applies to len(data), not the memtable size. The upper bound on the
282 : // size of a memtable node is larger than the overhead of the batch's log
283 : // encoding, so memTableSize is larger than len(data) and may overflow a
284 : // uint32.
285 : memTableSize uint64
286 :
287 : // The db to which the batch will be committed. Do not change this field
288 : // after the batch has been created as it might invalidate internal state.
289 : // Batch.memTableSize is only refreshed if Batch.db is set. Setting db to
290 : // nil once it has been set implies that the Batch has encountered an error.
291 : db *DB
292 :
293 : // The count of records in the batch. This count will be stored in the batch
294 : // data whenever Repr() is called.
295 : count uint64
296 :
297 : // The count of range deletions in the batch. Updated every time a range
298 : // deletion is added.
299 : countRangeDels uint64
300 :
301 : // The count of range key sets, unsets and deletes in the batch. Updated
302 : // every time a RANGEKEYSET, RANGEKEYUNSET or RANGEKEYDEL key is added.
303 : countRangeKeys uint64
304 :
305 : // A deferredOp struct, stored in the Batch so that a pointer can be returned
306 : // from the *Deferred() methods rather than a value.
307 : deferredOp DeferredBatchOp
308 :
309 : // An optional skiplist keyed by offset into data of the entry.
310 : index *batchskl.Skiplist
311 : rangeDelIndex *batchskl.Skiplist
312 : rangeKeyIndex *batchskl.Skiplist
313 :
314 : // Fragmented range deletion tombstones. Cached the first time a range
315 : // deletion iterator is requested. The cache is invalidated whenever a new
316 : // range deletion is added to the batch. This cache can only be used when
317 : // opening an iterator to read at a batch sequence number >=
318 : // tombstonesSeqNum. This is the case for all new iterators created over a
319 : // batch but it's not the case for all cloned iterators.
320 : tombstones []keyspan.Span
321 : tombstonesSeqNum base.SeqNum
322 :
323 : // Fragmented range key spans. Cached the first time a range key iterator is
324 : // requested. The cache is invalidated whenever a new range key
325 : // (RangeKey{Set,Unset,Del}) is added to the batch. This cache can only be
326 : // used when opening an iterator to read at a batch sequence number >=
327 : // tombstonesSeqNum. This is the case for all new iterators created over a
328 : // batch but it's not the case for all cloned iterators.
329 : rangeKeys []keyspan.Span
330 : rangeKeysSeqNum base.SeqNum
331 :
332 : // The flushableBatch wrapper if the batch is too large to fit in the
333 : // memtable.
334 : flushable *flushableBatch
335 :
336 : // minimumFormatMajorVersion indicates the format major version required in
337 : // order to commit this batch. If an operation requires a particular format
338 : // major version, it ratchets the batch's minimumFormatMajorVersion. When
339 : // the batch is committed, this is validated against the database's current
340 : // format major version.
341 : minimumFormatMajorVersion FormatMajorVersion
342 :
343 : // Synchronous Apply uses the commit WaitGroup for both publishing the
344 : // seqnum and waiting for the WAL fsync (if needed). Asynchronous
345 : // ApplyNoSyncWait, which implies WriteOptions.Sync is true, uses the commit
346 : // WaitGroup for publishing the seqnum and the fsyncWait WaitGroup for
347 : // waiting for the WAL fsync.
348 : //
349 : // TODO(sumeer): if we find that ApplyNoSyncWait in conjunction with
350 : // SyncWait is causing higher memory usage because of the time duration
351 : // between when the sync is already done, and a goroutine calls SyncWait
352 : // (followed by Batch.Close), we could separate out {fsyncWait, commitErr}
353 : // into a separate struct that is allocated separately (using another
354 : // sync.Pool), and only that struct needs to outlive Batch.Close (which
355 : // could then be called immediately after ApplyNoSyncWait). commitStats
356 : // will also need to be in this separate struct.
357 : commit sync.WaitGroup
358 : fsyncWait sync.WaitGroup
359 :
360 : commitStats BatchCommitStats
361 :
362 : commitErr error
363 :
364 : // Position bools together to reduce the sizeof the struct.
365 :
366 : // ingestedSSTBatch indicates that the batch contains one or more key kinds
367 : // of InternalKeyKindIngestSST. If the batch contains key kinds of IngestSST
368 : // then it will only contain key kinds of IngestSST.
369 : ingestedSSTBatch bool
370 :
371 : // committing is set to true when a batch begins to commit. It's used to
372 : // ensure the batch is not mutated concurrently. It is not an atomic
373 : // deliberately, so as to avoid the overhead on batch mutations. This is
374 : // okay, because under correct usage this field will never be accessed
375 : // concurrently. It's only under incorrect usage the memory accesses of this
376 : // variable may violate memory safety. Since we don't use atomics here,
377 : // false negatives are possible.
378 : committing bool
379 : }
380 :
381 : // BatchCommitStats exposes stats related to committing a batch.
382 : //
383 : // NB: there is no Pebble internal tracing (using LoggerAndTracer) of slow
384 : // batch commits. The caller can use these stats to do their own tracing as
385 : // needed.
386 : type BatchCommitStats struct {
387 : // TotalDuration is the time spent in DB.{Apply,ApplyNoSyncWait} or
388 : // Batch.Commit, plus the time waiting in Batch.SyncWait. If there is a gap
389 : // between calling ApplyNoSyncWait and calling SyncWait, that gap could
390 : // include some duration in which real work was being done for the commit
391 : // and will not be included here. This missing time is considered acceptable
392 : // since the goal of these stats is to understand user-facing latency.
393 : //
394 : // TotalDuration includes time spent in various queues both inside Pebble
395 : // and outside Pebble (I/O queues, goroutine scheduler queue, mutex wait
396 : // etc.). For some of these queues (which we consider important) the wait
397 : // times are included below -- these expose low-level implementation detail
398 : // and are meant for expert diagnosis and subject to change. There may be
399 : // unaccounted time after subtracting those values from TotalDuration.
400 : TotalDuration time.Duration
401 : // SemaphoreWaitDuration is the wait time for semaphores in
402 : // commitPipeline.Commit.
403 : SemaphoreWaitDuration time.Duration
404 : // WALQueueWaitDuration is the wait time for allocating memory blocks in the
405 : // LogWriter (due to the LogWriter not writing fast enough). At the moment
406 : // this is duration is always zero because a single WAL will allow
407 : // allocating memory blocks up to the entire memtable size. In the future,
408 : // we may pipeline WALs and bound the WAL queued blocks separately, so this
409 : // field is preserved for that possibility.
410 : WALQueueWaitDuration time.Duration
411 : // MemTableWriteStallDuration is the wait caused by a write stall due to too
412 : // many memtables (due to not flushing fast enough).
413 : MemTableWriteStallDuration time.Duration
414 : // L0ReadAmpWriteStallDuration is the wait caused by a write stall due to
415 : // high read amplification in L0 (due to not compacting fast enough out of
416 : // L0).
417 : L0ReadAmpWriteStallDuration time.Duration
418 : // WALRotationDuration is the wait time for WAL rotation, which includes
419 : // syncing and closing the old WAL and creating (or reusing) a new one.
420 : WALRotationDuration time.Duration
421 : // CommitWaitDuration is the wait for publishing the seqnum plus the
422 : // duration for the WAL sync (if requested). The former should be tiny and
423 : // one can assume that this is all due to the WAL sync.
424 : CommitWaitDuration time.Duration
425 : }
426 :
427 : var _ Reader = (*Batch)(nil)
428 : var _ Writer = (*Batch)(nil)
429 :
430 : var batchPool = sync.Pool{
431 2 : New: func() interface{} {
432 2 : return &Batch{}
433 2 : },
434 : }
435 :
436 : type indexedBatch struct {
437 : batch Batch
438 : index batchskl.Skiplist
439 : }
440 :
441 : var indexedBatchPool = sync.Pool{
442 2 : New: func() interface{} {
443 2 : return &indexedBatch{}
444 2 : },
445 : }
446 :
447 2 : func newBatch(db *DB, opts ...BatchOption) *Batch {
448 2 : b := batchPool.Get().(*Batch)
449 2 : b.db = db
450 2 : b.opts.ensureDefaults()
451 2 : for _, opt := range opts {
452 1 : opt(&b.opts)
453 1 : }
454 2 : return b
455 : }
456 :
457 1 : func newBatchWithSize(db *DB, size int, opts ...BatchOption) *Batch {
458 1 : b := newBatch(db, opts...)
459 1 : if cap(b.data) < size {
460 1 : b.data = rawalloc.New(0, size)
461 1 : }
462 1 : return b
463 : }
464 :
465 2 : func newIndexedBatch(db *DB, comparer *Comparer) *Batch {
466 2 : i := indexedBatchPool.Get().(*indexedBatch)
467 2 : i.batch.comparer = comparer
468 2 : i.batch.db = db
469 2 : i.batch.index = &i.index
470 2 : i.batch.index.Init(&i.batch.data, comparer.Compare, comparer.AbbreviatedKey)
471 2 : i.batch.opts.ensureDefaults()
472 2 : return &i.batch
473 2 : }
474 :
475 0 : func newIndexedBatchWithSize(db *DB, comparer *Comparer, size int) *Batch {
476 0 : b := newIndexedBatch(db, comparer)
477 0 : if cap(b.data) < size {
478 0 : b.data = rawalloc.New(0, size)
479 0 : }
480 0 : return b
481 : }
482 :
483 : // nextSeqNum returns the batch "sequence number" that will be given to the next
484 : // key written to the batch. During iteration keys within an indexed batch are
485 : // given a sequence number consisting of their offset within the batch combined
486 : // with the base.SeqNumBatchBit bit. These sequence numbers are only
487 : // used during iteration, and the keys are assigned ordinary sequence numbers
488 : // when the batch is committed.
489 2 : func (b *Batch) nextSeqNum() base.SeqNum {
490 2 : return base.SeqNum(len(b.data)) | base.SeqNumBatchBit
491 2 : }
492 :
493 2 : func (b *Batch) release() {
494 2 : if b.db == nil {
495 2 : // The batch was not created using newBatch or newIndexedBatch, or an error
496 2 : // was encountered. We don't try to reuse batches that encountered an error
497 2 : // because they might be stuck somewhere in the system and attempting to
498 2 : // reuse such batches is a recipe for onerous debugging sessions. Instead,
499 2 : // let the GC do its job.
500 2 : return
501 2 : }
502 2 : b.db = nil
503 2 :
504 2 : // NB: This is ugly (it would be cleaner if we could just assign a Batch{}),
505 2 : // but necessary so that we can use atomic.StoreUint32 for the Batch.applied
506 2 : // field. Without using an atomic to clear that field the Go race detector
507 2 : // complains.
508 2 : b.reset()
509 2 : b.comparer = nil
510 2 :
511 2 : if b.index == nil {
512 2 : batchPool.Put(b)
513 2 : } else {
514 2 : b.index, b.rangeDelIndex, b.rangeKeyIndex = nil, nil, nil
515 2 : indexedBatchPool.Put((*indexedBatch)(unsafe.Pointer(b)))
516 2 : }
517 : }
518 :
519 2 : func (b *Batch) refreshMemTableSize() error {
520 2 : b.memTableSize = 0
521 2 : if len(b.data) < batchrepr.HeaderLen {
522 1 : return nil
523 1 : }
524 :
525 2 : b.countRangeDels = 0
526 2 : b.countRangeKeys = 0
527 2 : b.minimumFormatMajorVersion = 0
528 2 : for r := b.Reader(); ; {
529 2 : kind, key, value, ok, err := r.Next()
530 2 : if !ok {
531 2 : if err != nil {
532 0 : return err
533 0 : }
534 2 : break
535 : }
536 2 : switch kind {
537 2 : case InternalKeyKindRangeDelete:
538 2 : b.countRangeDels++
539 2 : case InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete:
540 2 : b.countRangeKeys++
541 2 : case InternalKeyKindSet, InternalKeyKindDelete, InternalKeyKindMerge, InternalKeyKindSingleDelete, InternalKeyKindSetWithDelete:
542 : // fallthrough
543 2 : case InternalKeyKindDeleteSized:
544 2 : if b.minimumFormatMajorVersion < FormatDeleteSizedAndObsolete {
545 2 : b.minimumFormatMajorVersion = FormatDeleteSizedAndObsolete
546 2 : }
547 1 : case InternalKeyKindLogData:
548 1 : // LogData does not contribute to memtable size.
549 1 : continue
550 2 : case InternalKeyKindIngestSST:
551 2 : if b.minimumFormatMajorVersion < FormatFlushableIngest {
552 2 : b.minimumFormatMajorVersion = FormatFlushableIngest
553 2 : }
554 : // This key kind doesn't contribute to the memtable size.
555 2 : continue
556 2 : case InternalKeyKindExcise:
557 2 : if b.minimumFormatMajorVersion < FormatFlushableIngestExcises {
558 2 : b.minimumFormatMajorVersion = FormatFlushableIngestExcises
559 2 : }
560 : // This key kind doesn't contribute to the memtable size.
561 2 : continue
562 0 : default:
563 0 : // Note In some circumstances this might be temporary memory
564 0 : // corruption that can be recovered by discarding the batch and
565 0 : // trying again. In other cases, the batch repr might've been
566 0 : // already persisted elsewhere, and we'll loop continuously trying
567 0 : // to commit the same corrupted batch. The caller is responsible for
568 0 : // distinguishing.
569 0 : return errors.Wrapf(ErrInvalidBatch, "unrecognized kind %v", kind)
570 : }
571 2 : b.memTableSize += memTableEntrySize(len(key), len(value))
572 : }
573 2 : return nil
574 : }
575 :
576 : // Apply the operations contained in the batch to the receiver batch.
577 : //
578 : // It is safe to modify the contents of the arguments after Apply returns.
579 : //
580 : // Apply returns ErrInvalidBatch if the provided batch is invalid in any way.
581 2 : func (b *Batch) Apply(batch *Batch, _ *WriteOptions) error {
582 2 : if b.ingestedSSTBatch {
583 0 : panic("pebble: invalid batch application")
584 : }
585 2 : if len(batch.data) == 0 {
586 0 : return nil
587 0 : }
588 2 : if len(batch.data) < batchrepr.HeaderLen {
589 0 : return ErrInvalidBatch
590 0 : }
591 :
592 2 : offset := len(b.data)
593 2 : if offset == 0 {
594 1 : b.init(offset)
595 1 : offset = batchrepr.HeaderLen
596 1 : }
597 2 : b.data = append(b.data, batch.data[batchrepr.HeaderLen:]...)
598 2 :
599 2 : b.setCount(b.Count() + batch.Count())
600 2 :
601 2 : if b.db != nil || b.index != nil {
602 2 : // Only iterate over the new entries if we need to track memTableSize or in
603 2 : // order to update the index.
604 2 : for iter := batchrepr.Reader(b.data[offset:]); len(iter) > 0; {
605 2 : offset := uintptr(unsafe.Pointer(&iter[0])) - uintptr(unsafe.Pointer(&b.data[0]))
606 2 : kind, key, value, ok, err := iter.Next()
607 2 : if !ok {
608 0 : if err != nil {
609 0 : return err
610 0 : }
611 0 : break
612 : }
613 2 : switch kind {
614 2 : case InternalKeyKindRangeDelete:
615 2 : b.countRangeDels++
616 1 : case InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete:
617 1 : b.countRangeKeys++
618 0 : case InternalKeyKindIngestSST, InternalKeyKindExcise:
619 0 : panic("pebble: invalid key kind for batch")
620 1 : case InternalKeyKindLogData:
621 1 : // LogData does not contribute to memtable size.
622 1 : continue
623 : case InternalKeyKindSet, InternalKeyKindDelete, InternalKeyKindMerge,
624 1 : InternalKeyKindSingleDelete, InternalKeyKindSetWithDelete, InternalKeyKindDeleteSized:
625 : // fallthrough
626 0 : default:
627 0 : // Note In some circumstances this might be temporary memory
628 0 : // corruption that can be recovered by discarding the batch and
629 0 : // trying again. In other cases, the batch repr might've been
630 0 : // already persisted elsewhere, and we'll loop continuously
631 0 : // trying to commit the same corrupted batch. The caller is
632 0 : // responsible for distinguishing.
633 0 : return errors.Wrapf(ErrInvalidBatch, "unrecognized kind %v", kind)
634 : }
635 2 : if b.index != nil {
636 2 : var err error
637 2 : switch kind {
638 2 : case InternalKeyKindRangeDelete:
639 2 : b.tombstones = nil
640 2 : b.tombstonesSeqNum = 0
641 2 : if b.rangeDelIndex == nil {
642 2 : b.rangeDelIndex = batchskl.NewSkiplist(&b.data, b.comparer.Compare, b.comparer.AbbreviatedKey)
643 2 : }
644 2 : err = b.rangeDelIndex.Add(uint32(offset))
645 1 : case InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete:
646 1 : b.rangeKeys = nil
647 1 : b.rangeKeysSeqNum = 0
648 1 : if b.rangeKeyIndex == nil {
649 1 : b.rangeKeyIndex = batchskl.NewSkiplist(&b.data, b.comparer.Compare, b.comparer.AbbreviatedKey)
650 1 : }
651 1 : err = b.rangeKeyIndex.Add(uint32(offset))
652 1 : default:
653 1 : err = b.index.Add(uint32(offset))
654 : }
655 2 : if err != nil {
656 0 : return err
657 0 : }
658 : }
659 2 : b.memTableSize += memTableEntrySize(len(key), len(value))
660 : }
661 : }
662 2 : return nil
663 : }
664 :
665 : // Get gets the value for the given key. It returns ErrNotFound if the Batch
666 : // does not contain the key.
667 : //
668 : // The caller should not modify the contents of the returned slice, but it is
669 : // safe to modify the contents of the argument after Get returns. The returned
670 : // slice will remain valid until the returned Closer is closed. On success, the
671 : // caller MUST call closer.Close() or a memory leak will occur.
672 2 : func (b *Batch) Get(key []byte) ([]byte, io.Closer, error) {
673 2 : if b.index == nil {
674 0 : return nil, nil, ErrNotIndexed
675 0 : }
676 2 : return b.db.getInternal(key, b, nil /* snapshot */)
677 : }
678 :
679 2 : func (b *Batch) prepareDeferredKeyValueRecord(keyLen, valueLen int, kind InternalKeyKind) {
680 2 : if b.committing {
681 0 : panic("pebble: batch already committing")
682 : }
683 2 : if len(b.data) == 0 {
684 2 : b.init(keyLen + valueLen + 2*binary.MaxVarintLen64 + batchrepr.HeaderLen)
685 2 : }
686 2 : b.count++
687 2 : b.memTableSize += memTableEntrySize(keyLen, valueLen)
688 2 :
689 2 : pos := len(b.data)
690 2 : b.deferredOp.offset = uint32(pos)
691 2 : b.grow(1 + 2*maxVarintLen32 + keyLen + valueLen)
692 2 : b.data[pos] = byte(kind)
693 2 : pos++
694 2 :
695 2 : {
696 2 : // TODO(peter): Manually inlined version binary.PutUvarint(). This is 20%
697 2 : // faster on BenchmarkBatchSet on go1.13. Remove if go1.14 or future
698 2 : // versions show this to not be a performance win.
699 2 : x := uint32(keyLen)
700 2 : for x >= 0x80 {
701 1 : b.data[pos] = byte(x) | 0x80
702 1 : x >>= 7
703 1 : pos++
704 1 : }
705 2 : b.data[pos] = byte(x)
706 2 : pos++
707 : }
708 :
709 2 : b.deferredOp.Key = b.data[pos : pos+keyLen]
710 2 : pos += keyLen
711 2 :
712 2 : {
713 2 : // TODO(peter): Manually inlined version binary.PutUvarint(). This is 20%
714 2 : // faster on BenchmarkBatchSet on go1.13. Remove if go1.14 or future
715 2 : // versions show this to not be a performance win.
716 2 : x := uint32(valueLen)
717 2 : for x >= 0x80 {
718 1 : b.data[pos] = byte(x) | 0x80
719 1 : x >>= 7
720 1 : pos++
721 1 : }
722 2 : b.data[pos] = byte(x)
723 2 : pos++
724 : }
725 :
726 2 : b.deferredOp.Value = b.data[pos : pos+valueLen]
727 2 : // Shrink data since varints may be shorter than the upper bound.
728 2 : b.data = b.data[:pos+valueLen]
729 : }
730 :
731 2 : func (b *Batch) prepareDeferredKeyRecord(keyLen int, kind InternalKeyKind) {
732 2 : if b.committing {
733 0 : panic("pebble: batch already committing")
734 : }
735 2 : if len(b.data) == 0 {
736 2 : b.init(keyLen + binary.MaxVarintLen64 + batchrepr.HeaderLen)
737 2 : }
738 2 : b.count++
739 2 : b.memTableSize += memTableEntrySize(keyLen, 0)
740 2 :
741 2 : pos := len(b.data)
742 2 : b.deferredOp.offset = uint32(pos)
743 2 : b.grow(1 + maxVarintLen32 + keyLen)
744 2 : b.data[pos] = byte(kind)
745 2 : pos++
746 2 :
747 2 : {
748 2 : // TODO(peter): Manually inlined version binary.PutUvarint(). Remove if
749 2 : // go1.13 or future versions show this to not be a performance win. See
750 2 : // BenchmarkBatchSet.
751 2 : x := uint32(keyLen)
752 2 : for x >= 0x80 {
753 0 : b.data[pos] = byte(x) | 0x80
754 0 : x >>= 7
755 0 : pos++
756 0 : }
757 2 : b.data[pos] = byte(x)
758 2 : pos++
759 : }
760 :
761 2 : b.deferredOp.Key = b.data[pos : pos+keyLen]
762 2 : b.deferredOp.Value = nil
763 2 :
764 2 : // Shrink data since varint may be shorter than the upper bound.
765 2 : b.data = b.data[:pos+keyLen]
766 : }
767 :
768 : // AddInternalKey allows the caller to add an internal key of point key or range
769 : // key kinds (but not RangeDelete) to a batch. Passing in an internal key of
770 : // kind RangeDelete will result in a panic. Note that the seqnum in the internal
771 : // key is effectively ignored, even though the Kind is preserved. This is
772 : // because the batch format does not allow for a per-key seqnum to be specified,
773 : // only a batch-wide one.
774 : //
775 : // Note that non-indexed keys (IngestKeyKind{LogData,IngestSST}) are not
776 : // supported with this method as they require specialized logic.
777 2 : func (b *Batch) AddInternalKey(key *base.InternalKey, value []byte, _ *WriteOptions) error {
778 2 : keyLen := len(key.UserKey)
779 2 : hasValue := false
780 2 : switch kind := key.Kind(); kind {
781 0 : case InternalKeyKindRangeDelete:
782 0 : panic("unexpected range delete in AddInternalKey")
783 1 : case InternalKeyKindSingleDelete, InternalKeyKindDelete:
784 1 : b.prepareDeferredKeyRecord(keyLen, kind)
785 1 : b.deferredOp.index = b.index
786 2 : case InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete:
787 2 : b.prepareDeferredKeyValueRecord(keyLen, len(value), kind)
788 2 : hasValue = true
789 2 : b.incrementRangeKeysCount()
790 1 : default:
791 1 : b.prepareDeferredKeyValueRecord(keyLen, len(value), kind)
792 1 : hasValue = true
793 1 : b.deferredOp.index = b.index
794 : }
795 2 : copy(b.deferredOp.Key, key.UserKey)
796 2 : if hasValue {
797 2 : copy(b.deferredOp.Value, value)
798 2 : }
799 :
800 : // TODO(peter): Manually inline DeferredBatchOp.Finish(). Mid-stack inlining
801 : // in go1.13 will remove the need for this.
802 2 : if b.index != nil {
803 0 : if err := b.index.Add(b.deferredOp.offset); err != nil {
804 0 : return err
805 0 : }
806 : }
807 2 : return nil
808 : }
809 :
810 : // Set adds an action to the batch that sets the key to map to the value.
811 : //
812 : // It is safe to modify the contents of the arguments after Set returns.
813 2 : func (b *Batch) Set(key, value []byte, _ *WriteOptions) error {
814 2 : deferredOp := b.SetDeferred(len(key), len(value))
815 2 : copy(deferredOp.Key, key)
816 2 : copy(deferredOp.Value, value)
817 2 : // TODO(peter): Manually inline DeferredBatchOp.Finish(). Mid-stack inlining
818 2 : // in go1.13 will remove the need for this.
819 2 : if b.index != nil {
820 2 : if err := b.index.Add(deferredOp.offset); err != nil {
821 0 : return err
822 0 : }
823 : }
824 2 : return nil
825 : }
826 :
827 : // SetDeferred is similar to Set in that it adds a set operation to the batch,
828 : // except it only takes in key/value lengths instead of complete slices,
829 : // letting the caller encode into those objects and then call Finish() on the
830 : // returned object.
831 2 : func (b *Batch) SetDeferred(keyLen, valueLen int) *DeferredBatchOp {
832 2 : b.prepareDeferredKeyValueRecord(keyLen, valueLen, InternalKeyKindSet)
833 2 : b.deferredOp.index = b.index
834 2 : return &b.deferredOp
835 2 : }
836 :
837 : // Merge adds an action to the batch that merges the value at key with the new
838 : // value. The details of the merge are dependent upon the configured merge
839 : // operator.
840 : //
841 : // It is safe to modify the contents of the arguments after Merge returns.
842 2 : func (b *Batch) Merge(key, value []byte, _ *WriteOptions) error {
843 2 : deferredOp := b.MergeDeferred(len(key), len(value))
844 2 : copy(deferredOp.Key, key)
845 2 : copy(deferredOp.Value, value)
846 2 : // TODO(peter): Manually inline DeferredBatchOp.Finish(). Mid-stack inlining
847 2 : // in go1.13 will remove the need for this.
848 2 : if b.index != nil {
849 2 : if err := b.index.Add(deferredOp.offset); err != nil {
850 0 : return err
851 0 : }
852 : }
853 2 : return nil
854 : }
855 :
856 : // MergeDeferred is similar to Merge in that it adds a merge operation to the
857 : // batch, except it only takes in key/value lengths instead of complete slices,
858 : // letting the caller encode into those objects and then call Finish() on the
859 : // returned object.
860 2 : func (b *Batch) MergeDeferred(keyLen, valueLen int) *DeferredBatchOp {
861 2 : b.prepareDeferredKeyValueRecord(keyLen, valueLen, InternalKeyKindMerge)
862 2 : b.deferredOp.index = b.index
863 2 : return &b.deferredOp
864 2 : }
865 :
866 : // Delete adds an action to the batch that deletes the entry for key.
867 : //
868 : // It is safe to modify the contents of the arguments after Delete returns.
869 2 : func (b *Batch) Delete(key []byte, _ *WriteOptions) error {
870 2 : deferredOp := b.DeleteDeferred(len(key))
871 2 : copy(deferredOp.Key, key)
872 2 : // TODO(peter): Manually inline DeferredBatchOp.Finish(). Mid-stack inlining
873 2 : // in go1.13 will remove the need for this.
874 2 : if b.index != nil {
875 2 : if err := b.index.Add(deferredOp.offset); err != nil {
876 0 : return err
877 0 : }
878 : }
879 2 : return nil
880 : }
881 :
882 : // DeleteDeferred is similar to Delete in that it adds a delete operation to
883 : // the batch, except it only takes in key/value lengths instead of complete
884 : // slices, letting the caller encode into those objects and then call Finish()
885 : // on the returned object.
886 2 : func (b *Batch) DeleteDeferred(keyLen int) *DeferredBatchOp {
887 2 : b.prepareDeferredKeyRecord(keyLen, InternalKeyKindDelete)
888 2 : b.deferredOp.index = b.index
889 2 : return &b.deferredOp
890 2 : }
891 :
892 : // DeleteSized behaves identically to Delete, but takes an additional
893 : // argument indicating the size of the value being deleted. DeleteSized
894 : // should be preferred when the caller has the expectation that there exists
895 : // a single internal KV pair for the key (eg, the key has not been
896 : // overwritten recently), and the caller knows the size of its value.
897 : //
898 : // DeleteSized will record the value size within the tombstone and use it to
899 : // inform compaction-picking heuristics which strive to reduce space
900 : // amplification in the LSM. This "calling your shot" mechanic allows the
901 : // storage engine to more accurately estimate and reduce space amplification.
902 : //
903 : // It is safe to modify the contents of the arguments after DeleteSized
904 : // returns.
905 2 : func (b *Batch) DeleteSized(key []byte, deletedValueSize uint32, _ *WriteOptions) error {
906 2 : deferredOp := b.DeleteSizedDeferred(len(key), deletedValueSize)
907 2 : copy(b.deferredOp.Key, key)
908 2 : // TODO(peter): Manually inline DeferredBatchOp.Finish(). Check if in a
909 2 : // later Go release this is unnecessary.
910 2 : if b.index != nil {
911 2 : if err := b.index.Add(deferredOp.offset); err != nil {
912 0 : return err
913 0 : }
914 : }
915 2 : return nil
916 : }
917 :
918 : // DeleteSizedDeferred is similar to DeleteSized in that it adds a sized delete
919 : // operation to the batch, except it only takes in key length instead of a
920 : // complete key slice, letting the caller encode into the DeferredBatchOp.Key
921 : // slice and then call Finish() on the returned object.
922 2 : func (b *Batch) DeleteSizedDeferred(keyLen int, deletedValueSize uint32) *DeferredBatchOp {
923 2 : if b.minimumFormatMajorVersion < FormatDeleteSizedAndObsolete {
924 2 : b.minimumFormatMajorVersion = FormatDeleteSizedAndObsolete
925 2 : }
926 :
927 : // Encode the sum of the key length and the value in the value.
928 2 : v := uint64(deletedValueSize) + uint64(keyLen)
929 2 :
930 2 : // Encode `v` as a varint.
931 2 : var buf [binary.MaxVarintLen64]byte
932 2 : n := 0
933 2 : {
934 2 : x := v
935 2 : for x >= 0x80 {
936 1 : buf[n] = byte(x) | 0x80
937 1 : x >>= 7
938 1 : n++
939 1 : }
940 2 : buf[n] = byte(x)
941 2 : n++
942 : }
943 :
944 : // NB: In batch entries and sstable entries, values are stored as
945 : // varstrings. Here, the value is itself a simple varint. This results in an
946 : // unnecessary double layer of encoding:
947 : // varint(n) varint(deletedValueSize)
948 : // The first varint will always be 1-byte, since a varint-encoded uint64
949 : // will never exceed 128 bytes. This unnecessary extra byte and wrapping is
950 : // preserved to avoid special casing across the database, and in particular
951 : // in sstable block decoding which is performance sensitive.
952 2 : b.prepareDeferredKeyValueRecord(keyLen, n, InternalKeyKindDeleteSized)
953 2 : b.deferredOp.index = b.index
954 2 : copy(b.deferredOp.Value, buf[:n])
955 2 : return &b.deferredOp
956 : }
957 :
958 : // SingleDelete adds an action to the batch that single deletes the entry for key.
959 : // See Writer.SingleDelete for more details on the semantics of SingleDelete.
960 : //
961 : // It is safe to modify the contents of the arguments after SingleDelete returns.
962 2 : func (b *Batch) SingleDelete(key []byte, _ *WriteOptions) error {
963 2 : deferredOp := b.SingleDeleteDeferred(len(key))
964 2 : copy(deferredOp.Key, key)
965 2 : // TODO(peter): Manually inline DeferredBatchOp.Finish(). Mid-stack inlining
966 2 : // in go1.13 will remove the need for this.
967 2 : if b.index != nil {
968 2 : if err := b.index.Add(deferredOp.offset); err != nil {
969 0 : return err
970 0 : }
971 : }
972 2 : return nil
973 : }
974 :
975 : // SingleDeleteDeferred is similar to SingleDelete in that it adds a single delete
976 : // operation to the batch, except it only takes in key/value lengths instead of
977 : // complete slices, letting the caller encode into those objects and then call
978 : // Finish() on the returned object.
979 2 : func (b *Batch) SingleDeleteDeferred(keyLen int) *DeferredBatchOp {
980 2 : b.prepareDeferredKeyRecord(keyLen, InternalKeyKindSingleDelete)
981 2 : b.deferredOp.index = b.index
982 2 : return &b.deferredOp
983 2 : }
984 :
985 : // DeleteRange deletes all of the point keys (and values) in the range
986 : // [start,end) (inclusive on start, exclusive on end). DeleteRange does NOT
987 : // delete overlapping range keys (eg, keys set via RangeKeySet).
988 : //
989 : // It is safe to modify the contents of the arguments after DeleteRange
990 : // returns.
991 2 : func (b *Batch) DeleteRange(start, end []byte, _ *WriteOptions) error {
992 2 : deferredOp := b.DeleteRangeDeferred(len(start), len(end))
993 2 : copy(deferredOp.Key, start)
994 2 : copy(deferredOp.Value, end)
995 2 : // TODO(peter): Manually inline DeferredBatchOp.Finish(). Mid-stack inlining
996 2 : // in go1.13 will remove the need for this.
997 2 : if deferredOp.index != nil {
998 2 : if err := deferredOp.index.Add(deferredOp.offset); err != nil {
999 0 : return err
1000 0 : }
1001 : }
1002 2 : return nil
1003 : }
1004 :
1005 : // DeleteRangeDeferred is similar to DeleteRange in that it adds a delete range
1006 : // operation to the batch, except it only takes in key lengths instead of
1007 : // complete slices, letting the caller encode into those objects and then call
1008 : // Finish() on the returned object. Note that DeferredBatchOp.Key should be
1009 : // populated with the start key, and DeferredBatchOp.Value should be populated
1010 : // with the end key.
1011 2 : func (b *Batch) DeleteRangeDeferred(startLen, endLen int) *DeferredBatchOp {
1012 2 : b.prepareDeferredKeyValueRecord(startLen, endLen, InternalKeyKindRangeDelete)
1013 2 : b.countRangeDels++
1014 2 : if b.index != nil {
1015 2 : b.tombstones = nil
1016 2 : b.tombstonesSeqNum = 0
1017 2 : // Range deletions are rare, so we lazily allocate the index for them.
1018 2 : if b.rangeDelIndex == nil {
1019 2 : b.rangeDelIndex = batchskl.NewSkiplist(&b.data, b.comparer.Compare, b.comparer.AbbreviatedKey)
1020 2 : }
1021 2 : b.deferredOp.index = b.rangeDelIndex
1022 : }
1023 2 : return &b.deferredOp
1024 : }
1025 :
1026 : // RangeKeySet sets a range key mapping the key range [start, end) at the MVCC
1027 : // timestamp suffix to value. The suffix is optional. If any portion of the key
1028 : // range [start, end) is already set by a range key with the same suffix value,
1029 : // RangeKeySet overrides it.
1030 : //
1031 : // It is safe to modify the contents of the arguments after RangeKeySet returns.
1032 2 : func (b *Batch) RangeKeySet(start, end, suffix, value []byte, _ *WriteOptions) error {
1033 2 : if invariants.Enabled && b.db != nil {
1034 2 : // RangeKeySet is only supported on prefix keys.
1035 2 : if b.db.opts.Comparer.Split(start) != len(start) {
1036 0 : panic("RangeKeySet called with suffixed start key")
1037 : }
1038 2 : if b.db.opts.Comparer.Split(end) != len(end) {
1039 0 : panic("RangeKeySet called with suffixed end key")
1040 : }
1041 : }
1042 2 : suffixValues := [1]rangekey.SuffixValue{{Suffix: suffix, Value: value}}
1043 2 : internalValueLen := rangekey.EncodedSetValueLen(end, suffixValues[:])
1044 2 :
1045 2 : deferredOp := b.rangeKeySetDeferred(len(start), internalValueLen)
1046 2 : copy(deferredOp.Key, start)
1047 2 : n := rangekey.EncodeSetValue(deferredOp.Value, end, suffixValues[:])
1048 2 : if n != internalValueLen {
1049 0 : panic("unexpected internal value length mismatch")
1050 : }
1051 :
1052 : // Manually inline DeferredBatchOp.Finish().
1053 2 : if deferredOp.index != nil {
1054 2 : if err := deferredOp.index.Add(deferredOp.offset); err != nil {
1055 0 : return err
1056 0 : }
1057 : }
1058 2 : return nil
1059 : }
1060 :
1061 2 : func (b *Batch) rangeKeySetDeferred(startLen, internalValueLen int) *DeferredBatchOp {
1062 2 : b.prepareDeferredKeyValueRecord(startLen, internalValueLen, InternalKeyKindRangeKeySet)
1063 2 : b.incrementRangeKeysCount()
1064 2 : return &b.deferredOp
1065 2 : }
1066 :
1067 2 : func (b *Batch) incrementRangeKeysCount() {
1068 2 : b.countRangeKeys++
1069 2 : if b.index != nil {
1070 2 : b.rangeKeys = nil
1071 2 : b.rangeKeysSeqNum = 0
1072 2 : // Range keys are rare, so we lazily allocate the index for them.
1073 2 : if b.rangeKeyIndex == nil {
1074 2 : b.rangeKeyIndex = batchskl.NewSkiplist(&b.data, b.comparer.Compare, b.comparer.AbbreviatedKey)
1075 2 : }
1076 2 : b.deferredOp.index = b.rangeKeyIndex
1077 : }
1078 : }
1079 :
1080 : // RangeKeyUnset removes a range key mapping the key range [start, end) at the
1081 : // MVCC timestamp suffix. The suffix may be omitted to remove an unsuffixed
1082 : // range key. RangeKeyUnset only removes portions of range keys that fall within
1083 : // the [start, end) key span, and only range keys with suffixes that exactly
1084 : // match the unset suffix.
1085 : //
1086 : // It is safe to modify the contents of the arguments after RangeKeyUnset
1087 : // returns.
1088 2 : func (b *Batch) RangeKeyUnset(start, end, suffix []byte, _ *WriteOptions) error {
1089 2 : if invariants.Enabled && b.db != nil {
1090 2 : // RangeKeyUnset is only supported on prefix keys.
1091 2 : if b.db.opts.Comparer.Split(start) != len(start) {
1092 0 : panic("RangeKeyUnset called with suffixed start key")
1093 : }
1094 2 : if b.db.opts.Comparer.Split(end) != len(end) {
1095 0 : panic("RangeKeyUnset called with suffixed end key")
1096 : }
1097 : }
1098 2 : suffixes := [1][]byte{suffix}
1099 2 : internalValueLen := rangekey.EncodedUnsetValueLen(end, suffixes[:])
1100 2 :
1101 2 : deferredOp := b.rangeKeyUnsetDeferred(len(start), internalValueLen)
1102 2 : copy(deferredOp.Key, start)
1103 2 : n := rangekey.EncodeUnsetValue(deferredOp.Value, end, suffixes[:])
1104 2 : if n != internalValueLen {
1105 0 : panic("unexpected internal value length mismatch")
1106 : }
1107 :
1108 : // Manually inline DeferredBatchOp.Finish()
1109 2 : if deferredOp.index != nil {
1110 2 : if err := deferredOp.index.Add(deferredOp.offset); err != nil {
1111 0 : return err
1112 0 : }
1113 : }
1114 2 : return nil
1115 : }
1116 :
1117 2 : func (b *Batch) rangeKeyUnsetDeferred(startLen, internalValueLen int) *DeferredBatchOp {
1118 2 : b.prepareDeferredKeyValueRecord(startLen, internalValueLen, InternalKeyKindRangeKeyUnset)
1119 2 : b.incrementRangeKeysCount()
1120 2 : return &b.deferredOp
1121 2 : }
1122 :
1123 : // RangeKeyDelete deletes all of the range keys in the range [start,end)
1124 : // (inclusive on start, exclusive on end). It does not delete point keys (for
1125 : // that use DeleteRange). RangeKeyDelete removes all range keys within the
1126 : // bounds, including those with or without suffixes.
1127 : //
1128 : // It is safe to modify the contents of the arguments after RangeKeyDelete
1129 : // returns.
1130 2 : func (b *Batch) RangeKeyDelete(start, end []byte, _ *WriteOptions) error {
1131 2 : if invariants.Enabled && b.db != nil {
1132 2 : // RangeKeyDelete is only supported on prefix keys.
1133 2 : if b.db.opts.Comparer.Split(start) != len(start) {
1134 0 : panic("RangeKeyDelete called with suffixed start key")
1135 : }
1136 2 : if b.db.opts.Comparer.Split(end) != len(end) {
1137 0 : panic("RangeKeyDelete called with suffixed end key")
1138 : }
1139 : }
1140 2 : deferredOp := b.RangeKeyDeleteDeferred(len(start), len(end))
1141 2 : copy(deferredOp.Key, start)
1142 2 : copy(deferredOp.Value, end)
1143 2 : // Manually inline DeferredBatchOp.Finish().
1144 2 : if deferredOp.index != nil {
1145 2 : if err := deferredOp.index.Add(deferredOp.offset); err != nil {
1146 0 : return err
1147 0 : }
1148 : }
1149 2 : return nil
1150 : }
1151 :
1152 : // RangeKeyDeleteDeferred is similar to RangeKeyDelete in that it adds an
1153 : // operation to delete range keys to the batch, except it only takes in key
1154 : // lengths instead of complete slices, letting the caller encode into those
1155 : // objects and then call Finish() on the returned object. Note that
1156 : // DeferredBatchOp.Key should be populated with the start key, and
1157 : // DeferredBatchOp.Value should be populated with the end key.
1158 2 : func (b *Batch) RangeKeyDeleteDeferred(startLen, endLen int) *DeferredBatchOp {
1159 2 : b.prepareDeferredKeyValueRecord(startLen, endLen, InternalKeyKindRangeKeyDelete)
1160 2 : b.incrementRangeKeysCount()
1161 2 : return &b.deferredOp
1162 2 : }
1163 :
1164 : // LogData adds the specified to the batch. The data will be written to the
1165 : // WAL, but not added to memtables or sstables. Log data is never indexed,
1166 : // which makes it useful for testing WAL performance.
1167 : //
1168 : // It is safe to modify the contents of the argument after LogData returns.
1169 2 : func (b *Batch) LogData(data []byte, _ *WriteOptions) error {
1170 2 : origCount, origMemTableSize := b.count, b.memTableSize
1171 2 : b.prepareDeferredKeyRecord(len(data), InternalKeyKindLogData)
1172 2 : copy(b.deferredOp.Key, data)
1173 2 : // Since LogData only writes to the WAL and does not affect the memtable, we
1174 2 : // restore b.count and b.memTableSize to their origin values. Note that
1175 2 : // Batch.count only refers to records that are added to the memtable.
1176 2 : b.count, b.memTableSize = origCount, origMemTableSize
1177 2 : return nil
1178 2 : }
1179 :
1180 : // IngestSST adds the FileNum for an sstable to the batch. The data will only be
1181 : // written to the WAL (not added to memtables or sstables).
1182 2 : func (b *Batch) ingestSST(fileNum base.FileNum) {
1183 2 : if b.Empty() {
1184 2 : b.ingestedSSTBatch = true
1185 2 : } else if !b.ingestedSSTBatch {
1186 0 : // Batch contains other key kinds.
1187 0 : panic("pebble: invalid call to ingestSST")
1188 : }
1189 :
1190 2 : origMemTableSize := b.memTableSize
1191 2 : var buf [binary.MaxVarintLen64]byte
1192 2 : length := binary.PutUvarint(buf[:], uint64(fileNum))
1193 2 : b.prepareDeferredKeyRecord(length, InternalKeyKindIngestSST)
1194 2 : copy(b.deferredOp.Key, buf[:length])
1195 2 : // Since IngestSST writes only to the WAL and does not affect the memtable,
1196 2 : // we restore b.memTableSize to its original value. Note that Batch.count
1197 2 : // is not reset because for the InternalKeyKindIngestSST the count is the
1198 2 : // number of sstable paths which have been added to the batch.
1199 2 : b.memTableSize = origMemTableSize
1200 2 : b.minimumFormatMajorVersion = FormatFlushableIngest
1201 : }
1202 :
1203 : // Excise adds the excise span for a flushable ingest containing an excise. The data
1204 : // will only be written to the WAL (not added to memtables or sstables).
1205 2 : func (b *Batch) excise(start, end []byte) {
1206 2 : if b.Empty() {
1207 2 : b.ingestedSSTBatch = true
1208 2 : } else if !b.ingestedSSTBatch {
1209 0 : // Batch contains other key kinds.
1210 0 : panic("pebble: invalid call to excise")
1211 : }
1212 :
1213 2 : origMemTableSize := b.memTableSize
1214 2 : b.prepareDeferredKeyValueRecord(len(start), len(end), InternalKeyKindExcise)
1215 2 : copy(b.deferredOp.Key, start)
1216 2 : copy(b.deferredOp.Value, end)
1217 2 : // Since excise writes only to the WAL and does not affect the memtable,
1218 2 : // we restore b.memTableSize to its original value. Note that Batch.count
1219 2 : // is not reset because for the InternalKeyKindIngestSST/Excise the count
1220 2 : // is the number of sstable paths which have been added to the batch.
1221 2 : b.memTableSize = origMemTableSize
1222 2 : b.minimumFormatMajorVersion = FormatFlushableIngestExcises
1223 : }
1224 :
1225 : // Empty returns true if the batch is empty, and false otherwise.
1226 2 : func (b *Batch) Empty() bool {
1227 2 : return batchrepr.IsEmpty(b.data)
1228 2 : }
1229 :
1230 : // Len returns the current size of the batch in bytes.
1231 1 : func (b *Batch) Len() int {
1232 1 : return max(batchrepr.HeaderLen, len(b.data))
1233 1 : }
1234 :
1235 : // Repr returns the underlying batch representation. It is not safe to modify
1236 : // the contents. Reset() will not change the contents of the returned value,
1237 : // though any other mutation operation may do so.
1238 2 : func (b *Batch) Repr() []byte {
1239 2 : if len(b.data) == 0 {
1240 1 : b.init(batchrepr.HeaderLen)
1241 1 : }
1242 2 : batchrepr.SetCount(b.data, b.Count())
1243 2 : return b.data
1244 : }
1245 :
1246 : // SetRepr sets the underlying batch representation. The batch takes ownership
1247 : // of the supplied slice. It is not safe to modify it afterwards until the
1248 : // Batch is no longer in use.
1249 : //
1250 : // SetRepr may return ErrInvalidBatch if the supplied slice fails to decode in
1251 : // any way. It will not return an error in any other circumstance.
1252 2 : func (b *Batch) SetRepr(data []byte) error {
1253 2 : h, ok := batchrepr.ReadHeader(data)
1254 2 : if !ok {
1255 0 : return ErrInvalidBatch
1256 0 : }
1257 2 : b.data = data
1258 2 : b.count = uint64(h.Count)
1259 2 : var err error
1260 2 : if b.db != nil {
1261 2 : // Only track memTableSize for batches that will be committed to the DB.
1262 2 : err = b.refreshMemTableSize()
1263 2 : }
1264 2 : return err
1265 : }
1266 :
1267 : // NewIter returns an iterator that is unpositioned (Iterator.Valid() will
1268 : // return false). The iterator can be positioned via a call to SeekGE,
1269 : // SeekPrefixGE, SeekLT, First or Last. Only indexed batches support iterators.
1270 : //
1271 : // The returned Iterator observes all of the Batch's existing mutations, but no
1272 : // later mutations. Its view can be refreshed via RefreshBatchSnapshot or
1273 : // SetOptions().
1274 2 : func (b *Batch) NewIter(o *IterOptions) (*Iterator, error) {
1275 2 : return b.NewIterWithContext(context.Background(), o)
1276 2 : }
1277 :
1278 : // NewIterWithContext is like NewIter, and additionally accepts a context for
1279 : // tracing.
1280 2 : func (b *Batch) NewIterWithContext(ctx context.Context, o *IterOptions) (*Iterator, error) {
1281 2 : if b.index == nil {
1282 0 : return nil, ErrNotIndexed
1283 0 : }
1284 2 : return b.db.newIter(ctx, b, newIterOpts{}, o), nil
1285 : }
1286 :
1287 : // NewBatchOnlyIter constructs an iterator that only reads the contents of the
1288 : // batch, and does not overlay the batch mutations on top of the DB state.
1289 : //
1290 : // The returned Iterator observes all of the Batch's existing mutations, but
1291 : // no later mutations. Its view can be refreshed via RefreshBatchSnapshot or
1292 : // SetOptions().
1293 1 : func (b *Batch) NewBatchOnlyIter(ctx context.Context, o *IterOptions) (*Iterator, error) {
1294 1 : if b.index == nil {
1295 0 : return nil, ErrNotIndexed
1296 0 : }
1297 1 : return b.db.newIter(ctx, b, newIterOpts{batch: batchIterOpts{batchOnly: true}}, o), nil
1298 : }
1299 :
1300 : // newInternalIter creates a new internalIterator that iterates over the
1301 : // contents of the batch.
1302 2 : func (b *Batch) newInternalIter(o *IterOptions) *batchIter {
1303 2 : iter := &batchIter{}
1304 2 : b.initInternalIter(o, iter)
1305 2 : return iter
1306 2 : }
1307 :
1308 2 : func (b *Batch) initInternalIter(o *IterOptions, iter *batchIter) {
1309 2 : *iter = batchIter{
1310 2 : batch: b,
1311 2 : iter: b.index.NewIter(o.GetLowerBound(), o.GetUpperBound()),
1312 2 : // NB: We explicitly do not propagate the batch snapshot to the point
1313 2 : // key iterator. Filtering point keys within the batch iterator can
1314 2 : // cause pathological behavior where a batch iterator advances
1315 2 : // significantly farther than necessary filtering many batch keys that
1316 2 : // are not visible at the batch sequence number. Instead, the merging
1317 2 : // iterator enforces bounds.
1318 2 : //
1319 2 : // For example, consider an engine that contains the committed keys
1320 2 : // 'bar' and 'bax', with no keys between them. Consider a batch
1321 2 : // containing keys 1,000 keys within the range [a,z]. All of the
1322 2 : // batch keys were added to the batch after the iterator was
1323 2 : // constructed, so they are not visible to the iterator. A call to
1324 2 : // SeekGE('bax') would seek the LSM iterators and discover the key
1325 2 : // 'bax'. It would also seek the batch iterator, landing on the key
1326 2 : // 'baz' but discover it that it's not visible. The batch iterator would
1327 2 : // next through the rest of the batch's keys, only to discover there are
1328 2 : // no visible keys greater than or equal to 'bax'.
1329 2 : //
1330 2 : // Filtering these batch points within the merging iterator ensures that
1331 2 : // the batch iterator never needs to iterate beyond 'baz', because it
1332 2 : // already found a smaller, visible key 'bax'.
1333 2 : snapshot: base.SeqNumMax,
1334 2 : }
1335 2 : }
1336 :
1337 2 : func (b *Batch) newRangeDelIter(o *IterOptions, batchSnapshot base.SeqNum) *keyspan.Iter {
1338 2 : // Construct an iterator even if rangeDelIndex is nil, because it is allowed
1339 2 : // to refresh later, so we need the container to exist.
1340 2 : iter := new(keyspan.Iter)
1341 2 : b.initRangeDelIter(o, iter, batchSnapshot)
1342 2 : return iter
1343 2 : }
1344 :
1345 2 : func (b *Batch) initRangeDelIter(_ *IterOptions, iter *keyspan.Iter, batchSnapshot base.SeqNum) {
1346 2 : if b.rangeDelIndex == nil {
1347 2 : iter.Init(b.comparer.Compare, nil)
1348 2 : return
1349 2 : }
1350 :
1351 : // Fragment the range tombstones the first time a range deletion iterator is
1352 : // requested. The cached tombstones are invalidated if another range
1353 : // deletion tombstone is added to the batch. This cache is only guaranteed
1354 : // to be correct if we're opening an iterator to read at a batch sequence
1355 : // number at least as high as tombstonesSeqNum. The cache is guaranteed to
1356 : // include all tombstones up to tombstonesSeqNum, and if any additional
1357 : // tombstones were added after that sequence number the cache would've been
1358 : // cleared.
1359 2 : nextSeqNum := b.nextSeqNum()
1360 2 : if b.tombstones != nil && b.tombstonesSeqNum <= batchSnapshot {
1361 1 : iter.Init(b.comparer.Compare, b.tombstones)
1362 1 : return
1363 1 : }
1364 :
1365 2 : tombstones := make([]keyspan.Span, 0, b.countRangeDels)
1366 2 : frag := &keyspan.Fragmenter{
1367 2 : Cmp: b.comparer.Compare,
1368 2 : Format: b.comparer.FormatKey,
1369 2 : Emit: func(s keyspan.Span) {
1370 2 : tombstones = append(tombstones, s)
1371 2 : },
1372 : }
1373 2 : it := &batchIter{
1374 2 : batch: b,
1375 2 : iter: b.rangeDelIndex.NewIter(nil, nil),
1376 2 : snapshot: batchSnapshot,
1377 2 : }
1378 2 : fragmentRangeDels(frag, it, int(b.countRangeDels))
1379 2 : iter.Init(b.comparer.Compare, tombstones)
1380 2 :
1381 2 : // If we just read all the tombstones in the batch (eg, batchSnapshot was
1382 2 : // set to b.nextSeqNum()), then cache the tombstones so that a subsequent
1383 2 : // call to initRangeDelIter may use them without refragmenting.
1384 2 : if nextSeqNum == batchSnapshot {
1385 2 : b.tombstones = tombstones
1386 2 : b.tombstonesSeqNum = nextSeqNum
1387 2 : }
1388 : }
1389 :
1390 2 : func fragmentRangeDels(frag *keyspan.Fragmenter, it internalIterator, count int) {
1391 2 : // The memory management here is a bit subtle. The keys and values returned
1392 2 : // by the iterator are slices in Batch.data. Thus the fragmented tombstones
1393 2 : // are slices within Batch.data. If additional entries are added to the
1394 2 : // Batch, Batch.data may be reallocated. The references in the fragmented
1395 2 : // tombstones will remain valid, pointing into the old Batch.data. GC for
1396 2 : // the win.
1397 2 :
1398 2 : // Use a single []keyspan.Key buffer to avoid allocating many
1399 2 : // individual []keyspan.Key slices with a single element each.
1400 2 : keyBuf := make([]keyspan.Key, 0, count)
1401 2 : for kv := it.First(); kv != nil; kv = it.Next() {
1402 2 : s := rangedel.Decode(kv.K, kv.InPlaceValue(), keyBuf)
1403 2 : keyBuf = s.Keys[len(s.Keys):]
1404 2 :
1405 2 : // Set a fixed capacity to avoid accidental overwriting.
1406 2 : s.Keys = s.Keys[:len(s.Keys):len(s.Keys)]
1407 2 : frag.Add(s)
1408 2 : }
1409 2 : frag.Finish()
1410 : }
1411 :
1412 2 : func (b *Batch) newRangeKeyIter(o *IterOptions, batchSnapshot base.SeqNum) *keyspan.Iter {
1413 2 : // Construct an iterator even if rangeKeyIndex is nil, because it is allowed
1414 2 : // to refresh later, so we need the container to exist.
1415 2 : iter := new(keyspan.Iter)
1416 2 : b.initRangeKeyIter(o, iter, batchSnapshot)
1417 2 : return iter
1418 2 : }
1419 :
1420 2 : func (b *Batch) initRangeKeyIter(_ *IterOptions, iter *keyspan.Iter, batchSnapshot base.SeqNum) {
1421 2 : if b.rangeKeyIndex == nil {
1422 2 : iter.Init(b.comparer.Compare, nil)
1423 2 : return
1424 2 : }
1425 :
1426 : // Fragment the range keys the first time a range key iterator is requested.
1427 : // The cached spans are invalidated if another range key is added to the
1428 : // batch. This cache is only guaranteed to be correct if we're opening an
1429 : // iterator to read at a batch sequence number at least as high as
1430 : // rangeKeysSeqNum. The cache is guaranteed to include all range keys up to
1431 : // rangeKeysSeqNum, and if any additional range keys were added after that
1432 : // sequence number the cache would've been cleared.
1433 2 : nextSeqNum := b.nextSeqNum()
1434 2 : if b.rangeKeys != nil && b.rangeKeysSeqNum <= batchSnapshot {
1435 2 : iter.Init(b.comparer.Compare, b.rangeKeys)
1436 2 : return
1437 2 : }
1438 :
1439 2 : rangeKeys := make([]keyspan.Span, 0, b.countRangeKeys)
1440 2 : frag := &keyspan.Fragmenter{
1441 2 : Cmp: b.comparer.Compare,
1442 2 : Format: b.comparer.FormatKey,
1443 2 : Emit: func(s keyspan.Span) {
1444 2 : rangeKeys = append(rangeKeys, s)
1445 2 : },
1446 : }
1447 2 : it := &batchIter{
1448 2 : batch: b,
1449 2 : iter: b.rangeKeyIndex.NewIter(nil, nil),
1450 2 : snapshot: batchSnapshot,
1451 2 : }
1452 2 : fragmentRangeKeys(frag, it, int(b.countRangeKeys))
1453 2 : iter.Init(b.comparer.Compare, rangeKeys)
1454 2 :
1455 2 : // If we just read all the range keys in the batch (eg, batchSnapshot was
1456 2 : // set to b.nextSeqNum()), then cache the range keys so that a subsequent
1457 2 : // call to initRangeKeyIter may use them without refragmenting.
1458 2 : if nextSeqNum == batchSnapshot {
1459 2 : b.rangeKeys = rangeKeys
1460 2 : b.rangeKeysSeqNum = nextSeqNum
1461 2 : }
1462 : }
1463 :
1464 2 : func fragmentRangeKeys(frag *keyspan.Fragmenter, it internalIterator, count int) error {
1465 2 : // The memory management here is a bit subtle. The keys and values
1466 2 : // returned by the iterator are slices in Batch.data. Thus the
1467 2 : // fragmented key spans are slices within Batch.data. If additional
1468 2 : // entries are added to the Batch, Batch.data may be reallocated. The
1469 2 : // references in the fragmented keys will remain valid, pointing into
1470 2 : // the old Batch.data. GC for the win.
1471 2 :
1472 2 : // Use a single []keyspan.Key buffer to avoid allocating many
1473 2 : // individual []keyspan.Key slices with a single element each.
1474 2 : keyBuf := make([]keyspan.Key, 0, count)
1475 2 : for kv := it.First(); kv != nil; kv = it.Next() {
1476 2 : s, err := rangekey.Decode(kv.K, kv.InPlaceValue(), keyBuf)
1477 2 : if err != nil {
1478 0 : return err
1479 0 : }
1480 2 : keyBuf = s.Keys[len(s.Keys):]
1481 2 :
1482 2 : // Set a fixed capacity to avoid accidental overwriting.
1483 2 : s.Keys = s.Keys[:len(s.Keys):len(s.Keys)]
1484 2 : frag.Add(s)
1485 : }
1486 2 : frag.Finish()
1487 2 : return nil
1488 : }
1489 :
1490 : // Commit applies the batch to its parent writer.
1491 2 : func (b *Batch) Commit(o *WriteOptions) error {
1492 2 : return b.db.Apply(b, o)
1493 2 : }
1494 :
1495 : // Close closes the batch without committing it.
1496 2 : func (b *Batch) Close() error {
1497 2 : // The storage engine commit pipeline may retain a pointer to b.data beyond
1498 2 : // when Commit() returns. This is possible when configured for WAL failover;
1499 2 : // we don't know if we might need to read the batch data again until the
1500 2 : // batch has been durably synced [even if the committer doesn't care to wait
1501 2 : // for the sync and Sync()=false].
1502 2 : //
1503 2 : // We still want to recycle these batches. The b.lifecycle atomic negotiates
1504 2 : // the batch's lifecycle. If the commit pipeline still might read b.data,
1505 2 : // b.lifecycle will be nonzeroed [the low bits hold a ref count].
1506 2 : for {
1507 2 : v := b.lifecycle.Load()
1508 2 : switch {
1509 2 : case v == 0:
1510 2 : // A zero value indicates that the commit pipeline has no
1511 2 : // outstanding references to the batch. The commit pipeline is
1512 2 : // required to acquire a ref synchronously, so there is no risk that
1513 2 : // the commit pipeline will grab a ref after the call to release. We
1514 2 : // can simply release the batch.
1515 2 : b.release()
1516 2 : return nil
1517 1 : case (v & batchClosedBit) != 0:
1518 1 : // The batch has a batchClosedBit: This batch has already been closed.
1519 1 : return ErrClosed
1520 2 : default:
1521 2 : // There's an outstanding reference. Set the batch released bit so
1522 2 : // that the commit pipeline knows it should release the batch when
1523 2 : // it unrefs.
1524 2 : if b.lifecycle.CompareAndSwap(v, v|batchClosedBit) {
1525 2 : return nil
1526 2 : }
1527 : // CAS Failed—this indicates the outstanding reference just
1528 : // decremented (or the caller illegally closed the batch twice).
1529 : // Loop to reload.
1530 : }
1531 : }
1532 : }
1533 :
1534 : // Indexed returns true if the batch is indexed (i.e. supports read
1535 : // operations).
1536 2 : func (b *Batch) Indexed() bool {
1537 2 : return b.index != nil
1538 2 : }
1539 :
1540 : // init ensures that the batch data slice is initialized to meet the
1541 : // minimum required size and allocates space for the batch header.
1542 2 : func (b *Batch) init(size int) {
1543 2 : b.opts.ensureDefaults()
1544 2 : n := b.opts.initialSizeBytes
1545 2 : for n < size {
1546 1 : n *= 2
1547 1 : }
1548 2 : if cap(b.data) < n {
1549 2 : b.data = rawalloc.New(batchrepr.HeaderLen, n)
1550 2 : }
1551 2 : b.data = b.data[:batchrepr.HeaderLen]
1552 2 : clear(b.data) // Zero the sequence number in the header
1553 : }
1554 :
1555 : // Reset resets the batch for reuse. The underlying byte slice (that is
1556 : // returned by Repr()) may not be modified. It is only necessary to call this
1557 : // method if a batch is explicitly being reused. Close automatically takes are
1558 : // of releasing resources when appropriate for batches that are internally
1559 : // being reused.
1560 1 : func (b *Batch) Reset() {
1561 1 : // In some configurations (WAL failover) the commit pipeline may retain
1562 1 : // b.data beyond a call to commit the batch. When this happens, b.lifecycle
1563 1 : // is nonzero (see the comment above b.lifecycle). In this case it's unsafe
1564 1 : // to mutate b.data, so we discard it. Note that Reset must not be called on
1565 1 : // a closed batch, so v > 0 implies a non-zero ref count and not
1566 1 : // batchClosedBit being set.
1567 1 : if v := b.lifecycle.Load(); v > 0 {
1568 1 : b.data = nil
1569 1 : }
1570 1 : b.reset()
1571 : }
1572 :
1573 2 : func (b *Batch) reset() {
1574 2 : // Zero out the struct, retaining only the fields necessary for manual
1575 2 : // reuse.
1576 2 : b.batchInternal = batchInternal{
1577 2 : data: b.data,
1578 2 : comparer: b.comparer,
1579 2 : opts: b.opts,
1580 2 : index: b.index,
1581 2 : db: b.db,
1582 2 : }
1583 2 : b.applied.Store(false)
1584 2 : if b.data != nil {
1585 2 : if cap(b.data) > b.opts.maxRetainedSizeBytes {
1586 1 : // If the capacity of the buffer is larger than our maximum
1587 1 : // retention size, don't re-use it. Let it be GC-ed instead.
1588 1 : // This prevents the memory from an unusually large batch from
1589 1 : // being held on to indefinitely.
1590 1 : b.data = nil
1591 2 : } else {
1592 2 : // Otherwise, reset the buffer for re-use.
1593 2 : b.data = b.data[:batchrepr.HeaderLen]
1594 2 : clear(b.data)
1595 2 : }
1596 : }
1597 2 : if b.index != nil {
1598 2 : b.index.Init(&b.data, b.comparer.Compare, b.comparer.AbbreviatedKey)
1599 2 : }
1600 : }
1601 :
1602 2 : func (b *Batch) grow(n int) {
1603 2 : newSize := len(b.data) + n
1604 2 : if uint64(newSize) >= maxBatchSize {
1605 1 : panic(ErrBatchTooLarge)
1606 : }
1607 2 : if newSize > cap(b.data) {
1608 1 : newCap := 2 * cap(b.data)
1609 1 : for newCap < newSize {
1610 1 : newCap *= 2
1611 1 : }
1612 1 : newData := rawalloc.New(len(b.data), newCap)
1613 1 : copy(newData, b.data)
1614 1 : b.data = newData
1615 : }
1616 2 : b.data = b.data[:newSize]
1617 : }
1618 :
1619 2 : func (b *Batch) setSeqNum(seqNum base.SeqNum) {
1620 2 : batchrepr.SetSeqNum(b.data, seqNum)
1621 2 : }
1622 :
1623 : // SeqNum returns the batch sequence number which is applied to the first
1624 : // record in the batch. The sequence number is incremented for each subsequent
1625 : // record. It returns zero if the batch is empty.
1626 2 : func (b *Batch) SeqNum() base.SeqNum {
1627 2 : if len(b.data) == 0 {
1628 1 : b.init(batchrepr.HeaderLen)
1629 1 : }
1630 2 : return batchrepr.ReadSeqNum(b.data)
1631 : }
1632 :
1633 2 : func (b *Batch) setCount(v uint32) {
1634 2 : b.count = uint64(v)
1635 2 : }
1636 :
1637 : // Count returns the count of memtable-modifying operations in this batch. All
1638 : // operations with the except of LogData increment this count. For IngestSSTs,
1639 : // count is only used to indicate the number of SSTs ingested in the record, the
1640 : // batch isn't applied to the memtable.
1641 2 : func (b *Batch) Count() uint32 {
1642 2 : if b.count > math.MaxUint32 {
1643 1 : panic(batchrepr.ErrInvalidBatch)
1644 : }
1645 2 : return uint32(b.count)
1646 : }
1647 :
1648 : // Reader returns a batchrepr.Reader for the current batch contents. If the
1649 : // batch is mutated, the new entries will not be visible to the reader.
1650 2 : func (b *Batch) Reader() batchrepr.Reader {
1651 2 : if len(b.data) == 0 {
1652 2 : b.init(batchrepr.HeaderLen)
1653 2 : }
1654 2 : return batchrepr.Read(b.data)
1655 : }
1656 :
1657 : // SyncWait is to be used in conjunction with DB.ApplyNoSyncWait.
1658 2 : func (b *Batch) SyncWait() error {
1659 2 : now := crtime.NowMono()
1660 2 : b.fsyncWait.Wait()
1661 2 : if b.commitErr != nil {
1662 0 : b.db = nil // prevent batch reuse on error
1663 0 : }
1664 2 : waitDuration := now.Elapsed()
1665 2 : b.commitStats.CommitWaitDuration += waitDuration
1666 2 : b.commitStats.TotalDuration += waitDuration
1667 2 : return b.commitErr
1668 : }
1669 :
1670 : // CommitStats returns stats related to committing the batch. Should be called
1671 : // after Batch.Commit, DB.Apply. If DB.ApplyNoSyncWait is used, should be
1672 : // called after Batch.SyncWait.
1673 1 : func (b *Batch) CommitStats() BatchCommitStats {
1674 1 : return b.commitStats
1675 1 : }
1676 :
1677 : // Note: batchIter mirrors the implementation of flushableBatchIter. Keep the
1678 : // two in sync.
1679 : type batchIter struct {
1680 : batch *Batch
1681 : iter batchskl.Iterator
1682 : kv base.InternalKV
1683 : err error
1684 : // snapshot holds a batch "sequence number" at which the batch is being
1685 : // read. This sequence number has the InternalKeySeqNumBatch bit set, so it
1686 : // encodes an offset within the batch. Only batch entries earlier than the
1687 : // offset are visible during iteration.
1688 : snapshot base.SeqNum
1689 : }
1690 :
1691 : // batchIter implements the base.InternalIterator interface.
1692 : var _ base.InternalIterator = (*batchIter)(nil)
1693 :
1694 0 : func (i *batchIter) String() string {
1695 0 : return "batch"
1696 0 : }
1697 :
1698 2 : func (i *batchIter) SeekGE(key []byte, flags base.SeekGEFlags) *base.InternalKV {
1699 2 : // Ignore TrySeekUsingNext if the view of the batch changed.
1700 2 : if flags.TrySeekUsingNext() && flags.BatchJustRefreshed() {
1701 1 : flags = flags.DisableTrySeekUsingNext()
1702 1 : }
1703 :
1704 2 : i.err = nil // clear cached iteration error
1705 2 : ikey := i.iter.SeekGE(key, flags)
1706 2 : for ikey != nil && ikey.SeqNum() >= i.snapshot {
1707 0 : ikey = i.iter.Next()
1708 0 : }
1709 2 : if ikey == nil {
1710 2 : i.kv = base.InternalKV{}
1711 2 : return nil
1712 2 : }
1713 2 : i.kv.K = *ikey
1714 2 : i.kv.V = base.MakeInPlaceValue(i.value())
1715 2 : return &i.kv
1716 : }
1717 :
1718 2 : func (i *batchIter) SeekPrefixGE(prefix, key []byte, flags base.SeekGEFlags) *base.InternalKV {
1719 2 : kv := i.SeekGE(key, flags)
1720 2 : if kv == nil {
1721 2 : return nil
1722 2 : }
1723 : // If the key doesn't have the sought prefix, return nil.
1724 2 : if !bytes.Equal(i.batch.comparer.Split.Prefix(kv.K.UserKey), prefix) {
1725 2 : i.kv = base.InternalKV{}
1726 2 : return nil
1727 2 : }
1728 2 : return kv
1729 : }
1730 :
1731 2 : func (i *batchIter) SeekLT(key []byte, flags base.SeekLTFlags) *base.InternalKV {
1732 2 : i.err = nil // clear cached iteration error
1733 2 : ikey := i.iter.SeekLT(key)
1734 2 : for ikey != nil && ikey.SeqNum() >= i.snapshot {
1735 0 : ikey = i.iter.Prev()
1736 0 : }
1737 2 : if ikey == nil {
1738 2 : i.kv = base.InternalKV{}
1739 2 : return nil
1740 2 : }
1741 2 : i.kv.K = *ikey
1742 2 : i.kv.V = base.MakeInPlaceValue(i.value())
1743 2 : return &i.kv
1744 : }
1745 :
1746 2 : func (i *batchIter) First() *base.InternalKV {
1747 2 : i.err = nil // clear cached iteration error
1748 2 : ikey := i.iter.First()
1749 2 : for ikey != nil && ikey.SeqNum() >= i.snapshot {
1750 1 : ikey = i.iter.Next()
1751 1 : }
1752 2 : if ikey == nil {
1753 2 : i.kv = base.InternalKV{}
1754 2 : return nil
1755 2 : }
1756 2 : i.kv.K = *ikey
1757 2 : i.kv.V = base.MakeInPlaceValue(i.value())
1758 2 : return &i.kv
1759 : }
1760 :
1761 2 : func (i *batchIter) Last() *base.InternalKV {
1762 2 : i.err = nil // clear cached iteration error
1763 2 : ikey := i.iter.Last()
1764 2 : for ikey != nil && ikey.SeqNum() >= i.snapshot {
1765 0 : ikey = i.iter.Prev()
1766 0 : }
1767 2 : if ikey == nil {
1768 1 : i.kv = base.InternalKV{}
1769 1 : return nil
1770 1 : }
1771 1 : i.kv.K = *ikey
1772 1 : i.kv.V = base.MakeInPlaceValue(i.value())
1773 1 : return &i.kv
1774 : }
1775 :
1776 2 : func (i *batchIter) Next() *base.InternalKV {
1777 2 : ikey := i.iter.Next()
1778 2 : for ikey != nil && ikey.SeqNum() >= i.snapshot {
1779 1 : ikey = i.iter.Next()
1780 1 : }
1781 2 : if ikey == nil {
1782 2 : i.kv = base.InternalKV{}
1783 2 : return nil
1784 2 : }
1785 2 : i.kv.K = *ikey
1786 2 : i.kv.V = base.MakeInPlaceValue(i.value())
1787 2 : return &i.kv
1788 : }
1789 :
1790 0 : func (i *batchIter) NextPrefix(succKey []byte) *base.InternalKV {
1791 0 : // Because NextPrefix was invoked `succKey` must be ≥ the key at i's current
1792 0 : // position. Seek the arena iterator using TrySeekUsingNext.
1793 0 : ikey := i.iter.SeekGE(succKey, base.SeekGEFlagsNone.EnableTrySeekUsingNext())
1794 0 : for ikey != nil && ikey.SeqNum() >= i.snapshot {
1795 0 : ikey = i.iter.Next()
1796 0 : }
1797 0 : if ikey == nil {
1798 0 : i.kv = base.InternalKV{}
1799 0 : return nil
1800 0 : }
1801 0 : i.kv.K = *ikey
1802 0 : i.kv.V = base.MakeInPlaceValue(i.value())
1803 0 : return &i.kv
1804 : }
1805 :
1806 2 : func (i *batchIter) Prev() *base.InternalKV {
1807 2 : ikey := i.iter.Prev()
1808 2 : for ikey != nil && ikey.SeqNum() >= i.snapshot {
1809 0 : ikey = i.iter.Prev()
1810 0 : }
1811 2 : if ikey == nil {
1812 2 : i.kv = base.InternalKV{}
1813 2 : return nil
1814 2 : }
1815 2 : i.kv.K = *ikey
1816 2 : i.kv.V = base.MakeInPlaceValue(i.value())
1817 2 : return &i.kv
1818 : }
1819 :
1820 2 : func (i *batchIter) value() []byte {
1821 2 : offset, _, keyEnd := i.iter.KeyInfo()
1822 2 : data := i.batch.data
1823 2 : if len(data[offset:]) == 0 {
1824 0 : i.err = base.CorruptionErrorf("corrupted batch")
1825 0 : return nil
1826 0 : }
1827 :
1828 2 : switch InternalKeyKind(data[offset]) {
1829 : case InternalKeyKindSet, InternalKeyKindMerge, InternalKeyKindRangeDelete,
1830 : InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete,
1831 2 : InternalKeyKindDeleteSized:
1832 2 : _, value, ok := batchrepr.DecodeStr(data[keyEnd:])
1833 2 : if !ok {
1834 0 : return nil
1835 0 : }
1836 2 : return value
1837 2 : default:
1838 2 : return nil
1839 : }
1840 : }
1841 :
1842 2 : func (i *batchIter) Error() error {
1843 2 : return i.err
1844 2 : }
1845 :
1846 2 : func (i *batchIter) Close() error {
1847 2 : _ = i.iter.Close()
1848 2 : return i.err
1849 2 : }
1850 :
1851 2 : func (i *batchIter) SetBounds(lower, upper []byte) {
1852 2 : i.iter.SetBounds(lower, upper)
1853 2 : }
1854 :
1855 0 : func (i *batchIter) SetContext(_ context.Context) {}
1856 :
1857 : // DebugTree is part of the InternalIterator interface.
1858 0 : func (i *batchIter) DebugTree(tp treeprinter.Node) {
1859 0 : tp.Childf("%T(%p)", i, i)
1860 0 : }
1861 :
1862 : type flushableBatchEntry struct {
1863 : // offset is the byte offset of the record within the batch repr.
1864 : offset uint32
1865 : // index is the 0-based ordinal number of the record within the batch. Used
1866 : // to compute the seqnum for the record.
1867 : index uint32
1868 : // key{Start,End} are the start and end byte offsets of the key within the
1869 : // batch repr. Cached to avoid decoding the key length on every
1870 : // comparison. The value is stored starting at keyEnd.
1871 : keyStart uint32
1872 : keyEnd uint32
1873 : }
1874 :
1875 : // flushableBatch wraps an existing batch and provides the interfaces needed
1876 : // for making the batch flushable (i.e. able to mimic a memtable).
1877 : type flushableBatch struct {
1878 : cmp Compare
1879 : comparer *base.Comparer
1880 : data []byte
1881 :
1882 : // The base sequence number for the entries in the batch. This is the same
1883 : // value as Batch.seqNum() and is cached here for performance.
1884 : seqNum base.SeqNum
1885 :
1886 : // A slice of offsets and indices for the entries in the batch. Used to
1887 : // implement flushableBatchIter. Unlike the indexing on a normal batch, a
1888 : // flushable batch is indexed such that batch entry i will be given the
1889 : // sequence number flushableBatch.seqNum+i.
1890 : //
1891 : // Sorted in increasing order of key and decreasing order of offset (since
1892 : // higher offsets correspond to higher sequence numbers).
1893 : //
1894 : // Does not include range deletion entries or range key entries.
1895 : offsets []flushableBatchEntry
1896 :
1897 : // Fragmented range deletion tombstones.
1898 : tombstones []keyspan.Span
1899 :
1900 : // Fragmented range keys.
1901 : rangeKeys []keyspan.Span
1902 : }
1903 :
1904 : var _ flushable = (*flushableBatch)(nil)
1905 :
1906 : // newFlushableBatch creates a new batch that implements the flushable
1907 : // interface. This allows the batch to act like a memtable and be placed in the
1908 : // queue of flushable memtables. Note that the flushable batch takes ownership
1909 : // of the batch data.
1910 2 : func newFlushableBatch(batch *Batch, comparer *Comparer) (*flushableBatch, error) {
1911 2 : b := &flushableBatch{
1912 2 : data: batch.data,
1913 2 : cmp: comparer.Compare,
1914 2 : comparer: comparer,
1915 2 : offsets: make([]flushableBatchEntry, 0, batch.Count()),
1916 2 : }
1917 2 : if b.data != nil {
1918 2 : // Note that this sequence number is not correct when this batch has not
1919 2 : // been applied since the sequence number has not been assigned yet. The
1920 2 : // correct sequence number will be set later. But it is correct when the
1921 2 : // batch is being replayed from the WAL.
1922 2 : b.seqNum = batch.SeqNum()
1923 2 : }
1924 2 : var rangeDelOffsets []flushableBatchEntry
1925 2 : var rangeKeyOffsets []flushableBatchEntry
1926 2 : if len(b.data) > batchrepr.HeaderLen {
1927 2 : // Non-empty batch.
1928 2 : var index uint32
1929 2 : for iter := batchrepr.Read(b.data); len(iter) > 0; {
1930 2 : offset := uintptr(unsafe.Pointer(&iter[0])) - uintptr(unsafe.Pointer(&b.data[0]))
1931 2 : kind, key, _, ok, err := iter.Next()
1932 2 : if !ok {
1933 0 : if err != nil {
1934 0 : return nil, err
1935 0 : }
1936 0 : break
1937 : }
1938 2 : entry := flushableBatchEntry{
1939 2 : offset: uint32(offset),
1940 2 : index: uint32(index),
1941 2 : }
1942 2 : if keySize := uint32(len(key)); keySize == 0 {
1943 2 : // Must add 2 to the offset. One byte encodes `kind` and the next
1944 2 : // byte encodes `0`, which is the length of the key.
1945 2 : entry.keyStart = uint32(offset) + 2
1946 2 : entry.keyEnd = entry.keyStart
1947 2 : } else {
1948 2 : entry.keyStart = uint32(uintptr(unsafe.Pointer(&key[0])) -
1949 2 : uintptr(unsafe.Pointer(&b.data[0])))
1950 2 : entry.keyEnd = entry.keyStart + keySize
1951 2 : }
1952 2 : switch kind {
1953 2 : case InternalKeyKindRangeDelete:
1954 2 : rangeDelOffsets = append(rangeDelOffsets, entry)
1955 2 : case InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete:
1956 2 : rangeKeyOffsets = append(rangeKeyOffsets, entry)
1957 2 : case InternalKeyKindLogData:
1958 2 : // Skip it; we never want to iterate over LogDatas.
1959 2 : continue
1960 : case InternalKeyKindSet, InternalKeyKindDelete, InternalKeyKindMerge,
1961 2 : InternalKeyKindSingleDelete, InternalKeyKindSetWithDelete, InternalKeyKindDeleteSized:
1962 2 : b.offsets = append(b.offsets, entry)
1963 0 : default:
1964 0 : // Note In some circumstances this might be temporary memory
1965 0 : // corruption that can be recovered by discarding the batch and
1966 0 : // trying again. In other cases, the batch repr might've been
1967 0 : // already persisted elsewhere, and we'll loop continuously trying
1968 0 : // to commit the same corrupted batch. The caller is responsible for
1969 0 : // distinguishing.
1970 0 : return nil, errors.Wrapf(ErrInvalidBatch, "unrecognized kind %v", kind)
1971 : }
1972 : // NB: index (used for entry.offset above) must not reach the
1973 : // batch.count, because the offset is used in conjunction with the
1974 : // batch's sequence number to assign sequence numbers to keys within
1975 : // the batch. If we assign KV's indexes as high as batch.count,
1976 : // we'll begin assigning keys sequence numbers that weren't
1977 : // allocated.
1978 2 : if index >= uint32(batch.count) {
1979 0 : return nil, base.AssertionFailedf("pebble: batch entry index %d ≥ batch.count %d", index, batch.count)
1980 0 : }
1981 2 : index++
1982 : }
1983 : }
1984 :
1985 : // Sort all of offsets, rangeDelOffsets and rangeKeyOffsets, using *batch's
1986 : // sort.Interface implementation.
1987 2 : pointOffsets := b.offsets
1988 2 : sort.Sort(b)
1989 2 : b.offsets = rangeDelOffsets
1990 2 : sort.Sort(b)
1991 2 : b.offsets = rangeKeyOffsets
1992 2 : sort.Sort(b)
1993 2 : b.offsets = pointOffsets
1994 2 :
1995 2 : if len(rangeDelOffsets) > 0 {
1996 2 : frag := &keyspan.Fragmenter{
1997 2 : Cmp: b.cmp,
1998 2 : Format: b.comparer.FormatKey,
1999 2 : Emit: func(s keyspan.Span) {
2000 2 : b.tombstones = append(b.tombstones, s)
2001 2 : },
2002 : }
2003 2 : it := &flushableBatchIter{
2004 2 : batch: b,
2005 2 : data: b.data,
2006 2 : offsets: rangeDelOffsets,
2007 2 : cmp: b.cmp,
2008 2 : index: -1,
2009 2 : }
2010 2 : fragmentRangeDels(frag, it, len(rangeDelOffsets))
2011 : }
2012 2 : if len(rangeKeyOffsets) > 0 {
2013 2 : frag := &keyspan.Fragmenter{
2014 2 : Cmp: b.cmp,
2015 2 : Format: b.comparer.FormatKey,
2016 2 : Emit: func(s keyspan.Span) {
2017 2 : b.rangeKeys = append(b.rangeKeys, s)
2018 2 : },
2019 : }
2020 2 : it := &flushableBatchIter{
2021 2 : batch: b,
2022 2 : data: b.data,
2023 2 : offsets: rangeKeyOffsets,
2024 2 : cmp: b.cmp,
2025 2 : index: -1,
2026 2 : }
2027 2 : fragmentRangeKeys(frag, it, len(rangeKeyOffsets))
2028 : }
2029 2 : return b, nil
2030 : }
2031 :
2032 2 : func (b *flushableBatch) setSeqNum(seqNum base.SeqNum) {
2033 2 : if b.seqNum != 0 {
2034 0 : panic(fmt.Sprintf("pebble: flushableBatch.seqNum already set: %d", b.seqNum))
2035 : }
2036 2 : b.seqNum = seqNum
2037 2 : for i := range b.tombstones {
2038 2 : for j := range b.tombstones[i].Keys {
2039 2 : b.tombstones[i].Keys[j].Trailer = base.MakeTrailer(
2040 2 : b.tombstones[i].Keys[j].SeqNum()+seqNum,
2041 2 : b.tombstones[i].Keys[j].Kind(),
2042 2 : )
2043 2 : }
2044 : }
2045 2 : for i := range b.rangeKeys {
2046 2 : for j := range b.rangeKeys[i].Keys {
2047 2 : b.rangeKeys[i].Keys[j].Trailer = base.MakeTrailer(
2048 2 : b.rangeKeys[i].Keys[j].SeqNum()+seqNum,
2049 2 : b.rangeKeys[i].Keys[j].Kind(),
2050 2 : )
2051 2 : }
2052 : }
2053 : }
2054 :
2055 2 : func (b *flushableBatch) Len() int {
2056 2 : return len(b.offsets)
2057 2 : }
2058 :
2059 2 : func (b *flushableBatch) Less(i, j int) bool {
2060 2 : ei := &b.offsets[i]
2061 2 : ej := &b.offsets[j]
2062 2 : ki := b.data[ei.keyStart:ei.keyEnd]
2063 2 : kj := b.data[ej.keyStart:ej.keyEnd]
2064 2 : switch c := b.cmp(ki, kj); {
2065 2 : case c < 0:
2066 2 : return true
2067 2 : case c > 0:
2068 2 : return false
2069 2 : default:
2070 2 : return ei.offset > ej.offset
2071 : }
2072 : }
2073 :
2074 2 : func (b *flushableBatch) Swap(i, j int) {
2075 2 : b.offsets[i], b.offsets[j] = b.offsets[j], b.offsets[i]
2076 2 : }
2077 :
2078 : // newIter is part of the flushable interface.
2079 2 : func (b *flushableBatch) newIter(o *IterOptions) internalIterator {
2080 2 : return &flushableBatchIter{
2081 2 : batch: b,
2082 2 : data: b.data,
2083 2 : offsets: b.offsets,
2084 2 : cmp: b.cmp,
2085 2 : index: -1,
2086 2 : lower: o.GetLowerBound(),
2087 2 : upper: o.GetUpperBound(),
2088 2 : }
2089 2 : }
2090 :
2091 : // newFlushIter is part of the flushable interface.
2092 2 : func (b *flushableBatch) newFlushIter(o *IterOptions) internalIterator {
2093 2 : return &flushFlushableBatchIter{
2094 2 : flushableBatchIter: flushableBatchIter{
2095 2 : batch: b,
2096 2 : data: b.data,
2097 2 : offsets: b.offsets,
2098 2 : cmp: b.cmp,
2099 2 : index: -1,
2100 2 : },
2101 2 : }
2102 2 : }
2103 :
2104 : // newRangeDelIter is part of the flushable interface.
2105 2 : func (b *flushableBatch) newRangeDelIter(o *IterOptions) keyspan.FragmentIterator {
2106 2 : if len(b.tombstones) == 0 {
2107 2 : return nil
2108 2 : }
2109 2 : return keyspan.NewIter(b.cmp, b.tombstones)
2110 : }
2111 :
2112 : // newRangeKeyIter is part of the flushable interface.
2113 2 : func (b *flushableBatch) newRangeKeyIter(o *IterOptions) keyspan.FragmentIterator {
2114 2 : if len(b.rangeKeys) == 0 {
2115 2 : return nil
2116 2 : }
2117 2 : return keyspan.NewIter(b.cmp, b.rangeKeys)
2118 : }
2119 :
2120 : // containsRangeKeys is part of the flushable interface.
2121 2 : func (b *flushableBatch) containsRangeKeys() bool { return len(b.rangeKeys) > 0 }
2122 :
2123 : // inuseBytes is part of the flushable interface.
2124 2 : func (b *flushableBatch) inuseBytes() uint64 {
2125 2 : return uint64(len(b.data) - batchrepr.HeaderLen)
2126 2 : }
2127 :
2128 : // totalBytes is part of the flushable interface.
2129 2 : func (b *flushableBatch) totalBytes() uint64 {
2130 2 : return uint64(cap(b.data))
2131 2 : }
2132 :
2133 : // readyForFlush is part of the flushable interface.
2134 2 : func (b *flushableBatch) readyForFlush() bool {
2135 2 : // A flushable batch is always ready for flush; it must be flushed together
2136 2 : // with the previous memtable.
2137 2 : return true
2138 2 : }
2139 :
2140 : // computePossibleOverlaps is part of the flushable interface.
2141 : func (b *flushableBatch) computePossibleOverlaps(
2142 : fn func(bounded) shouldContinue, bounded ...bounded,
2143 2 : ) {
2144 2 : computePossibleOverlapsGenericImpl[*flushableBatch](b, b.cmp, fn, bounded)
2145 2 : }
2146 :
2147 : // Note: flushableBatchIter mirrors the implementation of batchIter. Keep the
2148 : // two in sync.
2149 : type flushableBatchIter struct {
2150 : // Members to be initialized by creator.
2151 : batch *flushableBatch
2152 : // The bytes backing the batch. Always the same as batch.data?
2153 : data []byte
2154 : // The sorted entries. This is not always equal to batch.offsets.
2155 : offsets []flushableBatchEntry
2156 : cmp Compare
2157 : // Must be initialized to -1. It is the index into offsets that represents
2158 : // the current iterator position.
2159 : index int
2160 :
2161 : // For internal use by the implementation.
2162 : kv base.InternalKV
2163 : err error
2164 :
2165 : // Optionally initialize to bounds of iteration, if any.
2166 : lower []byte
2167 : upper []byte
2168 : }
2169 :
2170 : // flushableBatchIter implements the base.InternalIterator interface.
2171 : var _ base.InternalIterator = (*flushableBatchIter)(nil)
2172 :
2173 0 : func (i *flushableBatchIter) String() string {
2174 0 : return "flushable-batch"
2175 0 : }
2176 :
2177 : // SeekGE implements internalIterator.SeekGE, as documented in the pebble
2178 : // package. Ignore flags.TrySeekUsingNext() since we don't expect this
2179 : // optimization to provide much benefit here at the moment.
2180 2 : func (i *flushableBatchIter) SeekGE(key []byte, flags base.SeekGEFlags) *base.InternalKV {
2181 2 : i.err = nil // clear cached iteration error
2182 2 : ikey := base.MakeSearchKey(key)
2183 2 : i.index = sort.Search(len(i.offsets), func(j int) bool {
2184 2 : return base.InternalCompare(i.cmp, ikey, i.getKey(j)) <= 0
2185 2 : })
2186 2 : if i.index >= len(i.offsets) {
2187 2 : return nil
2188 2 : }
2189 2 : kv := i.getKV(i.index)
2190 2 : if i.upper != nil && i.cmp(kv.K.UserKey, i.upper) >= 0 {
2191 2 : i.index = len(i.offsets)
2192 2 : return nil
2193 2 : }
2194 2 : return kv
2195 : }
2196 :
2197 : // SeekPrefixGE implements internalIterator.SeekPrefixGE, as documented in the
2198 : // pebble package.
2199 : func (i *flushableBatchIter) SeekPrefixGE(
2200 : prefix, key []byte, flags base.SeekGEFlags,
2201 2 : ) *base.InternalKV {
2202 2 : kv := i.SeekGE(key, flags)
2203 2 : if kv == nil {
2204 2 : return nil
2205 2 : }
2206 : // If the key doesn't have the sought prefix, return nil.
2207 2 : if !bytes.Equal(i.batch.comparer.Split.Prefix(kv.K.UserKey), prefix) {
2208 2 : return nil
2209 2 : }
2210 2 : return kv
2211 : }
2212 :
2213 : // SeekLT implements internalIterator.SeekLT, as documented in the pebble
2214 : // package.
2215 2 : func (i *flushableBatchIter) SeekLT(key []byte, flags base.SeekLTFlags) *base.InternalKV {
2216 2 : i.err = nil // clear cached iteration error
2217 2 : ikey := base.MakeSearchKey(key)
2218 2 : i.index = sort.Search(len(i.offsets), func(j int) bool {
2219 2 : return base.InternalCompare(i.cmp, ikey, i.getKey(j)) <= 0
2220 2 : })
2221 2 : i.index--
2222 2 : if i.index < 0 {
2223 2 : return nil
2224 2 : }
2225 2 : kv := i.getKV(i.index)
2226 2 : if i.lower != nil && i.cmp(kv.K.UserKey, i.lower) < 0 {
2227 2 : i.index = -1
2228 2 : return nil
2229 2 : }
2230 2 : return kv
2231 : }
2232 :
2233 : // First implements internalIterator.First, as documented in the pebble
2234 : // package.
2235 2 : func (i *flushableBatchIter) First() *base.InternalKV {
2236 2 : i.err = nil // clear cached iteration error
2237 2 : if len(i.offsets) == 0 {
2238 2 : return nil
2239 2 : }
2240 2 : i.index = 0
2241 2 : kv := i.getKV(i.index)
2242 2 : if i.upper != nil && i.cmp(kv.K.UserKey, i.upper) >= 0 {
2243 2 : i.index = len(i.offsets)
2244 2 : return nil
2245 2 : }
2246 2 : return kv
2247 : }
2248 :
2249 : // Last implements internalIterator.Last, as documented in the pebble
2250 : // package.
2251 2 : func (i *flushableBatchIter) Last() *base.InternalKV {
2252 2 : i.err = nil // clear cached iteration error
2253 2 : if len(i.offsets) == 0 {
2254 1 : return nil
2255 1 : }
2256 2 : i.index = len(i.offsets) - 1
2257 2 : kv := i.getKV(i.index)
2258 2 : if i.lower != nil && i.cmp(kv.K.UserKey, i.lower) < 0 {
2259 1 : i.index = -1
2260 1 : return nil
2261 1 : }
2262 2 : return kv
2263 : }
2264 :
2265 : // Note: flushFlushableBatchIter.Next mirrors the implementation of
2266 : // flushableBatchIter.Next due to performance. Keep the two in sync.
2267 2 : func (i *flushableBatchIter) Next() *base.InternalKV {
2268 2 : if i.index == len(i.offsets) {
2269 0 : return nil
2270 0 : }
2271 2 : i.index++
2272 2 : if i.index == len(i.offsets) {
2273 2 : return nil
2274 2 : }
2275 2 : kv := i.getKV(i.index)
2276 2 : if i.upper != nil && i.cmp(kv.K.UserKey, i.upper) >= 0 {
2277 2 : i.index = len(i.offsets)
2278 2 : return nil
2279 2 : }
2280 2 : return kv
2281 : }
2282 :
2283 2 : func (i *flushableBatchIter) Prev() *base.InternalKV {
2284 2 : if i.index < 0 {
2285 0 : return nil
2286 0 : }
2287 2 : i.index--
2288 2 : if i.index < 0 {
2289 2 : return nil
2290 2 : }
2291 2 : kv := i.getKV(i.index)
2292 2 : if i.lower != nil && i.cmp(kv.K.UserKey, i.lower) < 0 {
2293 2 : i.index = -1
2294 2 : return nil
2295 2 : }
2296 2 : return kv
2297 : }
2298 :
2299 : // Note: flushFlushableBatchIter.NextPrefix mirrors the implementation of
2300 : // flushableBatchIter.NextPrefix due to performance. Keep the two in sync.
2301 0 : func (i *flushableBatchIter) NextPrefix(succKey []byte) *base.InternalKV {
2302 0 : return i.SeekGE(succKey, base.SeekGEFlagsNone.EnableTrySeekUsingNext())
2303 0 : }
2304 :
2305 2 : func (i *flushableBatchIter) getKey(index int) InternalKey {
2306 2 : e := &i.offsets[index]
2307 2 : kind := InternalKeyKind(i.data[e.offset])
2308 2 : key := i.data[e.keyStart:e.keyEnd]
2309 2 : return base.MakeInternalKey(key, i.batch.seqNum+base.SeqNum(e.index), kind)
2310 2 : }
2311 :
2312 2 : func (i *flushableBatchIter) getKV(index int) *base.InternalKV {
2313 2 : i.kv = base.InternalKV{
2314 2 : K: i.getKey(index),
2315 2 : V: i.extractValue(),
2316 2 : }
2317 2 : return &i.kv
2318 2 : }
2319 :
2320 2 : func (i *flushableBatchIter) extractValue() base.LazyValue {
2321 2 : p := i.data[i.offsets[i.index].offset:]
2322 2 : if len(p) == 0 {
2323 0 : i.err = base.CorruptionErrorf("corrupted batch")
2324 0 : return base.LazyValue{}
2325 0 : }
2326 2 : kind := InternalKeyKind(p[0])
2327 2 : if kind > InternalKeyKindMax {
2328 0 : i.err = base.CorruptionErrorf("corrupted batch")
2329 0 : return base.LazyValue{}
2330 0 : }
2331 2 : var value []byte
2332 2 : var ok bool
2333 2 : switch kind {
2334 : case InternalKeyKindSet, InternalKeyKindMerge, InternalKeyKindRangeDelete,
2335 : InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete,
2336 2 : InternalKeyKindDeleteSized:
2337 2 : keyEnd := i.offsets[i.index].keyEnd
2338 2 : _, value, ok = batchrepr.DecodeStr(i.data[keyEnd:])
2339 2 : if !ok {
2340 0 : i.err = base.CorruptionErrorf("corrupted batch")
2341 0 : return base.LazyValue{}
2342 0 : }
2343 : }
2344 2 : return base.MakeInPlaceValue(value)
2345 : }
2346 :
2347 0 : func (i *flushableBatchIter) Valid() bool {
2348 0 : return i.index >= 0 && i.index < len(i.offsets)
2349 0 : }
2350 :
2351 2 : func (i *flushableBatchIter) Error() error {
2352 2 : return i.err
2353 2 : }
2354 :
2355 2 : func (i *flushableBatchIter) Close() error {
2356 2 : return i.err
2357 2 : }
2358 :
2359 2 : func (i *flushableBatchIter) SetBounds(lower, upper []byte) {
2360 2 : i.lower = lower
2361 2 : i.upper = upper
2362 2 : }
2363 :
2364 0 : func (i *flushableBatchIter) SetContext(_ context.Context) {}
2365 :
2366 : // DebugTree is part of the InternalIterator interface.
2367 0 : func (i *flushableBatchIter) DebugTree(tp treeprinter.Node) {
2368 0 : tp.Childf("%T(%p)", i, i)
2369 0 : }
2370 :
2371 : // flushFlushableBatchIter is similar to flushableBatchIter but it keeps track
2372 : // of number of bytes iterated.
2373 : type flushFlushableBatchIter struct {
2374 : flushableBatchIter
2375 : }
2376 :
2377 : // flushFlushableBatchIter implements the base.InternalIterator interface.
2378 : var _ base.InternalIterator = (*flushFlushableBatchIter)(nil)
2379 :
2380 0 : func (i *flushFlushableBatchIter) String() string {
2381 0 : return "flushable-batch"
2382 0 : }
2383 :
2384 0 : func (i *flushFlushableBatchIter) SeekGE(key []byte, flags base.SeekGEFlags) *base.InternalKV {
2385 0 : panic("pebble: SeekGE unimplemented")
2386 : }
2387 :
2388 : func (i *flushFlushableBatchIter) SeekPrefixGE(
2389 : prefix, key []byte, flags base.SeekGEFlags,
2390 0 : ) *base.InternalKV {
2391 0 : panic("pebble: SeekPrefixGE unimplemented")
2392 : }
2393 :
2394 0 : func (i *flushFlushableBatchIter) SeekLT(key []byte, flags base.SeekLTFlags) *base.InternalKV {
2395 0 : panic("pebble: SeekLT unimplemented")
2396 : }
2397 :
2398 2 : func (i *flushFlushableBatchIter) First() *base.InternalKV {
2399 2 : i.err = nil // clear cached iteration error
2400 2 : return i.flushableBatchIter.First()
2401 2 : }
2402 :
2403 0 : func (i *flushFlushableBatchIter) NextPrefix(succKey []byte) *base.InternalKV {
2404 0 : panic("pebble: Prev unimplemented")
2405 : }
2406 :
2407 : // Note: flushFlushableBatchIter.Next mirrors the implementation of
2408 : // flushableBatchIter.Next due to performance. Keep the two in sync.
2409 2 : func (i *flushFlushableBatchIter) Next() *base.InternalKV {
2410 2 : if i.index == len(i.offsets) {
2411 0 : return nil
2412 0 : }
2413 2 : i.index++
2414 2 : if i.index == len(i.offsets) {
2415 2 : return nil
2416 2 : }
2417 2 : return i.getKV(i.index)
2418 : }
2419 :
2420 0 : func (i flushFlushableBatchIter) Prev() *base.InternalKV {
2421 0 : panic("pebble: Prev unimplemented")
2422 : }
2423 :
2424 : // batchOptions holds the parameters to configure batch.
2425 : type batchOptions struct {
2426 : initialSizeBytes int
2427 : maxRetainedSizeBytes int
2428 : }
2429 :
2430 : // ensureDefaults creates batch options with default values.
2431 2 : func (o *batchOptions) ensureDefaults() {
2432 2 : if o.initialSizeBytes <= 0 {
2433 2 : o.initialSizeBytes = defaultBatchInitialSize
2434 2 : }
2435 2 : if o.maxRetainedSizeBytes <= 0 {
2436 2 : o.maxRetainedSizeBytes = defaultBatchMaxRetainedSize
2437 2 : }
2438 : }
2439 :
2440 : // BatchOption allows customizing the batch.
2441 : type BatchOption func(*batchOptions)
2442 :
2443 : // WithInitialSizeBytes sets a custom initial size for the batch. Defaults
2444 : // to 1KB.
2445 1 : func WithInitialSizeBytes(s int) BatchOption {
2446 1 : return func(opts *batchOptions) {
2447 1 : opts.initialSizeBytes = s
2448 1 : }
2449 : }
2450 :
2451 : // WithMaxRetainedSizeBytes sets a custom max size for the batch to be
2452 : // re-used. Any batch which exceeds the max retained size would be GC-ed.
2453 : // Defaults to 1MB.
2454 1 : func WithMaxRetainedSizeBytes(s int) BatchOption {
2455 1 : return func(opts *batchOptions) {
2456 1 : opts.maxRetainedSizeBytes = s
2457 1 : }
2458 : }
2459 :
2460 : // batchSort returns iterators for the sorted contents of the batch. It is
2461 : // intended for testing use only. The batch.Sort dance is done to prevent
2462 : // exposing this method in the public pebble interface.
2463 : func batchSort(
2464 : i interface{},
2465 : ) (
2466 : points internalIterator,
2467 : rangeDels keyspan.FragmentIterator,
2468 : rangeKeys keyspan.FragmentIterator,
2469 2 : ) {
2470 2 : b := i.(*Batch)
2471 2 : if b.Indexed() {
2472 2 : pointIter := b.newInternalIter(nil)
2473 2 : rangeDelIter := b.newRangeDelIter(nil, math.MaxUint64)
2474 2 : rangeKeyIter := b.newRangeKeyIter(nil, math.MaxUint64)
2475 2 : return pointIter, rangeDelIter, rangeKeyIter
2476 2 : }
2477 2 : f, err := newFlushableBatch(b, b.db.opts.Comparer)
2478 2 : if err != nil {
2479 0 : panic(err)
2480 : }
2481 2 : return f.newIter(nil), f.newRangeDelIter(nil), f.newRangeKeyIter(nil)
2482 : }
2483 :
2484 2 : func init() {
2485 2 : private.BatchSort = batchSort
2486 2 : }
|