Line data Source code
1 : // Copyright 2012 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "bytes"
9 : "context"
10 : "encoding/binary"
11 : "fmt"
12 : "io"
13 : "math"
14 : "sort"
15 : "sync"
16 : "sync/atomic"
17 : "time"
18 : "unsafe"
19 :
20 : "github.com/cockroachdb/crlib/crtime"
21 : "github.com/cockroachdb/errors"
22 : "github.com/cockroachdb/pebble/batchrepr"
23 : "github.com/cockroachdb/pebble/internal/base"
24 : "github.com/cockroachdb/pebble/internal/batchskl"
25 : "github.com/cockroachdb/pebble/internal/humanize"
26 : "github.com/cockroachdb/pebble/internal/invariants"
27 : "github.com/cockroachdb/pebble/internal/keyspan"
28 : "github.com/cockroachdb/pebble/internal/private"
29 : "github.com/cockroachdb/pebble/internal/rangedel"
30 : "github.com/cockroachdb/pebble/internal/rangekey"
31 : "github.com/cockroachdb/pebble/internal/rawalloc"
32 : "github.com/cockroachdb/pebble/internal/treeprinter"
33 : )
34 :
35 : const (
36 : invalidBatchCount = 1<<32 - 1
37 : maxVarintLen32 = 5
38 :
39 : defaultBatchInitialSize = 1 << 10 // 1 KB
40 : defaultBatchMaxRetainedSize = 1 << 20 // 1 MB
41 : )
42 :
43 : // ErrNotIndexed means that a read operation on a batch failed because the
44 : // batch is not indexed and thus doesn't support reads.
45 : var ErrNotIndexed = errors.New("pebble: batch not indexed")
46 :
47 : // ErrInvalidBatch indicates that a batch is invalid or otherwise corrupted.
48 : var ErrInvalidBatch = batchrepr.ErrInvalidBatch
49 :
50 : // ErrBatchTooLarge indicates that the size of this batch is over the limit of 4GB.
51 : var ErrBatchTooLarge = base.MarkCorruptionError(errors.Newf("pebble: batch too large: >= %s", humanize.Bytes.Uint64(maxBatchSize)))
52 :
53 : // DeferredBatchOp represents a batch operation (eg. set, merge, delete) that is
54 : // being inserted into the batch. Indexing is not performed on the specified key
55 : // until Finish is called, hence the name deferred. This struct lets the caller
56 : // copy or encode keys/values directly into the batch representation instead of
57 : // copying into an intermediary buffer then having pebble.Batch copy off of it.
58 : type DeferredBatchOp struct {
59 : index *batchskl.Skiplist
60 :
61 : // Key and Value point to parts of the binary batch representation where
62 : // keys and values should be encoded/copied into. len(Key) and len(Value)
63 : // bytes must be copied into these slices respectively before calling
64 : // Finish(). Changing where these slices point to is not allowed.
65 : Key, Value []byte
66 : offset uint32
67 : }
68 :
69 : // Finish completes the addition of this batch operation, and adds it to the
70 : // index if necessary. Must be called once (and exactly once) after keys/values
71 : // have been filled into Key and Value. Not calling Finish or not
72 : // copying/encoding keys will result in an incomplete index, and calling Finish
73 : // twice may result in a panic.
74 0 : func (d DeferredBatchOp) Finish() error {
75 0 : if d.index != nil {
76 0 : if err := d.index.Add(d.offset); err != nil {
77 0 : return err
78 0 : }
79 : }
80 0 : return nil
81 : }
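
// Illustrative sketch, not part of batch.go: the deferred-op pattern described
// above, as used from a client package that imports
// "github.com/cockroachdb/pebble". The helper name is hypothetical. The caller
// reserves space with SetDeferred, encodes the key and value directly into the
// batch's buffer, and then calls Finish exactly once to index the entry.
func deferredSetExample(b *pebble.Batch, key, value []byte) error {
	op := b.SetDeferred(len(key), len(value))
	// Exactly len(op.Key) and len(op.Value) bytes must be written in place;
	// the slices must not be repointed.
	copy(op.Key, key)
	copy(op.Value, value)
	return op.Finish()
}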
82 :
83 : // A Batch is a sequence of Sets, Merges, Deletes, DeleteRanges, RangeKeySets,
84 : // RangeKeyUnsets, and/or RangeKeyDeletes that are applied atomically. Batch
85 : // implements the Reader interface, but only an indexed batch supports reading
86 : // (without error) via Get or NewIter. A non-indexed batch will return
87 : // ErrNotIndexed when read from. A batch is not safe for concurrent use, and
88 : // consumers should use a batch per goroutine or provide their own
89 : // synchronization.
90 : //
91 : // # Indexing
92 : //
93 : // Batches can be optionally indexed (see DB.NewIndexedBatch). An indexed batch
94 : // allows iteration via an Iterator (see Batch.NewIter). The iterator provides
95 : // a merged view of the operations in the batch and the underlying
96 : // database. This is implemented by treating the batch as an additional layer
97 : // in the LSM where every entry in the batch is considered newer than any entry
98 : // in the underlying database (batch entries have the InternalKeySeqNumBatch
99 : // bit set). By treating the batch as an additional layer in the LSM, iteration
100 : // supports all batch operations (i.e. Set, Merge, Delete, DeleteRange,
101 : // RangeKeySet, RangeKeyUnset, RangeKeyDelete) with minimal effort.
102 : //
103 : // The same key can be operated on multiple times in a batch, though only the
104 : // latest operation will be visible. For example, Put("a", "b"), Delete("a")
105 : // will cause the key "a" to not be visible in the batch. Put("a", "b"),
106 : // Put("a", "c") will cause a read of "a" to return the value "c".
107 : //
108 : // The batch index is implemented via a skiplist (internal/batchskl). While
109 : // the skiplist implementation is very fast, inserting into an indexed batch is
110 : // significantly slower than inserting into a non-indexed batch. Only use an
111 : // indexed batch if you require reading from it.
112 : //
113 : // # Atomic commit
114 : //
115 : // The operations in a batch are persisted by calling Batch.Commit which is
116 : // equivalent to calling DB.Apply(batch). A batch is committed atomically by
117 : // writing the internal batch representation to the WAL, adding all of the
118 : // batch operations to the memtable associated with the WAL, and then
119 : // incrementing the visible sequence number so that subsequent reads can see
120 : // the effects of the batch operations. If WriteOptions.Sync is true, a call to
121 : // Batch.Commit will guarantee that the batch is persisted to disk before
122 : // returning. See commitPipeline for more on the implementation details.
123 : //
124 : // # Large batches
125 : //
126 : // The size of a batch is limited to 4GB, the max that can be represented by
127 : // a uint32 type. Be aware that indexed batches require considerably more
128 : // memory for the skiplist structure (this skiplist is separate from the 4GB
129 : // batch limit). For users that require atomic writes of data that's greater
130 : // than 4GB, DB.Ingest() is able to atomically ingest pre-computed sstables.
131 : // A given WAL file has a single memtable associated with it (this restriction
132 : // could be removed, but doing so is onerous and complex). And a memtable has
133 : // a fixed size due to the underlying fixed size arena. Note that this differs
134 : // from RocksDB where a memtable can grow arbitrarily large using a list of
135 : // arena chunks. In RocksDB this is accomplished by storing pointers in the
136 : // arena memory, but that isn't possible in Go.
137 : //
138 : // During Batch.Commit, a batch which is larger than a threshold (>
139 : // MemTableSize/2) is wrapped in a flushableBatch and inserted into the queue
140 : // of memtables. A flushableBatch forces WAL to be rotated, but that happens
141 : // anyways when the memtable becomes full so this does not cause significant
142 : // WAL churn. Because the flushableBatch is readable as another layer in the
143 : // LSM, Batch.Commit returns as soon as the flushableBatch has been added to
144 : // the queue of memtables.
145 : //
146 : // Internally, a flushableBatch provides Iterator support by sorting the batch
147 : // contents (the batch is sorted once, when it is added to the memtable
148 : // queue). Sorting the batch contents and insertion of the contents into a
149 : // memtable have the same big-O time, but the constant factor dominates
150 : // here. Sorting is significantly faster and uses significantly less memory.
151 : //
152 : // # Internal representation
153 : //
154 : // The internal batch representation is a contiguous byte buffer with a fixed
155 : // 12-byte header, followed by a series of records.
156 : //
157 : // +-------------+------------+--- ... ---+
158 : // | SeqNum (8B) | Count (4B) | Entries |
159 : // +-------------+------------+--- ... ---+
160 : //
161 : // Each record has a 1-byte kind tag prefix, followed by 1 or 2 length prefixed
162 : // strings (varstring):
163 : //
164 : // +-----------+-----------------+-------------------+
165 : // | Kind (1B) | Key (varstring) | Value (varstring) |
166 : // +-----------+-----------------+-------------------+
167 : //
168 : // A varstring is a varint32 followed by N bytes of data. The Kind tags are
169 : // exactly those specified by InternalKeyKind. The following table shows the
170 : // format for records of each kind:
171 : //
172 : // InternalKeyKindDelete varstring
173 : // InternalKeyKindLogData varstring
174 : // InternalKeyKindIngestSST varstring
175 : // InternalKeyKindSet varstring varstring
176 : // InternalKeyKindMerge varstring varstring
177 : // InternalKeyKindRangeDelete varstring varstring
178 : // InternalKeyKindRangeKeySet varstring varstring
179 : // InternalKeyKindRangeKeyUnset varstring varstring
180 : // InternalKeyKindRangeKeyDelete varstring varstring
181 : //
182 : // The intuitive understanding here is that the arguments to Delete, Set,
183 : // Merge, DeleteRange and RangeKeyDelete are encoded into the batch. The
184 : // RangeKeySet and RangeKeyUnset operations are slightly more complicated,
185 : // encoding their end key, suffix and value [in the case of RangeKeySet] within
186 : // the Value varstring. For more information on the value encoding for
187 : // RangeKeySet and RangeKeyUnset, see the internal/rangekey package.
188 : //
189 : // The internal batch representation is the on disk format for a batch in the
190 : // WAL, and thus stable. New record kinds may be added, but the existing ones
191 : // will not be modified.
192 : type Batch struct {
193 : batchInternal
194 : applied atomic.Bool
195 : // lifecycle is used to negotiate the lifecycle of a Batch. A Batch and its
196 : // underlying batchInternal.data byte slice may be reused. There are two
197 : // mechanisms for reuse:
198 : //
199 : // 1. The caller may explicitly call [Batch.Reset] to reset the batch to be
200 : // empty (while retaining the underlying repr's buffer).
201 : // 2. The caller may call [Batch.Close], passing ownership off to Pebble,
202 : // which may reuse the batch's memory to service new callers to
203 : // [DB.NewBatch].
204 : //
205 : // There's a complication to reuse: When WAL failover is configured, the
206 : // Pebble commit pipeline may retain a pointer to the batch.data beyond the
207 : // return of [Batch.Commit]. The user of the Batch may commit their batch
208 : // and call Close or Reset before the commit pipeline is finished reading
209 : // the data slice. Recycling immediately would cause a data race.
210 : //
211 : // To resolve this data race, this [lifecycle] atomic is used to determine
212 : // safety and responsibility of reusing a batch. The low bits of the atomic
213 : // are used as a reference count (really just the lowest bit; in practice
214 : // only one code path takes a reference). The [Batch] is passed into
215 : // [wal.Writer]'s WriteRecord method as a [RefCount] implementation. The
216 : // wal.Writer guarantees that if it will read [Batch.data] after the call to
217 : // WriteRecord returns, it will increment the reference count. When it's
218 : // complete, it'll unreference through invoking [Batch.Unref].
219 : //
220 : // When the committer of a batch indicates intent to recycle a Batch through
221 : // calling [Batch.Reset] or [Batch.Close], the lifecycle atomic is read. If
222 : // an outstanding reference remains, it's unsafe to reuse Batch.data yet. In
223 : // [Batch.Reset] the caller wants to reuse the [Batch] immediately, so we
224 : // discard b.data to recycle the struct but not the underlying byte slice.
225 : // In [Batch.Close], we set a special high bit [batchClosedBit] on lifecycle
226 : // that indicates that the user will not use [Batch] again and we're free to
227 : // recycle it when safe. When the commit pipeline eventually calls
228 : // [Batch.Unref], the [batchClosedBit] is noticed and the batch is
229 : // recycled.
230 : lifecycle atomic.Int32
231 : }
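
// Illustrative sketch, not part of batch.go: the atomic-commit flow described
// in the doc comment above, from a client package importing
// "github.com/cockroachdb/pebble". All operations become visible together once
// Commit returns, and with pebble.Sync the batch is durable in the WAL before
// Commit returns. The function name is hypothetical.
func commitExample(db *pebble.DB) error {
	b := db.NewBatch()
	defer b.Close()
	if err := b.Set([]byte("a"), []byte("1"), nil); err != nil {
		return err
	}
	if err := b.Delete([]byte("b"), nil); err != nil {
		return err
	}
	// Equivalent to db.Apply(b, pebble.Sync).
	return b.Commit(pebble.Sync)
}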
232 :
233 : // batchClosedBit is a bit stored on Batch.lifecycle to indicate that the user
234 : // called [Batch.Close] to release a Batch, but an open reference count
235 : // prevented immediate recycling.
236 : const batchClosedBit = 1 << 30
237 :
238 : // TODO(jackson): Hide the wal.RefCount implementation from the public Batch interface.
239 :
240 : // Ref implements wal.RefCount. If the WAL writer may need to read b.data after
241 : // it returns, it invokes Ref to increment the lifecycle's reference count. When
242 : // it's finished, it invokes Unref.
243 1 : func (b *Batch) Ref() {
244 1 : b.lifecycle.Add(+1)
245 1 : }
246 :
247 : // Unref implements wal.RefCount.
248 1 : func (b *Batch) Unref() {
249 1 : if v := b.lifecycle.Add(-1); (v ^ batchClosedBit) == 0 {
250 1 : // The [batchClosedBit] high bit is set, and there are no outstanding
251 1 : // references. The user of the Batch called [Batch.Close], expecting the
252 1 : // batch to be recycled. However, our outstanding reference count
253 1 : // prevented recycling. As the last to dereference, we're now
254 1 : // responsible for releasing the batch.
255 1 : b.lifecycle.Store(0)
256 1 : b.release()
257 1 : }
258 : }
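
// Simplified sketch, not part of batch.go, of the lifecycle scheme described
// above: the low bits of an atomic.Int32 act as a reference count and a high
// bit records that the owner already called Close. Whoever observes "closed
// and zero references" recycles. All names here are hypothetical and the real
// Batch logic above handles additional cases.
type lifecycleSketch struct {
	state   atomic.Int32 // low bits: reference count; bit 30: closed
	recycle func()
}

const sketchClosedBit = 1 << 30

func (l *lifecycleSketch) Ref() { l.state.Add(1) }

func (l *lifecycleSketch) Unref() {
	if v := l.state.Add(-1); (v ^ sketchClosedBit) == 0 {
		// Closed bit set and no outstanding references: the last reader is
		// responsible for recycling.
		l.state.Store(0)
		l.recycle()
	}
}

func (l *lifecycleSketch) Close() {
	if l.state.Add(sketchClosedBit) == sketchClosedBit {
		// No outstanding references, so recycle immediately; otherwise the
		// final Unref will notice the closed bit and recycle.
		l.state.Store(0)
		l.recycle()
	}
}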
259 :
260 : // batchInternal contains the set of fields within Batch that are non-atomic and
261 : // capable of being reset using a *b = batchInternal{} struct copy.
262 : type batchInternal struct {
263 : // Data is the wire format of a batch's log entry:
264 : // - 8 bytes for a sequence number of the first batch element,
265 : // or zeroes if the batch has not yet been applied,
266 : // - 4 bytes for the count: the number of elements in the batch,
267 : // or "\xff\xff\xff\xff" if the batch is invalid,
268 : // - count elements, being:
269 : // - one byte for the kind
270 : // - the varint-string user key,
271 : // - the varint-string value (if kind != delete).
272 : // The sequence number and count are stored in little-endian order.
273 : //
274 : // The data field can be (but is not guaranteed to be) nil for new
275 : // batches. Large batches will set the data field to nil when committed as
276 : // the data has been moved to a flushableBatch and inserted into the queue of
277 : // memtables.
278 : data []byte
279 : comparer *base.Comparer
280 : opts batchOptions
281 :
282 : // An upper bound on required space to add this batch to a memtable.
283 : // Note that although batches are limited to 4 GiB in size, that limit
284 : // applies to len(data), not the memtable size. The upper bound on the
285 : // size of a memtable node is larger than the overhead of the batch's log
286 : // encoding, so memTableSize is larger than len(data) and may overflow a
287 : // uint32.
288 : memTableSize uint64
289 :
290 : // The db to which the batch will be committed. Do not change this field
291 : // after the batch has been created as it might invalidate internal state.
292 : // Batch.memTableSize is only refreshed if Batch.db is set. Setting db to
293 : // nil once it has been set implies that the Batch has encountered an error.
294 : db *DB
295 :
296 : // The count of records in the batch. This count will be stored in the batch
297 : // data whenever Repr() is called.
298 : count uint64
299 :
300 : // The count of range deletions in the batch. Updated every time a range
301 : // deletion is added.
302 : countRangeDels uint64
303 :
304 : // The count of range key sets, unsets and deletes in the batch. Updated
305 : // every time a RANGEKEYSET, RANGEKEYUNSET or RANGEKEYDEL key is added.
306 : countRangeKeys uint64
307 :
308 : // A deferredOp struct, stored in the Batch so that a pointer can be returned
309 : // from the *Deferred() methods rather than a value.
310 : deferredOp DeferredBatchOp
311 :
312 : // An optional skiplist keyed by offset into data of the entry.
313 : index *batchskl.Skiplist
314 : rangeDelIndex *batchskl.Skiplist
315 : rangeKeyIndex *batchskl.Skiplist
316 :
317 : // Fragmented range deletion tombstones. Cached the first time a range
318 : // deletion iterator is requested. The cache is invalidated whenever a new
319 : // range deletion is added to the batch. This cache can only be used when
320 : // opening an iterator to read at a batch sequence number >=
321 : // tombstonesSeqNum. This is the case for all new iterators created over a
322 : // batch but it's not the case for all cloned iterators.
323 : tombstones []keyspan.Span
324 : tombstonesSeqNum base.SeqNum
325 :
326 : // Fragmented range key spans. Cached the first time a range key iterator is
327 : // requested. The cache is invalidated whenever a new range key
328 : // (RangeKey{Set,Unset,Del}) is added to the batch. This cache can only be
329 : // used when opening an iterator to read at a batch sequence number >=
330 : // rangeKeysSeqNum. This is the case for all new iterators created over a
331 : // batch but it's not the case for all cloned iterators.
332 : rangeKeys []keyspan.Span
333 : rangeKeysSeqNum base.SeqNum
334 :
335 : // The flushableBatch wrapper if the batch is too large to fit in the
336 : // memtable.
337 : flushable *flushableBatch
338 :
339 : // minimumFormatMajorVersion indicates the format major version required in
340 : // order to commit this batch. If an operation requires a particular format
341 : // major version, it ratchets the batch's minimumFormatMajorVersion. When
342 : // the batch is committed, this is validated against the database's current
343 : // format major version.
344 : minimumFormatMajorVersion FormatMajorVersion
345 :
346 : // Synchronous Apply uses the commit WaitGroup for both publishing the
347 : // seqnum and waiting for the WAL fsync (if needed). Asynchronous
348 : // ApplyNoSyncWait, which implies WriteOptions.Sync is true, uses the commit
349 : // WaitGroup for publishing the seqnum and the fsyncWait WaitGroup for
350 : // waiting for the WAL fsync.
351 : //
352 : // TODO(sumeer): if we find that ApplyNoSyncWait in conjunction with
353 : // SyncWait is causing higher memory usage because of the time duration
354 : // between when the sync is already done, and a goroutine calls SyncWait
355 : // (followed by Batch.Close), we could separate out {fsyncWait, commitErr}
356 : // into a separate struct that is allocated separately (using another
357 : // sync.Pool), and only that struct needs to outlive Batch.Close (which
358 : // could then be called immediately after ApplyNoSyncWait). commitStats
359 : // will also need to be in this separate struct.
360 : commit sync.WaitGroup
361 : fsyncWait sync.WaitGroup
362 :
363 : commitStats BatchCommitStats
364 :
365 : commitErr error
366 :
367 : // Position bools together to reduce the size of the struct.
368 :
369 : // ingestedSSTBatch indicates that the batch contains one or more key kinds
370 : // of InternalKeyKindIngestSST. If the batch contains key kinds of IngestSST
371 : // then it will only contain key kinds of IngestSST.
372 : ingestedSSTBatch bool
373 :
374 : // committing is set to true when a batch begins to commit. It's used to
375 : // ensure the batch is not mutated concurrently. It is deliberately not an
376 : // atomic, so as to avoid the overhead on batch mutations. This is
377 : // okay, because under correct usage this field will never be accessed
378 : // concurrently. It's only under incorrect usage the memory accesses of this
379 : // variable may violate memory safety. Since we don't use atomics here,
380 : // false negatives are possible.
381 : committing bool
382 : }
383 :
384 : // BatchCommitStats exposes stats related to committing a batch.
385 : //
386 : // NB: there is no Pebble internal tracing (using LoggerAndTracer) of slow
387 : // batch commits. The caller can use these stats to do their own tracing as
388 : // needed.
389 : type BatchCommitStats struct {
390 : // TotalDuration is the time spent in DB.{Apply,ApplyNoSyncWait} or
391 : // Batch.Commit, plus the time waiting in Batch.SyncWait. If there is a gap
392 : // between calling ApplyNoSyncWait and calling SyncWait, that gap could
393 : // include some duration in which real work was being done for the commit
394 : // and will not be included here. This missing time is considered acceptable
395 : // since the goal of these stats is to understand user-facing latency.
396 : //
397 : // TotalDuration includes time spent in various queues both inside Pebble
398 : // and outside Pebble (I/O queues, goroutine scheduler queue, mutex wait
399 : // etc.). For some of these queues (which we consider important) the wait
400 : // times are included below -- these expose low-level implementation detail
401 : // and are meant for expert diagnosis and subject to change. There may be
402 : // unaccounted time after subtracting those values from TotalDuration.
403 : TotalDuration time.Duration
404 : // SemaphoreWaitDuration is the wait time for semaphores in
405 : // commitPipeline.Commit.
406 : SemaphoreWaitDuration time.Duration
407 : // WALQueueWaitDuration is the wait time for allocating memory blocks in the
408 : // LogWriter (due to the LogWriter not writing fast enough). At the moment
409 : // this duration is always zero because a single WAL will allow
410 : // allocating memory blocks up to the entire memtable size. In the future,
411 : // we may pipeline WALs and bound the WAL queued blocks separately, so this
412 : // field is preserved for that possibility.
413 : WALQueueWaitDuration time.Duration
414 : // MemTableWriteStallDuration is the wait caused by a write stall due to too
415 : // many memtables (due to not flushing fast enough).
416 : MemTableWriteStallDuration time.Duration
417 : // L0ReadAmpWriteStallDuration is the wait caused by a write stall due to
418 : // high read amplification in L0 (due to not compacting fast enough out of
419 : // L0).
420 : L0ReadAmpWriteStallDuration time.Duration
421 : // WALRotationDuration is the wait time for WAL rotation, which includes
422 : // syncing and closing the old WAL and creating (or reusing) a new one.
423 : WALRotationDuration time.Duration
424 : // CommitWaitDuration is the wait for publishing the seqnum plus the
425 : // duration for the WAL sync (if requested). The former should be tiny and
426 : // one can assume that this is all due to the WAL sync.
427 : CommitWaitDuration time.Duration
428 : }
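
// Illustrative sketch, not part of batch.go: reading the stats above after a
// synchronous commit, from a client importing "github.com/cockroachdb/pebble".
// It assumes the exported Batch.CommitStats accessor and the "fmt" and "time"
// packages; the function name and the 100ms threshold are arbitrary.
func commitWithStats(db *pebble.DB, key, value []byte) error {
	b := db.NewBatch()
	defer b.Close()
	if err := b.Set(key, value, nil); err != nil {
		return err
	}
	if err := b.Commit(pebble.Sync); err != nil {
		return err
	}
	if stats := b.CommitStats(); stats.TotalDuration > 100*time.Millisecond {
		// Pebble does not trace slow commits itself (see the comment above),
		// so the caller decides what "slow" means and logs accordingly.
		fmt.Printf("slow commit: total=%s semaphore=%s memtable-stall=%s wal-rotation=%s\n",
			stats.TotalDuration, stats.SemaphoreWaitDuration,
			stats.MemTableWriteStallDuration, stats.WALRotationDuration)
	}
	return nil
}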
429 :
430 : var _ Reader = (*Batch)(nil)
431 : var _ Writer = (*Batch)(nil)
432 :
433 : var batchPool = sync.Pool{
434 1 : New: func() interface{} {
435 1 : return &Batch{}
436 1 : },
437 : }
438 :
439 : type indexedBatch struct {
440 : batch Batch
441 : index batchskl.Skiplist
442 : }
443 :
444 : var indexedBatchPool = sync.Pool{
445 1 : New: func() interface{} {
446 1 : return &indexedBatch{}
447 1 : },
448 : }
449 :
450 1 : func newBatch(db *DB, opts ...BatchOption) *Batch {
451 1 : b := batchPool.Get().(*Batch)
452 1 : b.db = db
453 1 : b.opts.ensureDefaults()
454 1 : for _, opt := range opts {
455 0 : opt(&b.opts)
456 0 : }
457 1 : return b
458 : }
459 :
460 0 : func newBatchWithSize(db *DB, size int, opts ...BatchOption) *Batch {
461 0 : b := newBatch(db, opts...)
462 0 : if cap(b.data) < size {
463 0 : b.data = rawalloc.New(0, size)
464 0 : }
465 0 : return b
466 : }
467 :
468 1 : func newIndexedBatch(db *DB, comparer *Comparer) *Batch {
469 1 : i := indexedBatchPool.Get().(*indexedBatch)
470 1 : i.batch.comparer = comparer
471 1 : i.batch.db = db
472 1 : i.batch.index = &i.index
473 1 : i.batch.index.Init(&i.batch.data, comparer.Compare, comparer.AbbreviatedKey)
474 1 : i.batch.opts.ensureDefaults()
475 1 : return &i.batch
476 1 : }
477 :
478 0 : func newIndexedBatchWithSize(db *DB, comparer *Comparer, size int) *Batch {
479 0 : b := newIndexedBatch(db, comparer)
480 0 : if cap(b.data) < size {
481 0 : b.data = rawalloc.New(0, size)
482 0 : }
483 0 : return b
484 : }
485 :
486 : // nextSeqNum returns the batch "sequence number" that will be given to the next
487 : // key written to the batch. During iteration keys within an indexed batch are
488 : // given a sequence number consisting of their offset within the batch combined
489 : // with the base.SeqNumBatchBit bit. These sequence numbers are only
490 : // used during iteration, and the keys are assigned ordinary sequence numbers
491 : // when the batch is committed.
492 1 : func (b *Batch) nextSeqNum() base.SeqNum {
493 1 : return base.SeqNum(len(b.data)) | base.SeqNumBatchBit
494 1 : }
495 :
496 1 : func (b *Batch) release() {
497 1 : if b.db == nil {
498 1 : // The batch was not created using newBatch or newIndexedBatch, or an error
499 1 : // was encountered. We don't try to reuse batches that encountered an error
500 1 : // because they might be stuck somewhere in the system and attempting to
501 1 : // reuse such batches is a recipe for onerous debugging sessions. Instead,
502 1 : // let the GC do its job.
503 1 : return
504 1 : }
505 1 : b.db = nil
506 1 :
507 1 : // NB: This is ugly (it would be cleaner if we could just assign a Batch{}),
508 1 : // but necessary so that we can use atomic.StoreUint32 for the Batch.applied
509 1 : // field. Without using an atomic to clear that field the Go race detector
510 1 : // complains.
511 1 : b.reset()
512 1 : b.comparer = nil
513 1 :
514 1 : if b.index == nil {
515 1 : batchPool.Put(b)
516 1 : } else {
517 1 : b.index, b.rangeDelIndex, b.rangeKeyIndex = nil, nil, nil
518 1 : indexedBatchPool.Put((*indexedBatch)(unsafe.Pointer(b)))
519 1 : }
520 : }
521 :
522 1 : func (b *Batch) refreshMemTableSize() error {
523 1 : b.memTableSize = 0
524 1 : if len(b.data) < batchrepr.HeaderLen {
525 0 : return nil
526 0 : }
527 :
528 1 : b.countRangeDels = 0
529 1 : b.countRangeKeys = 0
530 1 : b.minimumFormatMajorVersion = 0
531 1 : for r := b.Reader(); ; {
532 1 : kind, key, value, ok, err := r.Next()
533 1 : if !ok {
534 1 : if err != nil {
535 0 : return err
536 0 : }
537 1 : break
538 : }
539 1 : switch kind {
540 1 : case InternalKeyKindRangeDelete:
541 1 : b.countRangeDels++
542 1 : case InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete:
543 1 : b.countRangeKeys++
544 1 : case InternalKeyKindSet, InternalKeyKindDelete, InternalKeyKindMerge, InternalKeyKindSingleDelete, InternalKeyKindSetWithDelete:
545 : // fallthrough
546 1 : case InternalKeyKindDeleteSized:
547 1 : if b.minimumFormatMajorVersion < FormatDeleteSizedAndObsolete {
548 1 : b.minimumFormatMajorVersion = FormatDeleteSizedAndObsolete
549 1 : }
550 0 : case InternalKeyKindLogData:
551 0 : // LogData does not contribute to memtable size.
552 0 : continue
553 1 : case InternalKeyKindIngestSST:
554 1 : if b.minimumFormatMajorVersion < FormatFlushableIngest {
555 1 : b.minimumFormatMajorVersion = FormatFlushableIngest
556 1 : }
557 : // This key kind doesn't contribute to the memtable size.
558 1 : continue
559 1 : case InternalKeyKindExcise:
560 1 : if b.minimumFormatMajorVersion < FormatFlushableIngestExcises {
561 1 : b.minimumFormatMajorVersion = FormatFlushableIngestExcises
562 1 : }
563 : // This key kind doesn't contribute to the memtable size.
564 1 : continue
565 0 : default:
566 0 : // Note: in some circumstances this might be temporary memory
567 0 : // corruption that can be recovered by discarding the batch and
568 0 : // trying again. In other cases, the batch repr might've been
569 0 : // already persisted elsewhere, and we'll loop continuously trying
570 0 : // to commit the same corrupted batch. The caller is responsible for
571 0 : // distinguishing.
572 0 : return errors.Wrapf(ErrInvalidBatch, "unrecognized kind %v", kind)
573 : }
574 1 : b.memTableSize += memTableEntrySize(len(key), len(value))
575 : }
576 1 : return nil
577 : }
578 :
579 : // Apply the operations contained in the batch to the receiver batch.
580 : //
581 : // It is safe to modify the contents of the arguments after Apply returns.
582 : //
583 : // Apply returns ErrInvalidBatch if the provided batch is invalid in any way.
584 1 : func (b *Batch) Apply(batch *Batch, _ *WriteOptions) error {
585 1 : if b.ingestedSSTBatch {
586 0 : panic("pebble: invalid batch application")
587 : }
588 1 : if len(batch.data) == 0 {
589 1 : return nil
590 1 : }
591 1 : if len(batch.data) < batchrepr.HeaderLen {
592 0 : return ErrInvalidBatch
593 0 : }
594 :
595 1 : offset := len(b.data)
596 1 : if offset == 0 {
597 0 : b.init(offset)
598 0 : offset = batchrepr.HeaderLen
599 0 : }
600 1 : b.data = append(b.data, batch.data[batchrepr.HeaderLen:]...)
601 1 :
602 1 : b.setCount(b.Count() + batch.Count())
603 1 :
604 1 : if b.db != nil || b.index != nil {
605 1 : // Only iterate over the new entries if we need to track memTableSize or in
606 1 : // order to update the index.
607 1 : for iter := batchrepr.Reader(b.data[offset:]); len(iter) > 0; {
608 1 : offset := uintptr(unsafe.Pointer(&iter[0])) - uintptr(unsafe.Pointer(&b.data[0]))
609 1 : kind, key, value, ok, err := iter.Next()
610 1 : if !ok {
611 0 : if err != nil {
612 0 : return err
613 0 : }
614 0 : break
615 : }
616 1 : switch kind {
617 0 : case InternalKeyKindRangeDelete:
618 0 : b.countRangeDels++
619 0 : case InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete:
620 0 : b.countRangeKeys++
621 0 : case InternalKeyKindIngestSST, InternalKeyKindExcise:
622 0 : panic("pebble: invalid key kind for batch")
623 0 : case InternalKeyKindLogData:
624 0 : // LogData does not contribute to memtable size.
625 0 : continue
626 : case InternalKeyKindSet, InternalKeyKindDelete, InternalKeyKindMerge,
627 1 : InternalKeyKindSingleDelete, InternalKeyKindSetWithDelete, InternalKeyKindDeleteSized:
628 : // fallthrough
629 0 : default:
630 0 : // Note: in some circumstances this might be temporary memory
631 0 : // corruption that can be recovered by discarding the batch and
632 0 : // trying again. In other cases, the batch repr might've been
633 0 : // already persisted elsewhere, and we'll loop continuously
634 0 : // trying to commit the same corrupted batch. The caller is
635 0 : // responsible for distinguishing.
636 0 : return errors.Wrapf(ErrInvalidBatch, "unrecognized kind %v", kind)
637 : }
638 1 : if b.index != nil {
639 1 : var err error
640 1 : switch kind {
641 0 : case InternalKeyKindRangeDelete:
642 0 : b.tombstones = nil
643 0 : b.tombstonesSeqNum = 0
644 0 : if b.rangeDelIndex == nil {
645 0 : b.rangeDelIndex = batchskl.NewSkiplist(&b.data, b.comparer.Compare, b.comparer.AbbreviatedKey)
646 0 : }
647 0 : err = b.rangeDelIndex.Add(uint32(offset))
648 0 : case InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete:
649 0 : b.rangeKeys = nil
650 0 : b.rangeKeysSeqNum = 0
651 0 : if b.rangeKeyIndex == nil {
652 0 : b.rangeKeyIndex = batchskl.NewSkiplist(&b.data, b.comparer.Compare, b.comparer.AbbreviatedKey)
653 0 : }
654 0 : err = b.rangeKeyIndex.Add(uint32(offset))
655 1 : default:
656 1 : err = b.index.Add(uint32(offset))
657 : }
658 1 : if err != nil {
659 0 : return err
660 0 : }
661 : }
662 1 : b.memTableSize += memTableEntrySize(len(key), len(value))
663 : }
664 : }
665 1 : return nil
666 : }
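
// Illustrative sketch, not part of batch.go: merging one batch into another
// with Apply from a client package. The records of src are appended to dst,
// dst's count (and index, if dst is indexed) is updated, and src itself is
// left unchanged. Names are hypothetical.
func applyExample(db *pebble.DB) error {
	dst := db.NewBatch()
	defer dst.Close()
	src := db.NewBatch()
	defer src.Close()
	if err := src.Set([]byte("k"), []byte("v"), nil); err != nil {
		return err
	}
	if err := dst.Apply(src, nil); err != nil {
		return err
	}
	// dst now contains src's Set and can be committed on its own.
	return dst.Commit(pebble.NoSync)
}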
667 :
668 : // Get gets the value for the given key. It returns ErrNotFound if the Batch
669 : // does not contain the key.
670 : //
671 : // The caller should not modify the contents of the returned slice, but it is
672 : // safe to modify the contents of the argument after Get returns. The returned
673 : // slice will remain valid until the returned Closer is closed. On success, the
674 : // caller MUST call closer.Close() or a memory leak will occur.
675 1 : func (b *Batch) Get(key []byte) ([]byte, io.Closer, error) {
676 1 : if b.index == nil {
677 0 : return nil, nil, ErrNotIndexed
678 0 : }
679 1 : return b.db.getInternal(key, b, nil /* snapshot */)
680 : }
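
// Illustrative sketch, not part of batch.go: reading through an indexed batch.
// The returned value is only valid until closer.Close(), so it is copied here
// before the closer is released. Get returns pebble.ErrNotFound when neither
// the batch nor the underlying database contains the key. The function name is
// hypothetical.
func indexedGetExample(db *pebble.DB, key []byte) ([]byte, error) {
	b := db.NewIndexedBatch()
	defer b.Close()
	if err := b.Set(key, []byte("v"), nil); err != nil {
		return nil, err
	}
	value, closer, err := b.Get(key)
	if err != nil {
		return nil, err
	}
	defer closer.Close()
	return append([]byte(nil), value...), nil
}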
681 :
682 1 : func (b *Batch) prepareDeferredKeyValueRecord(keyLen, valueLen int, kind InternalKeyKind) {
683 1 : if b.committing {
684 0 : panic("pebble: batch already committing")
685 : }
686 1 : if len(b.data) == 0 {
687 1 : b.init(keyLen + valueLen + 2*binary.MaxVarintLen64 + batchrepr.HeaderLen)
688 1 : }
689 1 : b.count++
690 1 : b.memTableSize += memTableEntrySize(keyLen, valueLen)
691 1 :
692 1 : pos := len(b.data)
693 1 : b.deferredOp.offset = uint32(pos)
694 1 : b.grow(1 + 2*maxVarintLen32 + keyLen + valueLen)
695 1 : b.data[pos] = byte(kind)
696 1 : pos++
697 1 :
698 1 : {
699 1 : // TODO(peter): Manually inlined version binary.PutUvarint(). This is 20%
700 1 : // faster on BenchmarkBatchSet on go1.13. Remove if go1.14 or future
701 1 : // versions show this to not be a performance win.
702 1 : x := uint32(keyLen)
703 1 : for x >= 0x80 {
704 0 : b.data[pos] = byte(x) | 0x80
705 0 : x >>= 7
706 0 : pos++
707 0 : }
708 1 : b.data[pos] = byte(x)
709 1 : pos++
710 : }
711 :
712 1 : b.deferredOp.Key = b.data[pos : pos+keyLen]
713 1 : pos += keyLen
714 1 :
715 1 : {
716 1 : // TODO(peter): Manually inlined version binary.PutUvarint(). This is 20%
717 1 : // faster on BenchmarkBatchSet on go1.13. Remove if go1.14 or future
718 1 : // versions show this to not be a performance win.
719 1 : x := uint32(valueLen)
720 1 : for x >= 0x80 {
721 0 : b.data[pos] = byte(x) | 0x80
722 0 : x >>= 7
723 0 : pos++
724 0 : }
725 1 : b.data[pos] = byte(x)
726 1 : pos++
727 : }
728 :
729 1 : b.deferredOp.Value = b.data[pos : pos+valueLen]
730 1 : // Shrink data since varints may be shorter than the upper bound.
731 1 : b.data = b.data[:pos+valueLen]
732 : }
733 :
734 1 : func (b *Batch) prepareDeferredKeyRecord(keyLen int, kind InternalKeyKind) {
735 1 : if b.committing {
736 0 : panic("pebble: batch already committing")
737 : }
738 1 : if len(b.data) == 0 {
739 1 : b.init(keyLen + binary.MaxVarintLen64 + batchrepr.HeaderLen)
740 1 : }
741 1 : b.count++
742 1 : b.memTableSize += memTableEntrySize(keyLen, 0)
743 1 :
744 1 : pos := len(b.data)
745 1 : b.deferredOp.offset = uint32(pos)
746 1 : b.grow(1 + maxVarintLen32 + keyLen)
747 1 : b.data[pos] = byte(kind)
748 1 : pos++
749 1 :
750 1 : {
751 1 : // TODO(peter): Manually inlined version binary.PutUvarint(). Remove if
752 1 : // go1.13 or future versions show this to not be a performance win. See
753 1 : // BenchmarkBatchSet.
754 1 : x := uint32(keyLen)
755 1 : for x >= 0x80 {
756 0 : b.data[pos] = byte(x) | 0x80
757 0 : x >>= 7
758 0 : pos++
759 0 : }
760 1 : b.data[pos] = byte(x)
761 1 : pos++
762 : }
763 :
764 1 : b.deferredOp.Key = b.data[pos : pos+keyLen]
765 1 : b.deferredOp.Value = nil
766 1 :
767 1 : // Shrink data since varint may be shorter than the upper bound.
768 1 : b.data = b.data[:pos+keyLen]
769 : }
770 :
771 : // AddInternalKey allows the caller to add an internal key of point key or range
772 : // key kinds (but not RangeDelete) to a batch. Passing in an internal key of
773 : // kind RangeDelete will result in a panic. Note that the seqnum in the internal
774 : // key is effectively ignored, even though the Kind is preserved. This is
775 : // because the batch format does not allow for a per-key seqnum to be specified,
776 : // only a batch-wide one.
777 : //
778 : // Note that non-indexed keys (InternalKeyKind{LogData,IngestSST}) are not
779 : // supported with this method as they require specialized logic.
780 1 : func (b *Batch) AddInternalKey(key *base.InternalKey, value []byte, _ *WriteOptions) error {
781 1 : keyLen := len(key.UserKey)
782 1 : hasValue := false
783 1 : switch kind := key.Kind(); kind {
784 0 : case InternalKeyKindRangeDelete:
785 0 : panic("unexpected range delete in AddInternalKey")
786 0 : case InternalKeyKindSingleDelete, InternalKeyKindDelete:
787 0 : b.prepareDeferredKeyRecord(keyLen, kind)
788 0 : b.deferredOp.index = b.index
789 1 : case InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete:
790 1 : b.prepareDeferredKeyValueRecord(keyLen, len(value), kind)
791 1 : hasValue = true
792 1 : b.incrementRangeKeysCount()
793 0 : default:
794 0 : b.prepareDeferredKeyValueRecord(keyLen, len(value), kind)
795 0 : hasValue = true
796 0 : b.deferredOp.index = b.index
797 : }
798 1 : copy(b.deferredOp.Key, key.UserKey)
799 1 : if hasValue {
800 1 : copy(b.deferredOp.Value, value)
801 1 : }
802 :
803 : // TODO(peter): Manually inline DeferredBatchOp.Finish(). Mid-stack inlining
804 : // in go1.13 will remove the need for this.
805 1 : if b.index != nil {
806 0 : if err := b.index.Add(b.deferredOp.offset); err != nil {
807 0 : return err
808 0 : }
809 : }
810 1 : return nil
811 : }
812 :
813 : // Set adds an action to the batch that sets the key to map to the value.
814 : //
815 : // It is safe to modify the contents of the arguments after Set returns.
816 1 : func (b *Batch) Set(key, value []byte, _ *WriteOptions) error {
817 1 : deferredOp := b.SetDeferred(len(key), len(value))
818 1 : copy(deferredOp.Key, key)
819 1 : copy(deferredOp.Value, value)
820 1 : // TODO(peter): Manually inline DeferredBatchOp.Finish(). Mid-stack inlining
821 1 : // in go1.13 will remove the need for this.
822 1 : if b.index != nil {
823 1 : if err := b.index.Add(deferredOp.offset); err != nil {
824 0 : return err
825 0 : }
826 : }
827 1 : return nil
828 : }
829 :
830 : // SetDeferred is similar to Set in that it adds a set operation to the batch,
831 : // except it only takes in key/value lengths instead of complete slices,
832 : // letting the caller encode into those objects and then call Finish() on the
833 : // returned object.
834 1 : func (b *Batch) SetDeferred(keyLen, valueLen int) *DeferredBatchOp {
835 1 : b.prepareDeferredKeyValueRecord(keyLen, valueLen, InternalKeyKindSet)
836 1 : b.deferredOp.index = b.index
837 1 : return &b.deferredOp
838 1 : }
839 :
840 : // Merge adds an action to the batch that merges the value at key with the new
841 : // value. The details of the merge are dependent upon the configured merge
842 : // operator.
843 : //
844 : // It is safe to modify the contents of the arguments after Merge returns.
845 1 : func (b *Batch) Merge(key, value []byte, _ *WriteOptions) error {
846 1 : deferredOp := b.MergeDeferred(len(key), len(value))
847 1 : copy(deferredOp.Key, key)
848 1 : copy(deferredOp.Value, value)
849 1 : // TODO(peter): Manually inline DeferredBatchOp.Finish(). Mid-stack inlining
850 1 : // in go1.13 will remove the need for this.
851 1 : if b.index != nil {
852 1 : if err := b.index.Add(deferredOp.offset); err != nil {
853 0 : return err
854 0 : }
855 : }
856 1 : return nil
857 : }
858 :
859 : // MergeDeferred is similar to Merge in that it adds a merge operation to the
860 : // batch, except it only takes in key/value lengths instead of complete slices,
861 : // letting the caller encode into those objects and then call Finish() on the
862 : // returned object.
863 1 : func (b *Batch) MergeDeferred(keyLen, valueLen int) *DeferredBatchOp {
864 1 : b.prepareDeferredKeyValueRecord(keyLen, valueLen, InternalKeyKindMerge)
865 1 : b.deferredOp.index = b.index
866 1 : return &b.deferredOp
867 1 : }
868 :
869 : // Delete adds an action to the batch that deletes the entry for key.
870 : //
871 : // It is safe to modify the contents of the arguments after Delete returns.
872 1 : func (b *Batch) Delete(key []byte, _ *WriteOptions) error {
873 1 : deferredOp := b.DeleteDeferred(len(key))
874 1 : copy(deferredOp.Key, key)
875 1 : // TODO(peter): Manually inline DeferredBatchOp.Finish(). Mid-stack inlining
876 1 : // in go1.13 will remove the need for this.
877 1 : if b.index != nil {
878 1 : if err := b.index.Add(deferredOp.offset); err != nil {
879 0 : return err
880 0 : }
881 : }
882 1 : return nil
883 : }
884 :
885 : // DeleteDeferred is similar to Delete in that it adds a delete operation to
886 : // the batch, except it only takes in key/value lengths instead of complete
887 : // slices, letting the caller encode into those objects and then call Finish()
888 : // on the returned object.
889 1 : func (b *Batch) DeleteDeferred(keyLen int) *DeferredBatchOp {
890 1 : b.prepareDeferredKeyRecord(keyLen, InternalKeyKindDelete)
891 1 : b.deferredOp.index = b.index
892 1 : return &b.deferredOp
893 1 : }
894 :
895 : // DeleteSized behaves identically to Delete, but takes an additional
896 : // argument indicating the size of the value being deleted. DeleteSized
897 : // should be preferred when the caller has the expectation that there exists
898 : // a single internal KV pair for the key (eg, the key has not been
899 : // overwritten recently), and the caller knows the size of its value.
900 : //
901 : // DeleteSized will record the value size within the tombstone and use it to
902 : // inform compaction-picking heuristics which strive to reduce space
903 : // amplification in the LSM. This "calling your shot" mechanic allows the
904 : // storage engine to more accurately estimate and reduce space amplification.
905 : //
906 : // It is safe to modify the contents of the arguments after DeleteSized
907 : // returns.
908 1 : func (b *Batch) DeleteSized(key []byte, deletedValueSize uint32, _ *WriteOptions) error {
909 1 : deferredOp := b.DeleteSizedDeferred(len(key), deletedValueSize)
910 1 : copy(b.deferredOp.Key, key)
911 1 : // TODO(peter): Manually inline DeferredBatchOp.Finish(). Check if in a
912 1 : // later Go release this is unnecessary.
913 1 : if b.index != nil {
914 1 : if err := b.index.Add(deferredOp.offset); err != nil {
915 0 : return err
916 0 : }
917 : }
918 1 : return nil
919 : }
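
// Illustrative sketch, not part of batch.go: "calling your shot" with
// DeleteSized when the caller tracked the size of the value it previously
// wrote under this key. The recorded size feeds the compaction-picking
// heuristics described above. Names are hypothetical.
func deleteSizedExample(db *pebble.DB, key []byte, knownValueSize uint32) error {
	b := db.NewBatch()
	defer b.Close()
	if err := b.DeleteSized(key, knownValueSize, nil); err != nil {
		return err
	}
	return b.Commit(pebble.Sync)
}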
920 :
921 : // DeleteSizedDeferred is similar to DeleteSized in that it adds a sized delete
922 : // operation to the batch, except it only takes in key length instead of a
923 : // complete key slice, letting the caller encode into the DeferredBatchOp.Key
924 : // slice and then call Finish() on the returned object.
925 1 : func (b *Batch) DeleteSizedDeferred(keyLen int, deletedValueSize uint32) *DeferredBatchOp {
926 1 : if b.minimumFormatMajorVersion < FormatDeleteSizedAndObsolete {
927 1 : b.minimumFormatMajorVersion = FormatDeleteSizedAndObsolete
928 1 : }
929 :
930 : // Encode the sum of the key length and the value in the value.
931 1 : v := uint64(deletedValueSize) + uint64(keyLen)
932 1 :
933 1 : // Encode `v` as a varint.
934 1 : var buf [binary.MaxVarintLen64]byte
935 1 : n := 0
936 1 : {
937 1 : x := v
938 1 : for x >= 0x80 {
939 0 : buf[n] = byte(x) | 0x80
940 0 : x >>= 7
941 0 : n++
942 0 : }
943 1 : buf[n] = byte(x)
944 1 : n++
945 : }
946 :
947 : // NB: In batch entries and sstable entries, values are stored as
948 : // varstrings. Here, the value is itself a simple varint. This results in an
949 : // unnecessary double layer of encoding:
950 : // varint(n) varint(deletedValueSize)
951 : // The first varint will always be 1-byte, since a varint-encoded uint64
952 : // will never exceed 128 bytes. This unnecessary extra byte and wrapping is
953 : // preserved to avoid special casing across the database, and in particular
954 : // in sstable block decoding which is performance sensitive.
955 1 : b.prepareDeferredKeyValueRecord(keyLen, n, InternalKeyKindDeleteSized)
956 1 : b.deferredOp.index = b.index
957 1 : copy(b.deferredOp.Value, buf[:n])
958 1 : return &b.deferredOp
959 : }
960 :
961 : // SingleDelete adds an action to the batch that single deletes the entry for key.
962 : // WARNING: See the detailed warning in Writer.SingleDelete before using this.
963 : //
964 : // It is safe to modify the contents of the arguments after SingleDelete returns.
965 1 : func (b *Batch) SingleDelete(key []byte, _ *WriteOptions) error {
966 1 : deferredOp := b.SingleDeleteDeferred(len(key))
967 1 : copy(deferredOp.Key, key)
968 1 : // TODO(peter): Manually inline DeferredBatchOp.Finish(). Mid-stack inlining
969 1 : // in go1.13 will remove the need for this.
970 1 : if b.index != nil {
971 1 : if err := b.index.Add(deferredOp.offset); err != nil {
972 0 : return err
973 0 : }
974 : }
975 1 : return nil
976 : }
977 :
978 : // SingleDeleteDeferred is similar to SingleDelete in that it adds a single delete
979 : // operation to the batch, except it only takes in key/value lengths instead of
980 : // complete slices, letting the caller encode into those objects and then call
981 : // Finish() on the returned object.
982 : //
983 : // WARNING: See the detailed warning in Writer.SingleDelete before using this.
984 1 : func (b *Batch) SingleDeleteDeferred(keyLen int) *DeferredBatchOp {
985 1 : b.prepareDeferredKeyRecord(keyLen, InternalKeyKindSingleDelete)
986 1 : b.deferredOp.index = b.index
987 1 : return &b.deferredOp
988 1 : }
989 :
990 : // DeleteRange deletes all of the point keys (and values) in the range
991 : // [start,end) (inclusive on start, exclusive on end). DeleteRange does NOT
992 : // delete overlapping range keys (eg, keys set via RangeKeySet).
993 : //
994 : // It is safe to modify the contents of the arguments after DeleteRange
995 : // returns.
996 1 : func (b *Batch) DeleteRange(start, end []byte, _ *WriteOptions) error {
997 1 : deferredOp := b.DeleteRangeDeferred(len(start), len(end))
998 1 : copy(deferredOp.Key, start)
999 1 : copy(deferredOp.Value, end)
1000 1 : // TODO(peter): Manually inline DeferredBatchOp.Finish(). Mid-stack inlining
1001 1 : // in go1.13 will remove the need for this.
1002 1 : if deferredOp.index != nil {
1003 1 : if err := deferredOp.index.Add(deferredOp.offset); err != nil {
1004 0 : return err
1005 0 : }
1006 : }
1007 1 : return nil
1008 : }
1009 :
1010 : // DeleteRangeDeferred is similar to DeleteRange in that it adds a delete range
1011 : // operation to the batch, except it only takes in key lengths instead of
1012 : // complete slices, letting the caller encode into those objects and then call
1013 : // Finish() on the returned object. Note that DeferredBatchOp.Key should be
1014 : // populated with the start key, and DeferredBatchOp.Value should be populated
1015 : // with the end key.
1016 1 : func (b *Batch) DeleteRangeDeferred(startLen, endLen int) *DeferredBatchOp {
1017 1 : b.prepareDeferredKeyValueRecord(startLen, endLen, InternalKeyKindRangeDelete)
1018 1 : b.countRangeDels++
1019 1 : if b.index != nil {
1020 1 : b.tombstones = nil
1021 1 : b.tombstonesSeqNum = 0
1022 1 : // Range deletions are rare, so we lazily allocate the index for them.
1023 1 : if b.rangeDelIndex == nil {
1024 1 : b.rangeDelIndex = batchskl.NewSkiplist(&b.data, b.comparer.Compare, b.comparer.AbbreviatedKey)
1025 1 : }
1026 1 : b.deferredOp.index = b.rangeDelIndex
1027 : }
1028 1 : return &b.deferredOp
1029 : }
1030 :
1031 : // RangeKeySet sets a range key mapping the key range [start, end) at the MVCC
1032 : // timestamp suffix to value. The suffix is optional. If any portion of the key
1033 : // range [start, end) is already set by a range key with the same suffix value,
1034 : // RangeKeySet overrides it.
1035 : //
1036 : // It is safe to modify the contents of the arguments after RangeKeySet returns.
1037 1 : func (b *Batch) RangeKeySet(start, end, suffix, value []byte, _ *WriteOptions) error {
1038 1 : if invariants.Enabled && b.db != nil {
1039 1 : // RangeKeySet is only supported on prefix keys.
1040 1 : if b.db.opts.Comparer.Split(start) != len(start) {
1041 0 : panic("RangeKeySet called with suffixed start key")
1042 : }
1043 1 : if b.db.opts.Comparer.Split(end) != len(end) {
1044 0 : panic("RangeKeySet called with suffixed end key")
1045 : }
1046 : }
1047 1 : suffixValues := [1]rangekey.SuffixValue{{Suffix: suffix, Value: value}}
1048 1 : internalValueLen := rangekey.EncodedSetValueLen(end, suffixValues[:])
1049 1 :
1050 1 : deferredOp := b.rangeKeySetDeferred(len(start), internalValueLen)
1051 1 : copy(deferredOp.Key, start)
1052 1 : n := rangekey.EncodeSetValue(deferredOp.Value, end, suffixValues[:])
1053 1 : if n != internalValueLen {
1054 0 : panic("unexpected internal value length mismatch")
1055 : }
1056 :
1057 : // Manually inline DeferredBatchOp.Finish().
1058 1 : if deferredOp.index != nil {
1059 1 : if err := deferredOp.index.Add(deferredOp.offset); err != nil {
1060 0 : return err
1061 0 : }
1062 : }
1063 1 : return nil
1064 : }
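
// Illustrative sketch, not part of batch.go: writing an MVCC range key from a
// client package. It assumes a Comparer configured with an MVCC-style Split
// (so start and end are unsuffixed prefix keys, as checked under invariants
// above); the "@5" suffix bytes and the function name are purely illustrative.
func rangeKeySetExample(db *pebble.DB) error {
	b := db.NewBatch()
	defer b.Close()
	if err := b.RangeKeySet([]byte("a"), []byte("z"), []byte("@5"), []byte("payload"), nil); err != nil {
		return err
	}
	return b.Commit(pebble.Sync)
}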
1065 :
1066 1 : func (b *Batch) rangeKeySetDeferred(startLen, internalValueLen int) *DeferredBatchOp {
1067 1 : b.prepareDeferredKeyValueRecord(startLen, internalValueLen, InternalKeyKindRangeKeySet)
1068 1 : b.incrementRangeKeysCount()
1069 1 : return &b.deferredOp
1070 1 : }
1071 :
1072 1 : func (b *Batch) incrementRangeKeysCount() {
1073 1 : b.countRangeKeys++
1074 1 : if b.index != nil {
1075 1 : b.rangeKeys = nil
1076 1 : b.rangeKeysSeqNum = 0
1077 1 : // Range keys are rare, so we lazily allocate the index for them.
1078 1 : if b.rangeKeyIndex == nil {
1079 1 : b.rangeKeyIndex = batchskl.NewSkiplist(&b.data, b.comparer.Compare, b.comparer.AbbreviatedKey)
1080 1 : }
1081 1 : b.deferredOp.index = b.rangeKeyIndex
1082 : }
1083 : }
1084 :
1085 : // RangeKeyUnset removes a range key mapping the key range [start, end) at the
1086 : // MVCC timestamp suffix. The suffix may be omitted to remove an unsuffixed
1087 : // range key. RangeKeyUnset only removes portions of range keys that fall within
1088 : // the [start, end) key span, and only range keys with suffixes that exactly
1089 : // match the unset suffix.
1090 : //
1091 : // It is safe to modify the contents of the arguments after RangeKeyUnset
1092 : // returns.
1093 1 : func (b *Batch) RangeKeyUnset(start, end, suffix []byte, _ *WriteOptions) error {
1094 1 : if invariants.Enabled && b.db != nil {
1095 1 : // RangeKeyUnset is only supported on prefix keys.
1096 1 : if b.db.opts.Comparer.Split(start) != len(start) {
1097 0 : panic("RangeKeyUnset called with suffixed start key")
1098 : }
1099 1 : if b.db.opts.Comparer.Split(end) != len(end) {
1100 0 : panic("RangeKeyUnset called with suffixed end key")
1101 : }
1102 : }
1103 1 : suffixes := [1][]byte{suffix}
1104 1 : internalValueLen := rangekey.EncodedUnsetValueLen(end, suffixes[:])
1105 1 :
1106 1 : deferredOp := b.rangeKeyUnsetDeferred(len(start), internalValueLen)
1107 1 : copy(deferredOp.Key, start)
1108 1 : n := rangekey.EncodeUnsetValue(deferredOp.Value, end, suffixes[:])
1109 1 : if n != internalValueLen {
1110 0 : panic("unexpected internal value length mismatch")
1111 : }
1112 :
1113 : // Manually inline DeferredBatchOp.Finish()
1114 1 : if deferredOp.index != nil {
1115 1 : if err := deferredOp.index.Add(deferredOp.offset); err != nil {
1116 0 : return err
1117 0 : }
1118 : }
1119 1 : return nil
1120 : }
1121 :
1122 1 : func (b *Batch) rangeKeyUnsetDeferred(startLen, internalValueLen int) *DeferredBatchOp {
1123 1 : b.prepareDeferredKeyValueRecord(startLen, internalValueLen, InternalKeyKindRangeKeyUnset)
1124 1 : b.incrementRangeKeysCount()
1125 1 : return &b.deferredOp
1126 1 : }
1127 :
1128 : // RangeKeyDelete deletes all of the range keys in the range [start,end)
1129 : // (inclusive on start, exclusive on end). It does not delete point keys (for
1130 : // that use DeleteRange). RangeKeyDelete removes all range keys within the
1131 : // bounds, including those with or without suffixes.
1132 : //
1133 : // It is safe to modify the contents of the arguments after RangeKeyDelete
1134 : // returns.
1135 1 : func (b *Batch) RangeKeyDelete(start, end []byte, _ *WriteOptions) error {
1136 1 : if invariants.Enabled && b.db != nil {
1137 1 : // RangeKeyDelete is only supported on prefix keys.
1138 1 : if b.db.opts.Comparer.Split(start) != len(start) {
1139 0 : panic("RangeKeyDelete called with suffixed start key")
1140 : }
1141 1 : if b.db.opts.Comparer.Split(end) != len(end) {
1142 0 : panic("RangeKeyDelete called with suffixed end key")
1143 : }
1144 : }
1145 1 : deferredOp := b.RangeKeyDeleteDeferred(len(start), len(end))
1146 1 : copy(deferredOp.Key, start)
1147 1 : copy(deferredOp.Value, end)
1148 1 : // Manually inline DeferredBatchOp.Finish().
1149 1 : if deferredOp.index != nil {
1150 1 : if err := deferredOp.index.Add(deferredOp.offset); err != nil {
1151 0 : return err
1152 0 : }
1153 : }
1154 1 : return nil
1155 : }
1156 :
1157 : // RangeKeyDeleteDeferred is similar to RangeKeyDelete in that it adds an
1158 : // operation to delete range keys to the batch, except it only takes in key
1159 : // lengths instead of complete slices, letting the caller encode into those
1160 : // objects and then call Finish() on the returned object. Note that
1161 : // DeferredBatchOp.Key should be populated with the start key, and
1162 : // DeferredBatchOp.Value should be populated with the end key.
1163 1 : func (b *Batch) RangeKeyDeleteDeferred(startLen, endLen int) *DeferredBatchOp {
1164 1 : b.prepareDeferredKeyValueRecord(startLen, endLen, InternalKeyKindRangeKeyDelete)
1165 1 : b.incrementRangeKeysCount()
1166 1 : return &b.deferredOp
1167 1 : }
1168 :
1169 : // LogData adds the specified data to the batch. The data will be written to the
1170 : // WAL, but not added to memtables or sstables. Log data is never indexed,
1171 : // which makes it useful for testing WAL performance.
1172 : //
1173 : // It is safe to modify the contents of the argument after LogData returns.
1174 1 : func (b *Batch) LogData(data []byte, _ *WriteOptions) error {
1175 1 : origCount, origMemTableSize := b.count, b.memTableSize
1176 1 : b.prepareDeferredKeyRecord(len(data), InternalKeyKindLogData)
1177 1 : copy(b.deferredOp.Key, data)
1178 1 : // Since LogData only writes to the WAL and does not affect the memtable, we
1179 1 : // restore b.count and b.memTableSize to their origin values. Note that
1180 1 : // Batch.count only refers to records that are added to the memtable.
1181 1 : b.count, b.memTableSize = origCount, origMemTableSize
1182 1 : return nil
1183 1 : }
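
// Illustrative sketch, not part of batch.go: attaching WAL-only data to a
// commit. The LogData payload is written to the WAL alongside the batch but is
// never added to the memtable or to sstables, so it cannot be read back
// through the DB. The function name is hypothetical.
func logDataExample(db *pebble.DB, marker []byte) error {
	b := db.NewBatch()
	defer b.Close()
	if err := b.Set([]byte("k"), []byte("v"), nil); err != nil {
		return err
	}
	if err := b.LogData(marker, nil); err != nil {
		return err
	}
	return b.Commit(pebble.Sync)
}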
1184 :
1185 : // ingestSST adds the TableNum for an sstable to the batch. The data will only be
1186 : // written to the WAL (not added to memtables or sstables).
1187 1 : func (b *Batch) ingestSST(tableNum base.TableNum) {
1188 1 : if b.Empty() {
1189 1 : b.ingestedSSTBatch = true
1190 1 : } else if !b.ingestedSSTBatch {
1191 0 : // Batch contains other key kinds.
1192 0 : panic("pebble: invalid call to ingestSST")
1193 : }
1194 :
1195 1 : origMemTableSize := b.memTableSize
1196 1 : var buf [binary.MaxVarintLen64]byte
1197 1 : length := binary.PutUvarint(buf[:], uint64(tableNum))
1198 1 : b.prepareDeferredKeyRecord(length, InternalKeyKindIngestSST)
1199 1 : copy(b.deferredOp.Key, buf[:length])
1200 1 : // Since IngestSST writes only to the WAL and does not affect the memtable,
1201 1 : // we restore b.memTableSize to its original value. Note that Batch.count
1202 1 : // is not reset because for the InternalKeyKindIngestSST the count is the
1203 1 : // number of sstable paths which have been added to the batch.
1204 1 : b.memTableSize = origMemTableSize
1205 1 : b.minimumFormatMajorVersion = FormatFlushableIngest
1206 : }
1207 :
1208 : // Excise adds the excise span for a flushable ingest containing an excise. The data
1209 : // will only be written to the WAL (not added to memtables or sstables).
1210 1 : func (b *Batch) excise(start, end []byte) {
1211 1 : if b.Empty() {
1212 1 : b.ingestedSSTBatch = true
1213 1 : } else if !b.ingestedSSTBatch {
1214 0 : // Batch contains other key kinds.
1215 0 : panic("pebble: invalid call to excise")
1216 : }
1217 :
1218 1 : origMemTableSize := b.memTableSize
1219 1 : b.prepareDeferredKeyValueRecord(len(start), len(end), InternalKeyKindExcise)
1220 1 : copy(b.deferredOp.Key, start)
1221 1 : copy(b.deferredOp.Value, end)
1222 1 : // Since excise writes only to the WAL and does not affect the memtable,
1223 1 : // we restore b.memTableSize to its original value. Note that Batch.count
1224 1 : // is not reset because for the InternalKeyKindIngestSST/Excise the count
1225 1 : // is the number of sstable paths which have been added to the batch.
1226 1 : b.memTableSize = origMemTableSize
1227 1 : b.minimumFormatMajorVersion = FormatFlushableIngestExcises
1228 : }
1229 :
1230 : // Empty returns true if the batch is empty, and false otherwise.
1231 1 : func (b *Batch) Empty() bool {
1232 1 : return batchrepr.IsEmpty(b.data)
1233 1 : }
1234 :
1235 : // Len returns the current size of the batch in bytes.
1236 0 : func (b *Batch) Len() int {
1237 0 : return max(batchrepr.HeaderLen, len(b.data))
1238 0 : }
1239 :
1240 : // Repr returns the underlying batch representation. It is not safe to modify
1241 : // the contents. Reset() will not change the contents of the returned value,
1242 : // though any other mutation operation may do so.
1243 1 : func (b *Batch) Repr() []byte {
1244 1 : if len(b.data) == 0 {
1245 0 : b.init(batchrepr.HeaderLen)
1246 0 : }
1247 1 : batchrepr.SetCount(b.data, b.Count())
1248 1 : return b.data
1249 : }
1250 :
1251 : // SetRepr sets the underlying batch representation. The batch takes ownership
1252 : // of the supplied slice. It is not safe to modify it afterwards until the
1253 : // Batch is no longer in use.
1254 : //
1255 : // SetRepr may return ErrInvalidBatch if the supplied slice fails to decode in
1256 : // any way. It will not return an error in any other circumstance.
1257 1 : func (b *Batch) SetRepr(data []byte) error {
1258 1 : h, ok := batchrepr.ReadHeader(data)
1259 1 : if !ok {
1260 0 : return ErrInvalidBatch
1261 0 : }
1262 1 : b.data = data
1263 1 : b.count = uint64(h.Count)
1264 1 : var err error
1265 1 : if b.db != nil {
1266 1 : // Only track memTableSize for batches that will be committed to the DB.
1267 1 : err = b.refreshMemTableSize()
1268 1 : }
1269 1 : return err
1270 : }
1271 :
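// Editor's sketch (not part of Pebble): Repr and SetRepr can be used to ship a
// batch's encoded contents elsewhere and replay them into a fresh Batch. The
// helper name is hypothetical and error handling is minimal.
func exampleBatchReprRoundTrip(db *DB) error {
	src := db.NewBatch()
	defer src.Close()
	if err := src.Set([]byte("k"), []byte("v"), nil); err != nil {
		return err
	}
	// Copy the representation: the returned slice is owned by src.
	repr := append([]byte(nil), src.Repr()...) // header (seqnum + count) followed by records

	dst := db.NewBatch()
	defer dst.Close()
	if err := dst.SetRepr(repr); err != nil { // validates the header
		return err
	}
	return dst.Commit(Sync)
}
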
1272 : // NewIter returns an iterator that is unpositioned (Iterator.Valid() will
1273 : // return false). The iterator can be positioned via a call to SeekGE,
1274 : // SeekPrefixGE, SeekLT, First or Last. Only indexed batches support iterators.
1275 : //
1276 : // The returned Iterator observes all of the Batch's existing mutations, but no
1277 : // later mutations. Its view can be refreshed via RefreshBatchSnapshot or
1278 : // SetOptions().
1279 1 : func (b *Batch) NewIter(o *IterOptions) (*Iterator, error) {
1280 1 : return b.NewIterWithContext(context.Background(), o)
1281 1 : }
1282 :
1283 : // NewIterWithContext is like NewIter, and additionally accepts a context for
1284 : // tracing.
1285 1 : func (b *Batch) NewIterWithContext(ctx context.Context, o *IterOptions) (*Iterator, error) {
1286 1 : if b.index == nil {
1287 0 : return nil, ErrNotIndexed
1288 0 : }
1289 1 : return b.db.newIter(ctx, b, newIterOpts{}, o), nil
1290 : }
1291 :
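// Editor's sketch (not part of Pebble): scanning an indexed batch. The
// iterator observes the batch's existing mutations layered over the DB state;
// the helper name is hypothetical.
func exampleIndexedBatchScan(db *DB) error {
	b := db.NewIndexedBatch()
	defer b.Close()
	if err := b.Set([]byte("a"), []byte("1"), nil); err != nil {
		return err
	}
	it, err := b.NewIter(nil)
	if err != nil {
		return err
	}
	defer it.Close()
	for valid := it.First(); valid; valid = it.Next() {
		_, _ = it.Key(), it.Value() // batch keys merged with committed keys
	}
	return it.Error()
}
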
1292 : // NewBatchOnlyIter constructs an iterator that only reads the contents of the
1293 : // batch, and does not overlay the batch mutations on top of the DB state.
1294 : //
1295 : // The returned Iterator observes all of the Batch's existing mutations, but
1296 : // no later mutations. Its view can be refreshed via RefreshBatchSnapshot or
1297 : // SetOptions().
1298 0 : func (b *Batch) NewBatchOnlyIter(ctx context.Context, o *IterOptions) (*Iterator, error) {
1299 0 : if b.index == nil {
1300 0 : return nil, ErrNotIndexed
1301 0 : }
1302 0 : return b.db.newIter(ctx, b, newIterOpts{batch: batchIterOpts{batchOnly: true}}, o), nil
1303 : }
1304 :
1305 : // newInternalIter creates a new internalIterator that iterates over the
1306 : // contents of the batch.
1307 1 : func (b *Batch) newInternalIter(o *IterOptions) *batchIter {
1308 1 : iter := &batchIter{}
1309 1 : b.initInternalIter(o, iter)
1310 1 : return iter
1311 1 : }
1312 :
1313 1 : func (b *Batch) initInternalIter(o *IterOptions, iter *batchIter) {
1314 1 : *iter = batchIter{
1315 1 : batch: b,
1316 1 : iter: b.index.NewIter(o.GetLowerBound(), o.GetUpperBound()),
1317 1 : // NB: We explicitly do not propagate the batch snapshot to the point
1318 1 : // key iterator. Filtering point keys within the batch iterator can
1319 1 : // cause pathological behavior where a batch iterator advances
1320 1 : // significantly farther than necessary filtering many batch keys that
1321 1 : // are not visible at the batch sequence number. Instead, the merging
1322 1 : // iterator enforces bounds.
1323 1 : //
1324 1 : // For example, consider an engine that contains the committed keys
1325 1 : // 'bar' and 'bax', with no keys between them. Consider a batch
1326 1 : // containing 1,000 keys within the range [a,z]. All of the
1327 1 : // batch keys were added to the batch after the iterator was
1328 1 : // constructed, so they are not visible to the iterator. A call to
1329 1 : // SeekGE('bax') would seek the LSM iterators and discover the key
1330 1 : // 'bax'. It would also seek the batch iterator, landing on the key
1331 1 : // 'baz' but discover that it's not visible. The batch iterator would
1332 1 : // next through the rest of the batch's keys, only to discover there are
1333 1 : // no visible keys greater than or equal to 'bax'.
1334 1 : //
1335 1 : // Filtering these batch points within the merging iterator ensures that
1336 1 : // the batch iterator never needs to iterate beyond 'baz', because it
1337 1 : // already found a smaller, visible key 'bax'.
1338 1 : snapshot: base.SeqNumMax,
1339 1 : }
1340 1 : }
1341 :
1342 1 : func (b *Batch) newRangeDelIter(o *IterOptions, batchSnapshot base.SeqNum) *keyspan.Iter {
1343 1 : // Construct an iterator even if rangeDelIndex is nil, because it is allowed
1344 1 : // to refresh later, so we need the container to exist.
1345 1 : iter := new(keyspan.Iter)
1346 1 : b.initRangeDelIter(o, iter, batchSnapshot)
1347 1 : return iter
1348 1 : }
1349 :
1350 1 : func (b *Batch) initRangeDelIter(_ *IterOptions, iter *keyspan.Iter, batchSnapshot base.SeqNum) {
1351 1 : if b.rangeDelIndex == nil {
1352 1 : iter.Init(b.comparer.Compare, nil)
1353 1 : return
1354 1 : }
1355 :
1356 : // Fragment the range tombstones the first time a range deletion iterator is
1357 : // requested. The cached tombstones are invalidated if another range
1358 : // deletion tombstone is added to the batch. This cache is only guaranteed
1359 : // to be correct if we're opening an iterator to read at a batch sequence
1360 : // number at least as high as tombstonesSeqNum. The cache is guaranteed to
1361 : // include all tombstones up to tombstonesSeqNum, and if any additional
1362 : // tombstones were added after that sequence number the cache would've been
1363 : // cleared.
1364 1 : nextSeqNum := b.nextSeqNum()
1365 1 : if b.tombstones != nil && b.tombstonesSeqNum <= batchSnapshot {
1366 0 : iter.Init(b.comparer.Compare, b.tombstones)
1367 0 : return
1368 0 : }
1369 :
1370 1 : tombstones := make([]keyspan.Span, 0, b.countRangeDels)
1371 1 : frag := &keyspan.Fragmenter{
1372 1 : Cmp: b.comparer.Compare,
1373 1 : Format: b.comparer.FormatKey,
1374 1 : Emit: func(s keyspan.Span) {
1375 1 : tombstones = append(tombstones, s)
1376 1 : },
1377 : }
1378 1 : it := &batchIter{
1379 1 : batch: b,
1380 1 : iter: b.rangeDelIndex.NewIter(nil, nil),
1381 1 : snapshot: batchSnapshot,
1382 1 : }
1383 1 : fragmentRangeDels(frag, it, int(b.countRangeDels))
1384 1 : iter.Init(b.comparer.Compare, tombstones)
1385 1 :
1386 1 : // If we just read all the tombstones in the batch (eg, batchSnapshot was
1387 1 : // set to b.nextSeqNum()), then cache the tombstones so that a subsequent
1388 1 : // call to initRangeDelIter may use them without refragmenting.
1389 1 : if nextSeqNum == batchSnapshot {
1390 0 : b.tombstones = tombstones
1391 0 : b.tombstonesSeqNum = nextSeqNum
1392 0 : }
1393 : }
1394 :
1395 1 : func fragmentRangeDels(frag *keyspan.Fragmenter, it internalIterator, count int) {
1396 1 : // The memory management here is a bit subtle. The keys and values returned
1397 1 : // by the iterator are slices in Batch.data. Thus the fragmented tombstones
1398 1 : // are slices within Batch.data. If additional entries are added to the
1399 1 : // Batch, Batch.data may be reallocated. The references in the fragmented
1400 1 : // tombstones will remain valid, pointing into the old Batch.data. GC for
1401 1 : // the win.
1402 1 :
1403 1 : // Use a single []keyspan.Key buffer to avoid allocating many
1404 1 : // individual []keyspan.Key slices with a single element each.
1405 1 : keyBuf := make([]keyspan.Key, 0, count)
1406 1 : for kv := it.First(); kv != nil; kv = it.Next() {
1407 1 : s := rangedel.Decode(kv.K, kv.InPlaceValue(), keyBuf)
1408 1 : keyBuf = s.Keys[len(s.Keys):]
1409 1 :
1410 1 : // Set a fixed capacity to avoid accidental overwriting.
1411 1 : s.Keys = s.Keys[:len(s.Keys):len(s.Keys)]
1412 1 : frag.Add(s)
1413 1 : }
1414 1 : frag.Finish()
1415 : }
1416 :
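// Editor's sketch (not part of Pebble): the Fragmenter behavior relied on
// above. Overlapping spans added in order of start key are emitted as
// non-overlapping fragments carrying the keys of every span that covers them;
// here [a,c)#2 and [b,d)#1 become [a,b)#{2}, [b,c)#{2,1}, and [c,d)#{1}.
// The helper name is hypothetical.
func exampleFragmentSpans() []keyspan.Span {
	var out []keyspan.Span
	frag := &keyspan.Fragmenter{
		Cmp:    base.DefaultComparer.Compare,
		Format: base.DefaultComparer.FormatKey,
		Emit:   func(s keyspan.Span) { out = append(out, s) },
	}
	frag.Add(keyspan.Span{
		Start: []byte("a"), End: []byte("c"),
		Keys: []keyspan.Key{{Trailer: base.MakeTrailer(2, base.InternalKeyKindRangeDelete)}},
	})
	frag.Add(keyspan.Span{
		Start: []byte("b"), End: []byte("d"),
		Keys: []keyspan.Key{{Trailer: base.MakeTrailer(1, base.InternalKeyKindRangeDelete)}},
	})
	frag.Finish()
	return out
}
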
1417 1 : func (b *Batch) newRangeKeyIter(o *IterOptions, batchSnapshot base.SeqNum) *keyspan.Iter {
1418 1 : // Construct an iterator even if rangeKeyIndex is nil, because it is allowed
1419 1 : // to refresh later, so we need the container to exist.
1420 1 : iter := new(keyspan.Iter)
1421 1 : b.initRangeKeyIter(o, iter, batchSnapshot)
1422 1 : return iter
1423 1 : }
1424 :
1425 1 : func (b *Batch) initRangeKeyIter(_ *IterOptions, iter *keyspan.Iter, batchSnapshot base.SeqNum) {
1426 1 : if b.rangeKeyIndex == nil {
1427 1 : iter.Init(b.comparer.Compare, nil)
1428 1 : return
1429 1 : }
1430 :
1431 : // Fragment the range keys the first time a range key iterator is requested.
1432 : // The cached spans are invalidated if another range key is added to the
1433 : // batch. This cache is only guaranteed to be correct if we're opening an
1434 : // iterator to read at a batch sequence number at least as high as
1435 : // rangeKeysSeqNum. The cache is guaranteed to include all range keys up to
1436 : // rangeKeysSeqNum, and if any additional range keys were added after that
1437 : // sequence number the cache would've been cleared.
1438 1 : nextSeqNum := b.nextSeqNum()
1439 1 : if b.rangeKeys != nil && b.rangeKeysSeqNum <= batchSnapshot {
1440 1 : iter.Init(b.comparer.Compare, b.rangeKeys)
1441 1 : return
1442 1 : }
1443 :
1444 1 : rangeKeys := make([]keyspan.Span, 0, b.countRangeKeys)
1445 1 : frag := &keyspan.Fragmenter{
1446 1 : Cmp: b.comparer.Compare,
1447 1 : Format: b.comparer.FormatKey,
1448 1 : Emit: func(s keyspan.Span) {
1449 1 : rangeKeys = append(rangeKeys, s)
1450 1 : },
1451 : }
1452 1 : it := &batchIter{
1453 1 : batch: b,
1454 1 : iter: b.rangeKeyIndex.NewIter(nil, nil),
1455 1 : snapshot: batchSnapshot,
1456 1 : }
1457 1 : _ = fragmentRangeKeys(frag, it, int(b.countRangeKeys))
1458 1 : iter.Init(b.comparer.Compare, rangeKeys)
1459 1 :
1460 1 : // If we just read all the range keys in the batch (eg, batchSnapshot was
1461 1 : // set to b.nextSeqNum()), then cache the range keys so that a subsequent
1462 1 : // call to initRangeKeyIter may use them without refragmenting.
1463 1 : if nextSeqNum == batchSnapshot {
1464 1 : b.rangeKeys = rangeKeys
1465 1 : b.rangeKeysSeqNum = nextSeqNum
1466 1 : }
1467 : }
1468 :
1469 1 : func fragmentRangeKeys(frag *keyspan.Fragmenter, it internalIterator, count int) error {
1470 1 : // The memory management here is a bit subtle. The keys and values
1471 1 : // returned by the iterator are slices in Batch.data. Thus the
1472 1 : // fragmented key spans are slices within Batch.data. If additional
1473 1 : // entries are added to the Batch, Batch.data may be reallocated. The
1474 1 : // references in the fragmented keys will remain valid, pointing into
1475 1 : // the old Batch.data. GC for the win.
1476 1 :
1477 1 : // Use a single []keyspan.Key buffer to avoid allocating many
1478 1 : // individual []keyspan.Key slices with a single element each.
1479 1 : keyBuf := make([]keyspan.Key, 0, count)
1480 1 : for kv := it.First(); kv != nil; kv = it.Next() {
1481 1 : s, err := rangekey.Decode(kv.K, kv.InPlaceValue(), keyBuf)
1482 1 : if err != nil {
1483 0 : return err
1484 0 : }
1485 1 : keyBuf = s.Keys[len(s.Keys):]
1486 1 :
1487 1 : // Set a fixed capacity to avoid accidental overwriting.
1488 1 : s.Keys = s.Keys[:len(s.Keys):len(s.Keys)]
1489 1 : frag.Add(s)
1490 : }
1491 1 : frag.Finish()
1492 1 : return nil
1493 : }
1494 :
1495 : // Commit applies the batch to its parent writer.
1496 1 : func (b *Batch) Commit(o *WriteOptions) error {
1497 1 : return b.db.Apply(b, o)
1498 1 : }
1499 :
1500 : // Close closes the batch without committing it.
1501 1 : func (b *Batch) Close() error {
1502 1 : // The storage engine commit pipeline may retain a pointer to b.data beyond
1503 1 : // when Commit() returns. This is possible when configured for WAL failover;
1504 1 : // we don't know if we might need to read the batch data again until the
1505 1 : // batch has been durably synced [even if the committer doesn't care to wait
1506 1 : // for the sync and Sync()=false].
1507 1 : //
1508 1 : // We still want to recycle these batches. The b.lifecycle atomic negotiates
1509 1 : // the batch's lifecycle. If the commit pipeline still might read b.data,
1510 1 : // b.lifecycle will be nonzeroed [the low bits hold a ref count].
1511 1 : for {
1512 1 : v := b.lifecycle.Load()
1513 1 : switch {
1514 1 : case v == 0:
1515 1 : // A zero value indicates that the commit pipeline has no
1516 1 : // outstanding references to the batch. The commit pipeline is
1517 1 : // required to acquire a ref synchronously, so there is no risk that
1518 1 : // the commit pipeline will grab a ref after the call to release. We
1519 1 : // can simply release the batch.
1520 1 : b.release()
1521 1 : return nil
1522 0 : case (v & batchClosedBit) != 0:
1523 0 : // The batchClosedBit is set: this batch has already been closed.
1524 0 : return ErrClosed
1525 1 : default:
1526 1 : // There's an outstanding reference. Set the batch released bit so
1527 1 : // that the commit pipeline knows it should release the batch when
1528 1 : // it unrefs.
1529 1 : if b.lifecycle.CompareAndSwap(v, v|batchClosedBit) {
1530 1 : return nil
1531 1 : }
1532 : // The CAS failed, which indicates the outstanding reference was just
1533 : // decremented (or the caller illegally closed the batch twice).
1534 : // Loop to reload.
1535 : }
1536 : }
1537 : }
1538 :
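// Editor's sketch (not part of Pebble) of the negotiation above, using a
// hypothetical lifecycle word whose low bits hold a ref count and whose high
// bit marks the batch as closed (mirroring the role of batchClosedBit).
func exampleLifecycleClose(lifecycle *atomic.Uint32) (released bool, err error) {
	const closedBit = 1 << 31 // hypothetical closed marker
	for {
		v := lifecycle.Load()
		switch {
		case v == 0:
			return true, nil // no outstanding refs: release immediately
		case v&closedBit != 0:
			return false, ErrClosed // already closed
		default:
			// Outstanding refs: mark closed so the last unref releases it.
			if lifecycle.CompareAndSwap(v, v|closedBit) {
				return false, nil
			}
			// The CAS lost a race with an unref; reload and retry.
		}
	}
}
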
1539 : // Indexed returns true if the batch is indexed (i.e. supports read
1540 : // operations).
1541 1 : func (b *Batch) Indexed() bool {
1542 1 : return b.index != nil
1543 1 : }
1544 :
1545 : // init ensures that the batch data slice is initialized to meet the
1546 : // minimum required size and allocates space for the batch header.
1547 1 : func (b *Batch) init(size int) {
1548 1 : b.opts.ensureDefaults()
1549 1 : n := b.opts.initialSizeBytes
1550 1 : for n < size {
1551 0 : n *= 2
1552 0 : }
1553 1 : if cap(b.data) < n {
1554 1 : b.data = rawalloc.New(batchrepr.HeaderLen, n)
1555 1 : }
1556 1 : b.data = b.data[:batchrepr.HeaderLen]
1557 1 : clear(b.data) // Zero the sequence number in the header
1558 : }
1559 :
1560 : // Reset resets the batch for reuse. The underlying byte slice (that is
1561 : // returned by Repr()) may not be modified. It is only necessary to call this
1562 : // method if a batch is explicitly being reused. Close automatically takes care
1563 : // of releasing resources when appropriate for batches that are internally
1564 : // being reused.
1565 0 : func (b *Batch) Reset() {
1566 0 : // In some configurations (WAL failover) the commit pipeline may retain
1567 0 : // b.data beyond a call to commit the batch. When this happens, b.lifecycle
1568 0 : // is nonzero (see the comment above b.lifecycle). In this case it's unsafe
1569 0 : // to mutate b.data, so we discard it. Note that Reset must not be called on
1570 0 : // a closed batch, so v > 0 implies a non-zero ref count and not
1571 0 : // batchClosedBit being set.
1572 0 : if v := b.lifecycle.Load(); v > 0 {
1573 0 : b.data = nil
1574 0 : }
1575 0 : b.reset()
1576 : }
1577 :
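// Editor's sketch (not part of Pebble): explicit batch reuse via Reset. The
// buffer is retained across iterations (up to maxRetainedSizeBytes) instead of
// allocating a fresh batch per write; the helper name is hypothetical.
func exampleReuseBatch(db *DB, kvs map[string]string) error {
	b := db.NewBatch()
	defer b.Close()
	for k, v := range kvs {
		b.Reset() // clears prior contents; keeps (most of) the allocation
		if err := b.Set([]byte(k), []byte(v), nil); err != nil {
			return err
		}
		if err := db.Apply(b, Sync); err != nil {
			return err
		}
	}
	return nil
}
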
1578 1 : func (b *Batch) reset() {
1579 1 : // Zero out the struct, retaining only the fields necessary for manual
1580 1 : // reuse.
1581 1 : b.batchInternal = batchInternal{
1582 1 : data: b.data,
1583 1 : comparer: b.comparer,
1584 1 : opts: b.opts,
1585 1 : index: b.index,
1586 1 : db: b.db,
1587 1 : }
1588 1 : b.applied.Store(false)
1589 1 : if b.data != nil {
1590 1 : if cap(b.data) > b.opts.maxRetainedSizeBytes {
1591 0 : // If the capacity of the buffer is larger than our maximum
1592 0 : // retention size, don't re-use it. Let it be GC-ed instead.
1593 0 : // This prevents the memory from an unusually large batch from
1594 0 : // being held on to indefinitely.
1595 0 : b.data = nil
1596 1 : } else {
1597 1 : // Otherwise, reset the buffer for re-use.
1598 1 : b.data = b.data[:batchrepr.HeaderLen]
1599 1 : clear(b.data)
1600 1 : }
1601 : }
1602 1 : if b.index != nil {
1603 1 : b.index.Init(&b.data, b.comparer.Compare, b.comparer.AbbreviatedKey)
1604 1 : }
1605 : }
1606 :
1607 1 : func (b *Batch) grow(n int) {
1608 1 : newSize := len(b.data) + n
1609 1 : if uint64(newSize) >= maxBatchSize {
1610 0 : panic(ErrBatchTooLarge)
1611 : }
1612 1 : if newSize > cap(b.data) {
1613 0 : newCap := 2 * cap(b.data)
1614 0 : for newCap < newSize {
1615 0 : newCap *= 2
1616 0 : }
1617 0 : newData := rawalloc.New(len(b.data), newCap)
1618 0 : copy(newData, b.data)
1619 0 : b.data = newData
1620 : }
1621 1 : b.data = b.data[:newSize]
1622 : }
1623 :
1624 1 : func (b *Batch) setSeqNum(seqNum base.SeqNum) {
1625 1 : batchrepr.SetSeqNum(b.data, seqNum)
1626 1 : }
1627 :
1628 : // SeqNum returns the batch sequence number which is applied to the first
1629 : // record in the batch. The sequence number is incremented for each subsequent
1630 : // record. It returns zero if the batch is empty.
1631 1 : func (b *Batch) SeqNum() base.SeqNum {
1632 1 : if len(b.data) == 0 {
1633 0 : b.init(batchrepr.HeaderLen)
1634 0 : }
1635 1 : return batchrepr.ReadSeqNum(b.data)
1636 : }
1637 :
1638 1 : func (b *Batch) setCount(v uint32) {
1639 1 : b.count = uint64(v)
1640 1 : }
1641 :
1642 : // Count returns the count of memtable-modifying operations in this batch. All
1643 : // operations with the exception of LogData increment this count. For IngestSSTs,
1644 : // the count is only used to indicate the number of SSTs ingested in the record;
1645 : // the batch isn't applied to the memtable.
1646 1 : func (b *Batch) Count() uint32 {
1647 1 : if b.count > math.MaxUint32 {
1648 0 : panic(batchrepr.ErrInvalidBatch)
1649 : }
1650 1 : return uint32(b.count)
1651 : }
1652 :
1653 : // Reader returns a batchrepr.Reader for the current batch contents. If the
1654 : // batch is mutated, the new entries will not be visible to the reader.
1655 1 : func (b *Batch) Reader() batchrepr.Reader {
1656 1 : if len(b.data) == 0 {
1657 1 : b.init(batchrepr.HeaderLen)
1658 1 : }
1659 1 : return batchrepr.Read(b.data)
1660 : }
1661 :
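// Editor's sketch (not part of Pebble): decoding a batch's records with the
// batchrepr.Reader returned above, the same loop shape newFlushableBatch uses
// further below. The helper name is hypothetical.
func exampleWalkBatch(b *Batch) error {
	for r := b.Reader(); len(r) > 0; {
		kind, key, value, ok, err := r.Next()
		if err != nil {
			return err
		}
		if !ok {
			break
		}
		_, _, _ = kind, key, value // e.g. dispatch on kind during replay
	}
	return nil
}
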
1662 : // SyncWait is to be used in conjunction with DB.ApplyNoSyncWait.
1663 1 : func (b *Batch) SyncWait() error {
1664 1 : now := crtime.NowMono()
1665 1 : b.fsyncWait.Wait()
1666 1 : if b.commitErr != nil {
1667 0 : b.db = nil // prevent batch reuse on error
1668 0 : }
1669 1 : waitDuration := now.Elapsed()
1670 1 : b.commitStats.CommitWaitDuration += waitDuration
1671 1 : b.commitStats.TotalDuration += waitDuration
1672 1 : return b.commitErr
1673 : }
1674 :
1675 : // CommitStats returns stats related to committing the batch. Should be called
1676 : // after Batch.Commit, DB.Apply. If DB.ApplyNoSyncWait is used, should be
1677 : // called after Batch.SyncWait.
1678 0 : func (b *Batch) CommitStats() BatchCommitStats {
1679 0 : return b.commitStats
1680 0 : }
1681 :
1682 : // Note: batchIter mirrors the implementation of flushableBatchIter. Keep the
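// Editor's sketch (not part of Pebble): the intended pairing for SyncWait.
// DB.ApplyNoSyncWait (which requires a syncing WriteOptions) returns once the
// batch is applied; SyncWait later blocks on WAL durability, after which
// CommitStats is complete. The helper name is hypothetical.
func exampleApplyNoSyncWait(db *DB, b *Batch) error {
	if err := db.ApplyNoSyncWait(b, Sync); err != nil {
		return err
	}
	// ... overlap other work with the in-flight WAL sync ...
	if err := b.SyncWait(); err != nil {
		return err
	}
	_ = b.CommitStats() // includes the CommitWaitDuration accrued above
	return b.Close()
}
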
1683 : // two in sync.
1684 : type batchIter struct {
1685 : batch *Batch
1686 : iter batchskl.Iterator
1687 : kv base.InternalKV
1688 : err error
1689 : // snapshot holds a batch "sequence number" at which the batch is being
1690 : // read. This sequence number has the InternalKeySeqNumBatch bit set, so it
1691 : // encodes an offset within the batch. Only batch entries earlier than the
1692 : // offset are visible during iteration.
1693 : snapshot base.SeqNum
1694 : }
1695 :
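// Editor's sketch (not part of Pebble) of the visibility rule applied by the
// positioning methods below: an entry whose encoded sequence number (its batch
// offset with the batch bit set) is >= the iterator's snapshot was added after
// the view was established and is therefore skipped.
func exampleBatchEntryVisible(entrySeqNum, snapshot base.SeqNum) bool {
	return entrySeqNum < snapshot
}
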
1696 : // batchIter implements the base.InternalIterator interface.
1697 : var _ base.InternalIterator = (*batchIter)(nil)
1698 :
1699 0 : func (i *batchIter) String() string {
1700 0 : return "batch"
1701 0 : }
1702 :
1703 1 : func (i *batchIter) SeekGE(key []byte, flags base.SeekGEFlags) *base.InternalKV {
1704 1 : // Ignore TrySeekUsingNext if the view of the batch changed.
1705 1 : if flags.TrySeekUsingNext() && flags.BatchJustRefreshed() {
1706 0 : flags = flags.DisableTrySeekUsingNext()
1707 0 : }
1708 :
1709 1 : i.err = nil // clear cached iteration error
1710 1 : ikey := i.iter.SeekGE(key, flags)
1711 1 : for ikey != nil && ikey.SeqNum() >= i.snapshot {
1712 0 : ikey = i.iter.Next()
1713 0 : }
1714 1 : if ikey == nil {
1715 1 : i.kv = base.InternalKV{}
1716 1 : return nil
1717 1 : }
1718 1 : i.kv.K = *ikey
1719 1 : i.kv.V = base.MakeInPlaceValue(i.value())
1720 1 : return &i.kv
1721 : }
1722 :
1723 1 : func (i *batchIter) SeekPrefixGE(prefix, key []byte, flags base.SeekGEFlags) *base.InternalKV {
1724 1 : kv := i.SeekGE(key, flags)
1725 1 : if kv == nil {
1726 1 : return nil
1727 1 : }
1728 : // If the key doesn't have the sought prefix, return nil.
1729 1 : if !bytes.Equal(i.batch.comparer.Split.Prefix(kv.K.UserKey), prefix) {
1730 1 : i.kv = base.InternalKV{}
1731 1 : return nil
1732 1 : }
1733 1 : return kv
1734 : }
1735 :
1736 1 : func (i *batchIter) SeekLT(key []byte, flags base.SeekLTFlags) *base.InternalKV {
1737 1 : i.err = nil // clear cached iteration error
1738 1 : ikey := i.iter.SeekLT(key)
1739 1 : for ikey != nil && ikey.SeqNum() >= i.snapshot {
1740 0 : ikey = i.iter.Prev()
1741 0 : }
1742 1 : if ikey == nil {
1743 1 : i.kv = base.InternalKV{}
1744 1 : return nil
1745 1 : }
1746 1 : i.kv.K = *ikey
1747 1 : i.kv.V = base.MakeInPlaceValue(i.value())
1748 1 : return &i.kv
1749 : }
1750 :
1751 1 : func (i *batchIter) First() *base.InternalKV {
1752 1 : i.err = nil // clear cached iteration error
1753 1 : ikey := i.iter.First()
1754 1 : for ikey != nil && ikey.SeqNum() >= i.snapshot {
1755 1 : ikey = i.iter.Next()
1756 1 : }
1757 1 : if ikey == nil {
1758 1 : i.kv = base.InternalKV{}
1759 1 : return nil
1760 1 : }
1761 1 : i.kv.K = *ikey
1762 1 : i.kv.V = base.MakeInPlaceValue(i.value())
1763 1 : return &i.kv
1764 : }
1765 :
1766 1 : func (i *batchIter) Last() *base.InternalKV {
1767 1 : i.err = nil // clear cached iteration error
1768 1 : ikey := i.iter.Last()
1769 1 : for ikey != nil && ikey.SeqNum() >= i.snapshot {
1770 0 : ikey = i.iter.Prev()
1771 0 : }
1772 1 : if ikey == nil {
1773 1 : i.kv = base.InternalKV{}
1774 1 : return nil
1775 1 : }
1776 1 : i.kv.K = *ikey
1777 1 : i.kv.V = base.MakeInPlaceValue(i.value())
1778 1 : return &i.kv
1779 : }
1780 :
1781 1 : func (i *batchIter) Next() *base.InternalKV {
1782 1 : ikey := i.iter.Next()
1783 1 : for ikey != nil && ikey.SeqNum() >= i.snapshot {
1784 0 : ikey = i.iter.Next()
1785 0 : }
1786 1 : if ikey == nil {
1787 1 : i.kv = base.InternalKV{}
1788 1 : return nil
1789 1 : }
1790 1 : i.kv.K = *ikey
1791 1 : i.kv.V = base.MakeInPlaceValue(i.value())
1792 1 : return &i.kv
1793 : }
1794 :
1795 0 : func (i *batchIter) NextPrefix(succKey []byte) *base.InternalKV {
1796 0 : // Because NextPrefix was invoked, `succKey` must be ≥ the key at i's current
1797 0 : // position. Seek the arena iterator using TrySeekUsingNext.
1798 0 : ikey := i.iter.SeekGE(succKey, base.SeekGEFlagsNone.EnableTrySeekUsingNext())
1799 0 : for ikey != nil && ikey.SeqNum() >= i.snapshot {
1800 0 : ikey = i.iter.Next()
1801 0 : }
1802 0 : if ikey == nil {
1803 0 : i.kv = base.InternalKV{}
1804 0 : return nil
1805 0 : }
1806 0 : i.kv.K = *ikey
1807 0 : i.kv.V = base.MakeInPlaceValue(i.value())
1808 0 : return &i.kv
1809 : }
1810 :
1811 1 : func (i *batchIter) Prev() *base.InternalKV {
1812 1 : ikey := i.iter.Prev()
1813 1 : for ikey != nil && ikey.SeqNum() >= i.snapshot {
1814 0 : ikey = i.iter.Prev()
1815 0 : }
1816 1 : if ikey == nil {
1817 1 : i.kv = base.InternalKV{}
1818 1 : return nil
1819 1 : }
1820 0 : i.kv.K = *ikey
1821 0 : i.kv.V = base.MakeInPlaceValue(i.value())
1822 0 : return &i.kv
1823 : }
1824 :
1825 1 : func (i *batchIter) value() []byte {
1826 1 : offset, _, keyEnd := i.iter.KeyInfo()
1827 1 : data := i.batch.data
1828 1 : if len(data[offset:]) == 0 {
1829 0 : i.err = base.CorruptionErrorf("corrupted batch")
1830 0 : return nil
1831 0 : }
1832 :
1833 1 : switch InternalKeyKind(data[offset]) {
1834 : case InternalKeyKindSet, InternalKeyKindMerge, InternalKeyKindRangeDelete,
1835 : InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete,
1836 1 : InternalKeyKindDeleteSized:
1837 1 : _, value, ok := batchrepr.DecodeStr(data[keyEnd:])
1838 1 : if !ok {
1839 0 : return nil
1840 0 : }
1841 1 : return value
1842 1 : default:
1843 1 : return nil
1844 : }
1845 : }
1846 :
1847 1 : func (i *batchIter) Error() error {
1848 1 : return i.err
1849 1 : }
1850 :
1851 1 : func (i *batchIter) Close() error {
1852 1 : _ = i.iter.Close()
1853 1 : return i.err
1854 1 : }
1855 :
1856 1 : func (i *batchIter) SetBounds(lower, upper []byte) {
1857 1 : i.iter.SetBounds(lower, upper)
1858 1 : }
1859 :
1860 0 : func (i *batchIter) SetContext(_ context.Context) {}
1861 :
1862 : // DebugTree is part of the InternalIterator interface.
1863 0 : func (i *batchIter) DebugTree(tp treeprinter.Node) {
1864 0 : tp.Childf("%T(%p)", i, i)
1865 0 : }
1866 :
1867 : type flushableBatchEntry struct {
1868 : // offset is the byte offset of the record within the batch repr.
1869 : offset uint32
1870 : // index is the 0-based ordinal number of the record within the batch. Used
1871 : // to compute the seqnum for the record.
1872 : index uint32
1873 : // key{Start,End} are the start and end byte offsets of the key within the
1874 : // batch repr. Cached to avoid decoding the key length on every
1875 : // comparison. The value is stored starting at keyEnd.
1876 : keyStart uint32
1877 : keyEnd uint32
1878 : }
1879 :
1880 : // flushableBatch wraps an existing batch and provides the interfaces needed
1881 : // for making the batch flushable (i.e. able to mimic a memtable).
1882 : type flushableBatch struct {
1883 : cmp Compare
1884 : comparer *base.Comparer
1885 : data []byte
1886 :
1887 : // The base sequence number for the entries in the batch. This is the same
1888 : // value as Batch.seqNum() and is cached here for performance.
1889 : seqNum base.SeqNum
1890 :
1891 : // A slice of offsets and indices for the entries in the batch. Used to
1892 : // implement flushableBatchIter. Unlike the indexing on a normal batch, a
1893 : // flushable batch is indexed such that batch entry i will be given the
1894 : // sequence number flushableBatch.seqNum+i.
1895 : //
1896 : // Sorted in increasing order of key and decreasing order of offset (since
1897 : // higher offsets correspond to higher sequence numbers).
1898 : //
1899 : // Does not include range deletion entries or range key entries.
1900 : offsets []flushableBatchEntry
1901 :
1902 : // Fragmented range deletion tombstones.
1903 : tombstones []keyspan.Span
1904 :
1905 : // Fragmented range keys.
1906 : rangeKeys []keyspan.Span
1907 : }
1908 :
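// Editor's sketch (not part of Pebble) of the numbering described above:
// entry i of a flushable batch is read at seqNum+i, which the getKey method
// further below reconstructs from flushableBatchEntry.index.
func exampleFlushableEntrySeqNum(seqNum base.SeqNum, entry flushableBatchEntry) base.SeqNum {
	return seqNum + base.SeqNum(entry.index)
}
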
1909 : var _ flushable = (*flushableBatch)(nil)
1910 :
1911 : // newFlushableBatch creates a new batch that implements the flushable
1912 : // interface. This allows the batch to act like a memtable and be placed in the
1913 : // queue of flushable memtables. Note that the flushable batch takes ownership
1914 : // of the batch data.
1915 1 : func newFlushableBatch(batch *Batch, comparer *Comparer) (*flushableBatch, error) {
1916 1 : b := &flushableBatch{
1917 1 : data: batch.data,
1918 1 : cmp: comparer.Compare,
1919 1 : comparer: comparer,
1920 1 : offsets: make([]flushableBatchEntry, 0, batch.Count()),
1921 1 : }
1922 1 : if b.data != nil {
1923 1 : // Note that this sequence number is not correct when this batch has not
1924 1 : // been applied since the sequence number has not been assigned yet. The
1925 1 : // correct sequence number will be set later. But it is correct when the
1926 1 : // batch is being replayed from the WAL.
1927 1 : b.seqNum = batch.SeqNum()
1928 1 : }
1929 1 : var rangeDelOffsets []flushableBatchEntry
1930 1 : var rangeKeyOffsets []flushableBatchEntry
1931 1 : if len(b.data) > batchrepr.HeaderLen {
1932 1 : // Non-empty batch.
1933 1 : var index uint32
1934 1 : for iter := batchrepr.Read(b.data); len(iter) > 0; {
1935 1 : offset := uintptr(unsafe.Pointer(&iter[0])) - uintptr(unsafe.Pointer(&b.data[0]))
1936 1 : kind, key, _, ok, err := iter.Next()
1937 1 : if !ok {
1938 0 : if err != nil {
1939 0 : return nil, err
1940 0 : }
1941 0 : break
1942 : }
1943 1 : entry := flushableBatchEntry{
1944 1 : offset: uint32(offset),
1945 1 : index: uint32(index),
1946 1 : }
1947 1 : if keySize := uint32(len(key)); keySize == 0 {
1948 1 : // Must add 2 to the offset. One byte encodes `kind` and the next
1949 1 : // byte encodes `0`, which is the length of the key.
1950 1 : entry.keyStart = uint32(offset) + 2
1951 1 : entry.keyEnd = entry.keyStart
1952 1 : } else {
1953 1 : entry.keyStart = uint32(uintptr(unsafe.Pointer(&key[0])) -
1954 1 : uintptr(unsafe.Pointer(&b.data[0])))
1955 1 : entry.keyEnd = entry.keyStart + keySize
1956 1 : }
1957 1 : switch kind {
1958 1 : case InternalKeyKindRangeDelete:
1959 1 : rangeDelOffsets = append(rangeDelOffsets, entry)
1960 1 : case InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete:
1961 1 : rangeKeyOffsets = append(rangeKeyOffsets, entry)
1962 1 : case InternalKeyKindLogData:
1963 1 : // Skip it; we never want to iterate over LogDatas.
1964 1 : continue
1965 : case InternalKeyKindSet, InternalKeyKindDelete, InternalKeyKindMerge,
1966 1 : InternalKeyKindSingleDelete, InternalKeyKindSetWithDelete, InternalKeyKindDeleteSized:
1967 1 : b.offsets = append(b.offsets, entry)
1968 0 : default:
1969 0 : // Note In some circumstances this might be temporary memory
1970 0 : // corruption that can be recovered by discarding the batch and
1971 0 : // trying again. In other cases, the batch repr might've been
1972 0 : // already persisted elsewhere, and we'll loop continuously trying
1973 0 : // to commit the same corrupted batch. The caller is responsible for
1974 0 : // distinguishing.
1975 0 : return nil, errors.Wrapf(ErrInvalidBatch, "unrecognized kind %v", kind)
1976 : }
1977 : // NB: index (used for entry.index above) must not reach the
1978 : // batch.count, because the index is used in conjunction with the
1979 : // batch's sequence number to assign sequence numbers to keys within
1980 : // the batch. If we assign KV's indexes as high as batch.count,
1981 : // we'll begin assigning keys sequence numbers that weren't
1982 : // allocated.
1983 1 : if index >= uint32(batch.count) {
1984 0 : return nil, base.AssertionFailedf("pebble: batch entry index %d ≥ batch.count %d", index, batch.count)
1985 0 : }
1986 1 : index++
1987 : }
1988 : }
1989 :
1990 : // Sort all of offsets, rangeDelOffsets and rangeKeyOffsets, using *batch's
1991 : // sort.Interface implementation.
1992 1 : pointOffsets := b.offsets
1993 1 : sort.Sort(b)
1994 1 : b.offsets = rangeDelOffsets
1995 1 : sort.Sort(b)
1996 1 : b.offsets = rangeKeyOffsets
1997 1 : sort.Sort(b)
1998 1 : b.offsets = pointOffsets
1999 1 :
2000 1 : if len(rangeDelOffsets) > 0 {
2001 1 : frag := &keyspan.Fragmenter{
2002 1 : Cmp: b.cmp,
2003 1 : Format: b.comparer.FormatKey,
2004 1 : Emit: func(s keyspan.Span) {
2005 1 : b.tombstones = append(b.tombstones, s)
2006 1 : },
2007 : }
2008 1 : it := &flushableBatchIter{
2009 1 : batch: b,
2010 1 : data: b.data,
2011 1 : offsets: rangeDelOffsets,
2012 1 : cmp: b.cmp,
2013 1 : index: -1,
2014 1 : }
2015 1 : fragmentRangeDels(frag, it, len(rangeDelOffsets))
2016 : }
2017 1 : if len(rangeKeyOffsets) > 0 {
2018 1 : frag := &keyspan.Fragmenter{
2019 1 : Cmp: b.cmp,
2020 1 : Format: b.comparer.FormatKey,
2021 1 : Emit: func(s keyspan.Span) {
2022 1 : b.rangeKeys = append(b.rangeKeys, s)
2023 1 : },
2024 : }
2025 1 : it := &flushableBatchIter{
2026 1 : batch: b,
2027 1 : data: b.data,
2028 1 : offsets: rangeKeyOffsets,
2029 1 : cmp: b.cmp,
2030 1 : index: -1,
2031 1 : }
2032 1 : if err := fragmentRangeKeys(frag, it, len(rangeKeyOffsets)); err != nil {
2033 0 : return nil, err
2034 0 : }
2035 : }
2036 1 : return b, nil
2037 : }
2038 :
2039 1 : func (b *flushableBatch) setSeqNum(seqNum base.SeqNum) {
2040 1 : if b.seqNum != 0 {
2041 0 : panic(fmt.Sprintf("pebble: flushableBatch.seqNum already set: %d", b.seqNum))
2042 : }
2043 1 : b.seqNum = seqNum
2044 1 : for i := range b.tombstones {
2045 1 : for j := range b.tombstones[i].Keys {
2046 1 : b.tombstones[i].Keys[j].Trailer = base.MakeTrailer(
2047 1 : b.tombstones[i].Keys[j].SeqNum()+seqNum,
2048 1 : b.tombstones[i].Keys[j].Kind(),
2049 1 : )
2050 1 : }
2051 : }
2052 1 : for i := range b.rangeKeys {
2053 1 : for j := range b.rangeKeys[i].Keys {
2054 1 : b.rangeKeys[i].Keys[j].Trailer = base.MakeTrailer(
2055 1 : b.rangeKeys[i].Keys[j].SeqNum()+seqNum,
2056 1 : b.rangeKeys[i].Keys[j].Kind(),
2057 1 : )
2058 1 : }
2059 : }
2060 : }
2061 :
2062 1 : func (b *flushableBatch) Len() int {
2063 1 : return len(b.offsets)
2064 1 : }
2065 :
2066 1 : func (b *flushableBatch) Less(i, j int) bool {
2067 1 : ei := &b.offsets[i]
2068 1 : ej := &b.offsets[j]
2069 1 : ki := b.data[ei.keyStart:ei.keyEnd]
2070 1 : kj := b.data[ej.keyStart:ej.keyEnd]
2071 1 : switch c := b.cmp(ki, kj); {
2072 1 : case c < 0:
2073 1 : return true
2074 1 : case c > 0:
2075 1 : return false
2076 1 : default:
2077 1 : return ei.offset > ej.offset
2078 : }
2079 : }
2080 :
2081 1 : func (b *flushableBatch) Swap(i, j int) {
2082 1 : b.offsets[i], b.offsets[j] = b.offsets[j], b.offsets[i]
2083 1 : }
2084 :
2085 : // newIter is part of the flushable interface.
2086 1 : func (b *flushableBatch) newIter(o *IterOptions) internalIterator {
2087 1 : return &flushableBatchIter{
2088 1 : batch: b,
2089 1 : data: b.data,
2090 1 : offsets: b.offsets,
2091 1 : cmp: b.cmp,
2092 1 : index: -1,
2093 1 : lower: o.GetLowerBound(),
2094 1 : upper: o.GetUpperBound(),
2095 1 : }
2096 1 : }
2097 :
2098 : // newFlushIter is part of the flushable interface.
2099 1 : func (b *flushableBatch) newFlushIter(o *IterOptions) internalIterator {
2100 1 : return &flushFlushableBatchIter{
2101 1 : flushableBatchIter: flushableBatchIter{
2102 1 : batch: b,
2103 1 : data: b.data,
2104 1 : offsets: b.offsets,
2105 1 : cmp: b.cmp,
2106 1 : index: -1,
2107 1 : },
2108 1 : }
2109 1 : }
2110 :
2111 : // newRangeDelIter is part of the flushable interface.
2112 1 : func (b *flushableBatch) newRangeDelIter(o *IterOptions) keyspan.FragmentIterator {
2113 1 : if len(b.tombstones) == 0 {
2114 1 : return nil
2115 1 : }
2116 1 : return keyspan.NewIter(b.cmp, b.tombstones)
2117 : }
2118 :
2119 : // newRangeKeyIter is part of the flushable interface.
2120 1 : func (b *flushableBatch) newRangeKeyIter(o *IterOptions) keyspan.FragmentIterator {
2121 1 : if len(b.rangeKeys) == 0 {
2122 1 : return nil
2123 1 : }
2124 1 : return keyspan.NewIter(b.cmp, b.rangeKeys)
2125 : }
2126 :
2127 : // containsRangeKeys is part of the flushable interface.
2128 1 : func (b *flushableBatch) containsRangeKeys() bool { return len(b.rangeKeys) > 0 }
2129 :
2130 : // inuseBytes is part of the flushable interface.
2131 1 : func (b *flushableBatch) inuseBytes() uint64 {
2132 1 : return uint64(len(b.data) - batchrepr.HeaderLen)
2133 1 : }
2134 :
2135 : // totalBytes is part of the flushable interface.
2136 1 : func (b *flushableBatch) totalBytes() uint64 {
2137 1 : return uint64(cap(b.data))
2138 1 : }
2139 :
2140 : // readyForFlush is part of the flushable interface.
2141 1 : func (b *flushableBatch) readyForFlush() bool {
2142 1 : // A flushable batch is always ready for flush; it must be flushed together
2143 1 : // with the previous memtable.
2144 1 : return true
2145 1 : }
2146 :
2147 : // computePossibleOverlaps is part of the flushable interface.
2148 : func (b *flushableBatch) computePossibleOverlaps(
2149 : fn func(bounded) shouldContinue, bounded ...bounded,
2150 1 : ) {
2151 1 : computePossibleOverlapsGenericImpl[*flushableBatch](b, b.cmp, fn, bounded)
2152 1 : }
2153 :
2154 : // Note: flushableBatchIter mirrors the implementation of batchIter. Keep the
2155 : // two in sync.
2156 : type flushableBatchIter struct {
2157 : // Members to be initialized by creator.
2158 : batch *flushableBatch
2159 : // The bytes backing the batch (in practice, the same slice as batch.data).
2160 : data []byte
2161 : // The sorted entries. This is not always equal to batch.offsets.
2162 : offsets []flushableBatchEntry
2163 : cmp Compare
2164 : // Must be initialized to -1. It is the index into offsets that represents
2165 : // the current iterator position.
2166 : index int
2167 :
2168 : // For internal use by the implementation.
2169 : kv base.InternalKV
2170 : err error
2171 :
2172 : // Optionally initialize to bounds of iteration, if any.
2173 : lower []byte
2174 : upper []byte
2175 : }
2176 :
2177 : // flushableBatchIter implements the base.InternalIterator interface.
2178 : var _ base.InternalIterator = (*flushableBatchIter)(nil)
2179 :
2180 1 : func (i *flushableBatchIter) String() string {
2181 1 : return "flushable-batch"
2182 1 : }
2183 :
2184 : // SeekGE implements internalIterator.SeekGE, as documented in the pebble
2185 : // package. Ignore flags.TrySeekUsingNext() since we don't expect this
2186 : // optimization to provide much benefit here at the moment.
2187 1 : func (i *flushableBatchIter) SeekGE(key []byte, flags base.SeekGEFlags) *base.InternalKV {
2188 1 : i.err = nil // clear cached iteration error
2189 1 : ikey := base.MakeSearchKey(key)
2190 1 : i.index = sort.Search(len(i.offsets), func(j int) bool {
2191 1 : return base.InternalCompare(i.cmp, ikey, i.getKey(j)) <= 0
2192 1 : })
2193 1 : if i.index >= len(i.offsets) {
2194 1 : return nil
2195 1 : }
2196 1 : kv := i.getKV(i.index)
2197 1 : if i.upper != nil && i.cmp(kv.K.UserKey, i.upper) >= 0 {
2198 1 : i.index = len(i.offsets)
2199 1 : return nil
2200 1 : }
2201 1 : return kv
2202 : }
2203 :
2204 : // SeekPrefixGE implements internalIterator.SeekPrefixGE, as documented in the
2205 : // pebble package.
2206 : func (i *flushableBatchIter) SeekPrefixGE(
2207 : prefix, key []byte, flags base.SeekGEFlags,
2208 1 : ) *base.InternalKV {
2209 1 : kv := i.SeekGE(key, flags)
2210 1 : if kv == nil {
2211 1 : return nil
2212 1 : }
2213 : // If the key doesn't have the sought prefix, return nil.
2214 1 : if !bytes.Equal(i.batch.comparer.Split.Prefix(kv.K.UserKey), prefix) {
2215 1 : return nil
2216 1 : }
2217 1 : return kv
2218 : }
2219 :
2220 : // SeekLT implements internalIterator.SeekLT, as documented in the pebble
2221 : // package.
2222 1 : func (i *flushableBatchIter) SeekLT(key []byte, flags base.SeekLTFlags) *base.InternalKV {
2223 1 : i.err = nil // clear cached iteration error
2224 1 : ikey := base.MakeSearchKey(key)
2225 1 : i.index = sort.Search(len(i.offsets), func(j int) bool {
2226 1 : return base.InternalCompare(i.cmp, ikey, i.getKey(j)) <= 0
2227 1 : })
2228 1 : i.index--
2229 1 : if i.index < 0 {
2230 1 : return nil
2231 1 : }
2232 1 : kv := i.getKV(i.index)
2233 1 : if i.lower != nil && i.cmp(kv.K.UserKey, i.lower) < 0 {
2234 1 : i.index = -1
2235 1 : return nil
2236 1 : }
2237 1 : return kv
2238 : }
2239 :
2240 : // First implements internalIterator.First, as documented in the pebble
2241 : // package.
2242 1 : func (i *flushableBatchIter) First() *base.InternalKV {
2243 1 : i.err = nil // clear cached iteration error
2244 1 : if len(i.offsets) == 0 {
2245 1 : return nil
2246 1 : }
2247 1 : i.index = 0
2248 1 : kv := i.getKV(i.index)
2249 1 : if i.upper != nil && i.cmp(kv.K.UserKey, i.upper) >= 0 {
2250 1 : i.index = len(i.offsets)
2251 1 : return nil
2252 1 : }
2253 1 : return kv
2254 : }
2255 :
2256 : // Last implements internalIterator.Last, as documented in the pebble
2257 : // package.
2258 1 : func (i *flushableBatchIter) Last() *base.InternalKV {
2259 1 : i.err = nil // clear cached iteration error
2260 1 : if len(i.offsets) == 0 {
2261 1 : return nil
2262 1 : }
2263 1 : i.index = len(i.offsets) - 1
2264 1 : kv := i.getKV(i.index)
2265 1 : if i.lower != nil && i.cmp(kv.K.UserKey, i.lower) < 0 {
2266 0 : i.index = -1
2267 0 : return nil
2268 0 : }
2269 1 : return kv
2270 : }
2271 :
2272 : // Note: flushFlushableBatchIter.Next mirrors the implementation of
2273 : // flushableBatchIter.Next due to performance. Keep the two in sync.
2274 1 : func (i *flushableBatchIter) Next() *base.InternalKV {
2275 1 : if i.index == len(i.offsets) {
2276 0 : return nil
2277 0 : }
2278 1 : i.index++
2279 1 : if i.index == len(i.offsets) {
2280 1 : return nil
2281 1 : }
2282 1 : kv := i.getKV(i.index)
2283 1 : if i.upper != nil && i.cmp(kv.K.UserKey, i.upper) >= 0 {
2284 1 : i.index = len(i.offsets)
2285 1 : return nil
2286 1 : }
2287 1 : return kv
2288 : }
2289 :
2290 1 : func (i *flushableBatchIter) Prev() *base.InternalKV {
2291 1 : if i.index < 0 {
2292 0 : return nil
2293 0 : }
2294 1 : i.index--
2295 1 : if i.index < 0 {
2296 1 : return nil
2297 1 : }
2298 1 : kv := i.getKV(i.index)
2299 1 : if i.lower != nil && i.cmp(kv.K.UserKey, i.lower) < 0 {
2300 1 : i.index = -1
2301 1 : return nil
2302 1 : }
2303 1 : return kv
2304 : }
2305 :
2306 : // Note: flushFlushableBatchIter.NextPrefix mirrors the implementation of
2307 : // flushableBatchIter.NextPrefix due to performance. Keep the two in sync.
2308 0 : func (i *flushableBatchIter) NextPrefix(succKey []byte) *base.InternalKV {
2309 0 : return i.SeekGE(succKey, base.SeekGEFlagsNone.EnableTrySeekUsingNext())
2310 0 : }
2311 :
2312 1 : func (i *flushableBatchIter) getKey(index int) InternalKey {
2313 1 : e := &i.offsets[index]
2314 1 : kind := InternalKeyKind(i.data[e.offset])
2315 1 : key := i.data[e.keyStart:e.keyEnd]
2316 1 : return base.MakeInternalKey(key, i.batch.seqNum+base.SeqNum(e.index), kind)
2317 1 : }
2318 :
2319 1 : func (i *flushableBatchIter) getKV(index int) *base.InternalKV {
2320 1 : i.kv = base.InternalKV{
2321 1 : K: i.getKey(index),
2322 1 : V: base.MakeInPlaceValue(i.extractValue()),
2323 1 : }
2324 1 : return &i.kv
2325 1 : }
2326 :
2327 1 : func (i *flushableBatchIter) extractValue() []byte {
2328 1 : p := i.data[i.offsets[i.index].offset:]
2329 1 : if len(p) == 0 {
2330 0 : i.err = base.CorruptionErrorf("corrupted batch")
2331 0 : return nil
2332 0 : }
2333 1 : kind := InternalKeyKind(p[0])
2334 1 : if kind > InternalKeyKindMax {
2335 0 : i.err = base.CorruptionErrorf("corrupted batch")
2336 0 : return nil
2337 0 : }
2338 1 : var value []byte
2339 1 : var ok bool
2340 1 : switch kind {
2341 : case InternalKeyKindSet, InternalKeyKindMerge, InternalKeyKindRangeDelete,
2342 : InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete,
2343 1 : InternalKeyKindDeleteSized:
2344 1 : keyEnd := i.offsets[i.index].keyEnd
2345 1 : _, value, ok = batchrepr.DecodeStr(i.data[keyEnd:])
2346 1 : if !ok {
2347 0 : i.err = base.CorruptionErrorf("corrupted batch")
2348 0 : return nil
2349 0 : }
2350 : }
2351 1 : return value
2352 : }
2353 :
2354 0 : func (i *flushableBatchIter) Valid() bool {
2355 0 : return i.index >= 0 && i.index < len(i.offsets)
2356 0 : }
2357 :
2358 1 : func (i *flushableBatchIter) Error() error {
2359 1 : return i.err
2360 1 : }
2361 :
2362 1 : func (i *flushableBatchIter) Close() error {
2363 1 : return i.err
2364 1 : }
2365 :
2366 1 : func (i *flushableBatchIter) SetBounds(lower, upper []byte) {
2367 1 : i.lower = lower
2368 1 : i.upper = upper
2369 1 : }
2370 :
2371 0 : func (i *flushableBatchIter) SetContext(_ context.Context) {}
2372 :
2373 : // DebugTree is part of the InternalIterator interface.
2374 0 : func (i *flushableBatchIter) DebugTree(tp treeprinter.Node) {
2375 0 : tp.Childf("%T(%p)", i, i)
2376 0 : }
2377 :
2378 : // flushFlushableBatchIter is similar to flushableBatchIter but supports only
2379 : // the forward iteration (First/Next) used during a flush.
2380 : type flushFlushableBatchIter struct {
2381 : flushableBatchIter
2382 : }
2383 :
2384 : // flushFlushableBatchIter implements the base.InternalIterator interface.
2385 : var _ base.InternalIterator = (*flushFlushableBatchIter)(nil)
2386 :
2387 0 : func (i *flushFlushableBatchIter) String() string {
2388 0 : return "flushable-batch"
2389 0 : }
2390 :
2391 0 : func (i *flushFlushableBatchIter) SeekGE(key []byte, flags base.SeekGEFlags) *base.InternalKV {
2392 0 : panic("pebble: SeekGE unimplemented")
2393 : }
2394 :
2395 : func (i *flushFlushableBatchIter) SeekPrefixGE(
2396 : prefix, key []byte, flags base.SeekGEFlags,
2397 0 : ) *base.InternalKV {
2398 0 : panic("pebble: SeekPrefixGE unimplemented")
2399 : }
2400 :
2401 0 : func (i *flushFlushableBatchIter) SeekLT(key []byte, flags base.SeekLTFlags) *base.InternalKV {
2402 0 : panic("pebble: SeekLT unimplemented")
2403 : }
2404 :
2405 1 : func (i *flushFlushableBatchIter) First() *base.InternalKV {
2406 1 : i.err = nil // clear cached iteration error
2407 1 : return i.flushableBatchIter.First()
2408 1 : }
2409 :
2410 0 : func (i *flushFlushableBatchIter) NextPrefix(succKey []byte) *base.InternalKV {
2411 0 : panic("pebble: Prev unimplemented")
2412 : }
2413 :
2414 : // Note: flushFlushableBatchIter.Next mirrors the implementation of
2415 : // flushableBatchIter.Next due to performance. Keep the two in sync.
2416 1 : func (i *flushFlushableBatchIter) Next() *base.InternalKV {
2417 1 : if i.index == len(i.offsets) {
2418 0 : return nil
2419 0 : }
2420 1 : i.index++
2421 1 : if i.index == len(i.offsets) {
2422 1 : return nil
2423 1 : }
2424 1 : return i.getKV(i.index)
2425 : }
2426 :
2427 0 : func (i flushFlushableBatchIter) Prev() *base.InternalKV {
2428 0 : panic("pebble: Prev unimplemented")
2429 : }
2430 :
2431 : // batchOptions holds the parameters to configure batch.
2432 : type batchOptions struct {
2433 : initialSizeBytes int
2434 : maxRetainedSizeBytes int
2435 : }
2436 :
2437 : // ensureDefaults fills in default values for any batch options left unset.
2438 1 : func (o *batchOptions) ensureDefaults() {
2439 1 : if o.initialSizeBytes <= 0 {
2440 1 : o.initialSizeBytes = defaultBatchInitialSize
2441 1 : }
2442 1 : if o.maxRetainedSizeBytes <= 0 {
2443 1 : o.maxRetainedSizeBytes = defaultBatchMaxRetainedSize
2444 1 : }
2445 : }
2446 :
2447 : // BatchOption allows customizing the batch.
2448 : type BatchOption func(*batchOptions)
2449 :
2450 : // WithInitialSizeBytes sets a custom initial size for the batch. Defaults
2451 : // to 1KB.
2452 0 : func WithInitialSizeBytes(s int) BatchOption {
2453 0 : return func(opts *batchOptions) {
2454 0 : opts.initialSizeBytes = s
2455 0 : }
2456 : }
2457 :
2458 : // WithMaxRetainedSizeBytes sets a custom max size for the batch to be
2459 : // re-used. Any batch which exceeds the max retained size would be GC-ed.
2460 : // Defaults to 1MB.
2461 0 : func WithMaxRetainedSizeBytes(s int) BatchOption {
2462 0 : return func(opts *batchOptions) {
2463 0 : opts.maxRetainedSizeBytes = s
2464 0 : }
2465 : }
2466 :
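// Editor's sketch (not part of Pebble's documentation): tuning batch buffer
// sizing, assuming DB.NewBatch accepts BatchOptions as in recent Pebble
// releases. The sizes shown are illustrative only.
func exampleBatchSizing(db *DB) *Batch {
	return db.NewBatch(
		WithInitialSizeBytes(64<<10),    // start with a 64 KB buffer
		WithMaxRetainedSizeBytes(4<<20), // keep buffers up to 4 MB across reuse
	)
}
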
2467 : // batchSort returns iterators for the sorted contents of the batch. It is
2468 : // intended for testing use only. The batch.Sort dance is done to prevent
2469 : // exposing this method in the public pebble interface.
2470 : func batchSort(
2471 : i interface{},
2472 : ) (
2473 : points internalIterator,
2474 : rangeDels keyspan.FragmentIterator,
2475 : rangeKeys keyspan.FragmentIterator,
2476 1 : ) {
2477 1 : b := i.(*Batch)
2478 1 : if b.Indexed() {
2479 1 : pointIter := b.newInternalIter(nil)
2480 1 : rangeDelIter := b.newRangeDelIter(nil, math.MaxUint64)
2481 1 : rangeKeyIter := b.newRangeKeyIter(nil, math.MaxUint64)
2482 1 : return pointIter, rangeDelIter, rangeKeyIter
2483 1 : }
2484 1 : f, err := newFlushableBatch(b, b.db.opts.Comparer)
2485 1 : if err != nil {
2486 0 : panic(err)
2487 : }
2488 1 : return f.newIter(nil), f.newRangeDelIter(nil), f.newRangeKeyIter(nil)
2489 : }
2490 :
2491 1 : func init() {
2492 1 : private.BatchSort = batchSort
2493 1 : }