Line data Source code
1 : // Copyright 2012 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "context"
9 : "encoding/binary"
10 : "fmt"
11 : "io"
12 : "math"
13 : "sort"
14 : "sync"
15 : "sync/atomic"
16 : "time"
17 : "unsafe"
18 :
19 : "github.com/cockroachdb/errors"
20 : "github.com/cockroachdb/pebble/internal/base"
21 : "github.com/cockroachdb/pebble/internal/batchskl"
22 : "github.com/cockroachdb/pebble/internal/humanize"
23 : "github.com/cockroachdb/pebble/internal/invariants"
24 : "github.com/cockroachdb/pebble/internal/keyspan"
25 : "github.com/cockroachdb/pebble/internal/private"
26 : "github.com/cockroachdb/pebble/internal/rangedel"
27 : "github.com/cockroachdb/pebble/internal/rangekey"
28 : "github.com/cockroachdb/pebble/internal/rawalloc"
29 : )
30 :
31 : const (
32 : batchCountOffset = 8
33 : batchHeaderLen = 12
34 : batchInitialSize = 1 << 10 // 1 KB
35 : batchMaxRetainedSize = 1 << 20 // 1 MB
36 : invalidBatchCount = 1<<32 - 1
37 : maxVarintLen32 = 5
38 : )
39 :
40 : // ErrNotIndexed means that a read operation on a batch failed because the
41 : // batch is not indexed and thus doesn't support reads.
42 : var ErrNotIndexed = errors.New("pebble: batch not indexed")
43 :
44 : // ErrInvalidBatch indicates that a batch is invalid or otherwise corrupted.
45 : var ErrInvalidBatch = base.MarkCorruptionError(errors.New("pebble: invalid batch"))
46 :
47 : // ErrBatchTooLarge indicates that a batch is invalid or otherwise corrupted.
48 : var ErrBatchTooLarge = base.MarkCorruptionError(errors.Newf("pebble: batch too large: >= %s", humanize.Bytes.Uint64(maxBatchSize)))
49 :
50 : // DeferredBatchOp represents a batch operation (eg. set, merge, delete) that is
51 : // being inserted into the batch. Indexing is not performed on the specified key
52 : // until Finish is called, hence the name deferred. This struct lets the caller
53 : // copy or encode keys/values directly into the batch representation instead of
54 : // copying into an intermediate buffer and then having pebble.Batch copy from it.
55 : type DeferredBatchOp struct {
56 : index *batchskl.Skiplist
57 :
58 : // Key and Value point to parts of the binary batch representation where
59 : // keys and values should be encoded/copied into. len(Key) and len(Value)
60 : // bytes must be copied into these slices respectively before calling
61 : // Finish(). Changing where these slices point to is not allowed.
62 : Key, Value []byte
63 : offset uint32
64 : }
65 :
66 : // Finish completes the addition of this batch operation, and adds it to the
67 : // index if necessary. Must be called exactly once, after the keys/values
68 : // have been copied into Key and Value. Not calling Finish or not
69 : // copying/encoding keys will result in an incomplete index, and calling Finish
70 : // twice may result in a panic.
71 0 : func (d DeferredBatchOp) Finish() error {
72 0 : if d.index != nil {
73 0 : if err := d.index.Add(d.offset); err != nil {
74 0 : return err
75 0 : }
76 : }
77 0 : return nil
78 : }
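// deferredSetSketch is an illustrative sketch, not part of the original
// source: it shows the intended deferred-op flow using SetDeferred (defined
// later in this file), copying the key and value into the reserved slices and
// then calling Finish to add the entry to the index of an indexed batch.
func deferredSetSketch(b *Batch, key, value []byte) error {
	op := b.SetDeferred(len(key), len(value))
	copy(op.Key, key)     // exactly len(key) bytes must be written
	copy(op.Value, value) // exactly len(value) bytes must be written
	return op.Finish()
}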
79 :
80 : // A Batch is a sequence of Sets, Merges, Deletes, DeleteRanges, RangeKeySets,
81 : // RangeKeyUnsets, and/or RangeKeyDeletes that are applied atomically. Batch
82 : // implements the Reader interface, but only an indexed batch supports reading
83 : // (without error) via Get or NewIter. A non-indexed batch will return
84 : // ErrNotIndexed when read from. A batch is not safe for concurrent use, and
85 : // consumers should use a batch per goroutine or provide their own
86 : // synchronization.
87 : //
88 : // # Indexing
89 : //
90 : // Batches can be optionally indexed (see DB.NewIndexedBatch). An indexed batch
91 : // allows iteration via an Iterator (see Batch.NewIter). The iterator provides
92 : // a merged view of the operations in the batch and the underlying
93 : // database. This is implemented by treating the batch as an additional layer
94 : // in the LSM where every entry in the batch is considered newer than any entry
95 : // in the underlying database (batch entries have the InternalKeySeqNumBatch
96 : // bit set). By treating the batch as an additional layer in the LSM, iteration
97 : // supports all batch operations (i.e. Set, Merge, Delete, DeleteRange,
98 : // RangeKeySet, RangeKeyUnset, RangeKeyDelete) with minimal effort.
99 : //
100 : // The same key can be operated on multiple times in a batch, though only the
101 : // latest operation will be visible. For example, Put("a", "b"), Delete("a")
102 : // will cause the key "a" to not be visible in the batch. Put("a", "b"),
103 : // Put("a", "c") will cause a read of "a" to return the value "c".
104 : //
105 : // The batch index is implemented via a skiplist (internal/batchskl). While
106 : // the skiplist implementation is very fast, inserting into an indexed batch is
107 : // significantly slower than inserting into a non-indexed batch. Only use an
108 : // indexed batch if you require reading from it.
109 : //
110 : // # Atomic commit
111 : //
112 : // The operations in a batch are persisted by calling Batch.Commit which is
113 : // equivalent to calling DB.Apply(batch). A batch is committed atomically by
114 : // writing the internal batch representation to the WAL, adding all of the
115 : // batch operations to the memtable associated with the WAL, and then
116 : // incrementing the visible sequence number so that subsequent reads can see
117 : // the effects of the batch operations. If WriteOptions.Sync is true, a call to
118 : // Batch.Commit will guarantee that the batch is persisted to disk before
119 : // returning. See commitPipeline for more on the implementation details.
120 : //
121 : // # Large batches
122 : //
123 : // The size of a batch is limited only by available memory (be aware that
124 : // indexed batches require considerable additional memory for the skiplist
125 : // structure). A given WAL file has a single memtable associated with it (this
126 : // restriction could be removed, but doing so is onerous and complex). And a
127 : // memtable has a fixed size due to the underlying fixed size arena. Note that
128 : // this differs from RocksDB where a memtable can grow arbitrarily large using
129 : // a list of arena chunks. In RocksDB this is accomplished by storing pointers
130 : // in the arena memory, but that isn't possible in Go.
131 : //
132 : // During Batch.Commit, a batch which is larger than a threshold (>
133 : // MemTableSize/2) is wrapped in a flushableBatch and inserted into the queue
134 : // of memtables. A flushableBatch forces the WAL to be rotated, but that happens
135 : // anyway when the memtable becomes full, so this does not cause significant
136 : // WAL churn. Because the flushableBatch is readable as another layer in the
137 : // LSM, Batch.Commit returns as soon as the flushableBatch has been added to
138 : // the queue of memtables.
139 : //
140 : // Internally, a flushableBatch provides Iterator support by sorting the batch
141 : // contents (the batch is sorted once, when it is added to the memtable
142 : // queue). Sorting the batch contents and insertion of the contents into a
143 : // memtable have the same big-O time, but the constant factor dominates
144 : // here. Sorting is significantly faster and uses significantly less memory.
145 : //
146 : // # Internal representation
147 : //
148 : // The internal batch representation is a contiguous byte buffer with a fixed
149 : // 12-byte header, followed by a series of records.
150 : //
151 : // +-------------+------------+--- ... ---+
152 : // | SeqNum (8B) | Count (4B) | Entries |
153 : // +-------------+------------+--- ... ---+
154 : //
155 : // Each record has a 1-byte kind tag prefix, followed by 1 or 2 length prefixed
156 : // strings (varstring):
157 : //
158 : // +-----------+-----------------+-------------------+
159 : // | Kind (1B) | Key (varstring) | Value (varstring) |
160 : // +-----------+-----------------+-------------------+
161 : //
162 : // A varstring is a varint32 followed by N bytes of data. The Kind tags are
163 : // exactly those specified by InternalKeyKind. The following table shows the
164 : // format for records of each kind:
165 : //
166 : // InternalKeyKindDelete varstring
167 : // InternalKeyKindLogData varstring
168 : // InternalKeyKindIngestSST varstring
169 : // InternalKeyKindSet varstring varstring
170 : // InternalKeyKindMerge varstring varstring
171 : // InternalKeyKindRangeDelete varstring varstring
172 : // InternalKeyKindRangeKeySet varstring varstring
173 : // InternalKeyKindRangeKeyUnset varstring varstring
174 : // InternalKeyKindRangeKeyDelete varstring varstring
175 : //
176 : // The intuitive understanding here is that the arguments to Delete, Set,
177 : // Merge, DeleteRange and RangeKeyDelete are encoded into the batch. The
178 : // RangeKeySet and RangeKeyUnset operations are slightly more complicated,
179 : // encoding their end key, suffix and value [in the case of RangeKeySet] within
180 : // the Value varstring. For more information on the value encoding for
181 : // RangeKeySet and RangeKeyUnset, see the internal/rangekey package.
182 : //
183 : // The internal batch representation is the on disk format for a batch in the
184 : // WAL, and thus stable. New record kinds may be added, but the existing ones
185 : // will not be modified.
186 : type Batch struct {
187 : batchInternal
188 : applied atomic.Bool
189 : }
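// decodeBatchHeaderSketch is an illustrative sketch, not part of the original
// source: it decodes the fixed 12-byte header described above, i.e. an 8-byte
// little-endian sequence number followed by a 4-byte little-endian count.
func decodeBatchHeaderSketch(repr []byte) (seqNum uint64, count uint32, err error) {
	if len(repr) < batchHeaderLen {
		return 0, 0, ErrInvalidBatch
	}
	seqNum = binary.LittleEndian.Uint64(repr[:batchCountOffset])
	count = binary.LittleEndian.Uint32(repr[batchCountOffset:batchHeaderLen])
	return seqNum, count, nil
}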
190 :
191 : // batchInternal contains the set of fields within Batch that are non-atomic and
192 : // capable of being reset using a *b = batchInternal{} struct copy.
193 : type batchInternal struct {
194 : // Data is the wire format of a batch's log entry:
195 : // - 8 bytes for a sequence number of the first batch element,
196 : // or zeroes if the batch has not yet been applied,
197 : // - 4 bytes for the count: the number of elements in the batch,
198 : // or "\xff\xff\xff\xff" if the batch is invalid,
199 : // - count elements, being:
200 : // - one byte for the kind
201 : // - the varint-string user key,
202 : // - the varint-string value (if kind != delete).
203 : // The sequence number and count are stored in little-endian order.
204 : //
205 : // The data field can be (but is not guaranteed to be) nil for new
206 : // batches. Large batches will set the data field to nil when committed as
207 : // the data has been moved to a flushableBatch and inserted into the queue of
208 : // memtables.
209 : data []byte
210 : cmp Compare
211 : formatKey base.FormatKey
212 : abbreviatedKey AbbreviatedKey
213 :
214 : // An upper bound on required space to add this batch to a memtable.
215 : // Note that although batches are limited to 4 GiB in size, that limit
216 : // applies to len(data), not the memtable size. The upper bound on the
217 : // size of a memtable node is larger than the overhead of the batch's log
218 : // encoding, so memTableSize is larger than len(data) and may overflow a
219 : // uint32.
220 : memTableSize uint64
221 :
222 : // The db to which the batch will be committed. Do not change this field
223 : // after the batch has been created as it might invalidate internal state.
224 : // Batch.memTableSize is only refreshed if Batch.db is set. Setting db to
225 : // nil once it has been set implies that the Batch has encountered an error.
226 : db *DB
227 :
228 : // The count of records in the batch. This count will be stored in the batch
229 : // data whenever Repr() is called.
230 : count uint64
231 :
232 : // The count of range deletions in the batch. Updated every time a range
233 : // deletion is added.
234 : countRangeDels uint64
235 :
236 : // The count of range key sets, unsets and deletes in the batch. Updated
237 : // every time a RANGEKEYSET, RANGEKEYUNSET or RANGEKEYDEL key is added.
238 : countRangeKeys uint64
239 :
240 : // A deferredOp struct, stored in the Batch so that a pointer can be returned
241 : // from the *Deferred() methods rather than a value.
242 : deferredOp DeferredBatchOp
243 :
244 : // An optional skiplist keyed by offset into data of the entry.
245 : index *batchskl.Skiplist
246 : rangeDelIndex *batchskl.Skiplist
247 : rangeKeyIndex *batchskl.Skiplist
248 :
249 : // Fragmented range deletion tombstones. Cached the first time a range
250 : // deletion iterator is requested. The cache is invalidated whenever a new
251 : // range deletion is added to the batch. This cache can only be used when
252 : // opening an iterator to read at a batch sequence number >=
253 : // tombstonesSeqNum. This is the case for all new iterators created over a
254 : // batch but it's not the case for all cloned iterators.
255 : tombstones []keyspan.Span
256 : tombstonesSeqNum uint64
257 :
258 : // Fragmented range key spans. Cached the first time a range key iterator is
259 : // requested. The cache is invalidated whenever a new range key
260 : // (RangeKey{Set,Unset,Del}) is added to the batch. This cache can only be
261 : // used when opening an iterator to read at a batch sequence number >=
262 : // rangeKeysSeqNum. This is the case for all new iterators created over a
263 : // batch but it's not the case for all cloned iterators.
264 : rangeKeys []keyspan.Span
265 : rangeKeysSeqNum uint64
266 :
267 : // The flushableBatch wrapper if the batch is too large to fit in the
268 : // memtable.
269 : flushable *flushableBatch
270 :
271 : // minimumFormatMajorVersion indicates the format major version required in
272 : // order to commit this batch. If an operation requires a particular format
273 : // major version, it ratchets the batch's minimumFormatMajorVersion. When
274 : // the batch is committed, this is validated against the database's current
275 : // format major version.
276 : minimumFormatMajorVersion FormatMajorVersion
277 :
278 : // Synchronous Apply uses the commit WaitGroup for both publishing the
279 : // seqnum and waiting for the WAL fsync (if needed). Asynchronous
280 : // ApplyNoSyncWait, which implies WriteOptions.Sync is true, uses the commit
281 : // WaitGroup for publishing the seqnum and the fsyncWait WaitGroup for
282 : // waiting for the WAL fsync.
283 : //
284 : // TODO(sumeer): if we find that ApplyNoSyncWait in conjunction with
285 : // SyncWait is causing higher memory usage because of the time duration
286 : // between when the sync is already done, and a goroutine calls SyncWait
287 : // (followed by Batch.Close), we could separate out {fsyncWait, commitErr}
288 : // into a separate struct that is allocated separately (using another
289 : // sync.Pool), and only that struct needs to outlive Batch.Close (which
290 : // could then be called immediately after ApplyNoSyncWait). commitStats
291 : // will also need to be in this separate struct.
292 : commit sync.WaitGroup
293 : fsyncWait sync.WaitGroup
294 :
295 : commitStats BatchCommitStats
296 :
297 : commitErr error
298 :
299 : // Position bools together to reduce the size of the struct.
300 :
301 : // ingestedSSTBatch indicates that the batch contains one or more key kinds
302 : // of InternalKeyKindIngestSST. If the batch contains key kinds of IngestSST
303 : // then it will only contain key kinds of IngestSST.
304 : ingestedSSTBatch bool
305 :
306 : // committing is set to true when a batch begins to commit. It's used to
307 : // ensure the batch is not mutated concurrently. It is deliberately not an
308 : // atomic, so as to avoid the overhead on batch mutations. This is okay,
309 : // because under correct usage this field will never be accessed concurrently.
310 : // It's only under incorrect usage that the memory accesses of this
311 : // variable may violate memory safety. Since we don't use atomics here,
312 : // false negatives are possible.
313 : committing bool
314 : }
315 :
316 : // BatchCommitStats exposes stats related to committing a batch.
317 : //
318 : // NB: there is no Pebble internal tracing (using LoggerAndTracer) of slow
319 : // batch commits. The caller can use these stats to do their own tracing as
320 : // needed.
321 : type BatchCommitStats struct {
322 : // TotalDuration is the time spent in DB.{Apply,ApplyNoSyncWait} or
323 : // Batch.Commit, plus the time waiting in Batch.SyncWait. If there is a gap
324 : // between calling ApplyNoSyncWait and calling SyncWait, that gap could
325 : // include some duration in which real work was being done for the commit
326 : // and will not be included here. This missing time is considered acceptable
327 : // since the goal of these stats is to understand user-facing latency.
328 : //
329 : // TotalDuration includes time spent in various queues both inside Pebble
330 : // and outside Pebble (I/O queues, goroutine scheduler queue, mutex wait
331 : // etc.). For some of these queues (which we consider important) the wait
332 : // times are included below -- these expose low-level implementation detail
333 : // and are meant for expert diagnosis and subject to change. There may be
334 : // unaccounted time after subtracting those values from TotalDuration.
335 : TotalDuration time.Duration
336 : // SemaphoreWaitDuration is the wait time for semaphores in
337 : // commitPipeline.Commit.
338 : SemaphoreWaitDuration time.Duration
339 : // WALQueueWaitDuration is the wait time for allocating memory blocks in the
340 : // LogWriter (due to the LogWriter not writing fast enough). At the moment
341 : // this duration is always zero because a single WAL will allow
342 : // allocating memory blocks up to the entire memtable size. In the future,
343 : // we may pipeline WALs and bound the WAL queued blocks separately, so this
344 : // field is preserved for that possibility.
345 : WALQueueWaitDuration time.Duration
346 : // MemTableWriteStallDuration is the wait caused by a write stall due to too
347 : // many memtables (due to not flushing fast enough).
348 : MemTableWriteStallDuration time.Duration
349 : // L0ReadAmpWriteStallDuration is the wait caused by a write stall due to
350 : // high read amplification in L0 (due to not compacting fast enough out of
351 : // L0).
352 : L0ReadAmpWriteStallDuration time.Duration
353 : // WALRotationDuration is the wait time for WAL rotation, which includes
354 : // syncing and closing the old WAL and creating (or reusing) a new one.
355 : WALRotationDuration time.Duration
356 : // CommitWaitDuration is the wait for publishing the seqnum plus the
357 : // duration for the WAL sync (if requested). The former should be tiny and
358 : // one can assume that this is all due to the WAL sync.
359 : CommitWaitDuration time.Duration
360 : }
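// unaccountedCommitTimeSketch is an illustrative sketch, not part of the
// original source: it subtracts the component wait durations exposed above
// from TotalDuration, leaving the remainder that is not attributed to any of
// the tracked queues (as the TotalDuration comment notes, this can be
// non-zero).
func unaccountedCommitTimeSketch(s BatchCommitStats) time.Duration {
	accounted := s.SemaphoreWaitDuration + s.WALQueueWaitDuration +
		s.MemTableWriteStallDuration + s.L0ReadAmpWriteStallDuration +
		s.WALRotationDuration + s.CommitWaitDuration
	return s.TotalDuration - accounted
}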
361 :
362 : var _ Reader = (*Batch)(nil)
363 : var _ Writer = (*Batch)(nil)
364 :
365 : var batchPool = sync.Pool{
366 1 : New: func() interface{} {
367 1 : return &Batch{}
368 1 : },
369 : }
370 :
371 : type indexedBatch struct {
372 : batch Batch
373 : index batchskl.Skiplist
374 : }
375 :
376 : var indexedBatchPool = sync.Pool{
377 1 : New: func() interface{} {
378 1 : return &indexedBatch{}
379 1 : },
380 : }
381 :
382 1 : func newBatch(db *DB) *Batch {
383 1 : b := batchPool.Get().(*Batch)
384 1 : b.db = db
385 1 : return b
386 1 : }
387 :
388 0 : func newBatchWithSize(db *DB, size int) *Batch {
389 0 : b := newBatch(db)
390 0 : if cap(b.data) < size {
391 0 : b.data = rawalloc.New(0, size)
392 0 : }
393 0 : return b
394 : }
395 :
396 1 : func newIndexedBatch(db *DB, comparer *Comparer) *Batch {
397 1 : i := indexedBatchPool.Get().(*indexedBatch)
398 1 : i.batch.cmp = comparer.Compare
399 1 : i.batch.formatKey = comparer.FormatKey
400 1 : i.batch.abbreviatedKey = comparer.AbbreviatedKey
401 1 : i.batch.db = db
402 1 : i.batch.index = &i.index
403 1 : i.batch.index.Init(&i.batch.data, i.batch.cmp, i.batch.abbreviatedKey)
404 1 : return &i.batch
405 1 : }
406 :
407 0 : func newIndexedBatchWithSize(db *DB, comparer *Comparer, size int) *Batch {
408 0 : b := newIndexedBatch(db, comparer)
409 0 : if cap(b.data) < size {
410 0 : b.data = rawalloc.New(0, size)
411 0 : }
412 0 : return b
413 : }
414 :
415 : // nextSeqNum returns the batch "sequence number" that will be given to the next
416 : // key written to the batch. During iteration keys within an indexed batch are
417 : // given a sequence number consisting of their offset within the batch combined
418 : // with the base.InternalKeySeqNumBatch bit. These sequence numbers are only
419 : // used during iteration, and the keys are assigned ordinary sequence numbers
420 : // when the batch is committed.
421 1 : func (b *Batch) nextSeqNum() uint64 {
422 1 : return uint64(len(b.data)) | base.InternalKeySeqNumBatch
423 1 : }
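// isBatchSeqNumSketch is an illustrative sketch, not part of the original
// source: batch entries are tagged with the base.InternalKeySeqNumBatch bit
// (see nextSeqNum above), so testing that bit distinguishes an in-batch,
// offset-derived sequence number from a committed one.
func isBatchSeqNumSketch(seqNum uint64) bool {
	return seqNum&base.InternalKeySeqNumBatch != 0
}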
424 :
425 1 : func (b *Batch) release() {
426 1 : if b.db == nil {
427 1 : // The batch was not created using newBatch or newIndexedBatch, or an error
428 1 : // was encountered. We don't try to reuse batches that encountered an error
429 1 : // because they might be stuck somewhere in the system and attempting to
430 1 : // reuse such batches is a recipe for onerous debugging sessions. Instead,
431 1 : // let the GC do its job.
432 1 : return
433 1 : }
434 1 : b.db = nil
435 1 :
436 1 : // NB: This is ugly (it would be cleaner if we could just assign a Batch{}),
437 1 : // but necessary so that we can use an atomic store for the Batch.applied
438 1 : // field. Without using an atomic to clear that field the Go race detector
439 1 : // complains.
440 1 : b.Reset()
441 1 : b.cmp = nil
442 1 : b.formatKey = nil
443 1 : b.abbreviatedKey = nil
444 1 :
445 1 : if b.index == nil {
446 1 : batchPool.Put(b)
447 1 : } else {
448 1 : b.index, b.rangeDelIndex, b.rangeKeyIndex = nil, nil, nil
449 1 : indexedBatchPool.Put((*indexedBatch)(unsafe.Pointer(b)))
450 1 : }
451 : }
452 :
453 1 : func (b *Batch) refreshMemTableSize() error {
454 1 : b.memTableSize = 0
455 1 : if len(b.data) < batchHeaderLen {
456 0 : return nil
457 0 : }
458 :
459 1 : b.countRangeDels = 0
460 1 : b.countRangeKeys = 0
461 1 : b.minimumFormatMajorVersion = 0
462 1 : for r := b.Reader(); ; {
463 1 : kind, key, value, ok, err := r.Next()
464 1 : if !ok {
465 1 : if err != nil {
466 0 : return err
467 0 : }
468 1 : break
469 : }
470 1 : switch kind {
471 1 : case InternalKeyKindRangeDelete:
472 1 : b.countRangeDels++
473 1 : case InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete:
474 1 : b.countRangeKeys++
475 1 : case InternalKeyKindDeleteSized:
476 1 : if b.minimumFormatMajorVersion < FormatDeleteSizedAndObsolete {
477 1 : b.minimumFormatMajorVersion = FormatDeleteSizedAndObsolete
478 1 : }
479 1 : case InternalKeyKindIngestSST:
480 1 : if b.minimumFormatMajorVersion < FormatFlushableIngest {
481 1 : b.minimumFormatMajorVersion = FormatFlushableIngest
482 1 : }
483 : // This key kind doesn't contribute to the memtable size.
484 1 : continue
485 : }
486 1 : b.memTableSize += memTableEntrySize(len(key), len(value))
487 : }
488 1 : return nil
489 : }
490 :
491 : // Apply the operations contained in the batch to the receiver batch.
492 : //
493 : // It is safe to modify the contents of the arguments after Apply returns.
494 1 : func (b *Batch) Apply(batch *Batch, _ *WriteOptions) error {
495 1 : if b.ingestedSSTBatch {
496 0 : panic("pebble: invalid batch application")
497 : }
498 1 : if len(batch.data) == 0 {
499 1 : return nil
500 1 : }
501 1 : if len(batch.data) < batchHeaderLen {
502 0 : return ErrInvalidBatch
503 0 : }
504 :
505 1 : offset := len(b.data)
506 1 : if offset == 0 {
507 1 : b.init(offset)
508 1 : offset = batchHeaderLen
509 1 : }
510 1 : b.data = append(b.data, batch.data[batchHeaderLen:]...)
511 1 :
512 1 : b.setCount(b.Count() + batch.Count())
513 1 :
514 1 : if b.db != nil || b.index != nil {
515 1 : // Only iterate over the new entries if we need to track memTableSize or in
516 1 : // order to update the index.
517 1 : for iter := BatchReader(b.data[offset:]); len(iter) > 0; {
518 1 : offset := uintptr(unsafe.Pointer(&iter[0])) - uintptr(unsafe.Pointer(&b.data[0]))
519 1 : kind, key, value, ok, err := iter.Next()
520 1 : if !ok {
521 0 : if err != nil {
522 0 : return err
523 0 : }
524 0 : break
525 : }
526 1 : switch kind {
527 1 : case InternalKeyKindRangeDelete:
528 1 : b.countRangeDels++
529 1 : case InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete:
530 1 : b.countRangeKeys++
531 0 : case InternalKeyKindIngestSST:
532 0 : panic("pebble: invalid key kind for batch")
533 : }
534 1 : if b.index != nil {
535 1 : var err error
536 1 : switch kind {
537 0 : case InternalKeyKindRangeDelete:
538 0 : b.tombstones = nil
539 0 : b.tombstonesSeqNum = 0
540 0 : if b.rangeDelIndex == nil {
541 0 : b.rangeDelIndex = batchskl.NewSkiplist(&b.data, b.cmp, b.abbreviatedKey)
542 0 : }
543 0 : err = b.rangeDelIndex.Add(uint32(offset))
544 0 : case InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete:
545 0 : b.rangeKeys = nil
546 0 : b.rangeKeysSeqNum = 0
547 0 : if b.rangeKeyIndex == nil {
548 0 : b.rangeKeyIndex = batchskl.NewSkiplist(&b.data, b.cmp, b.abbreviatedKey)
549 0 : }
550 0 : err = b.rangeKeyIndex.Add(uint32(offset))
551 1 : default:
552 1 : err = b.index.Add(uint32(offset))
553 : }
554 1 : if err != nil {
555 0 : return err
556 0 : }
557 : }
558 1 : b.memTableSize += memTableEntrySize(len(key), len(value))
559 : }
560 : }
561 1 : return nil
562 : }
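// applySketch is an illustrative sketch, not part of the original source: it
// folds the contents of src into dst via Apply above; afterwards dst contains
// the operations of both batches, and src may be reused or discarded.
func applySketch(dst, src *Batch) error {
	return dst.Apply(src, nil)
}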
563 :
564 : // Get gets the value for the given key. It returns ErrNotFound if the Batch
565 : // does not contain the key.
566 : //
567 : // The caller should not modify the contents of the returned slice, but it is
568 : // safe to modify the contents of the argument after Get returns. The returned
569 : // slice will remain valid until the returned Closer is closed. On success, the
570 : // caller MUST call closer.Close() or a memory leak will occur.
571 1 : func (b *Batch) Get(key []byte) ([]byte, io.Closer, error) {
572 1 : if b.index == nil {
573 0 : return nil, nil, ErrNotIndexed
574 0 : }
575 1 : return b.db.getInternal(key, b, nil /* snapshot */)
576 : }
577 :
578 1 : func (b *Batch) prepareDeferredKeyValueRecord(keyLen, valueLen int, kind InternalKeyKind) {
579 1 : if b.committing {
580 0 : panic("pebble: batch already committing")
581 : }
582 1 : if len(b.data) == 0 {
583 1 : b.init(keyLen + valueLen + 2*binary.MaxVarintLen64 + batchHeaderLen)
584 1 : }
585 1 : b.count++
586 1 : b.memTableSize += memTableEntrySize(keyLen, valueLen)
587 1 :
588 1 : pos := len(b.data)
589 1 : b.deferredOp.offset = uint32(pos)
590 1 : b.grow(1 + 2*maxVarintLen32 + keyLen + valueLen)
591 1 : b.data[pos] = byte(kind)
592 1 : pos++
593 1 :
594 1 : {
595 1 : // TODO(peter): Manually inlined version of binary.PutUvarint(). This is 20%
596 1 : // faster on BenchmarkBatchSet on go1.13. Remove if go1.14 or future
597 1 : // versions show this to not be a performance win.
598 1 : x := uint32(keyLen)
599 1 : for x >= 0x80 {
600 0 : b.data[pos] = byte(x) | 0x80
601 0 : x >>= 7
602 0 : pos++
603 0 : }
604 1 : b.data[pos] = byte(x)
605 1 : pos++
606 : }
607 :
608 1 : b.deferredOp.Key = b.data[pos : pos+keyLen]
609 1 : pos += keyLen
610 1 :
611 1 : {
612 1 : // TODO(peter): Manually inlined version of binary.PutUvarint(). This is 20%
613 1 : // faster on BenchmarkBatchSet on go1.13. Remove if go1.14 or future
614 1 : // versions show this to not be a performance win.
615 1 : x := uint32(valueLen)
616 1 : for x >= 0x80 {
617 0 : b.data[pos] = byte(x) | 0x80
618 0 : x >>= 7
619 0 : pos++
620 0 : }
621 1 : b.data[pos] = byte(x)
622 1 : pos++
623 : }
624 :
625 1 : b.deferredOp.Value = b.data[pos : pos+valueLen]
626 1 : // Shrink data since varints may be shorter than the upper bound.
627 1 : b.data = b.data[:pos+valueLen]
628 : }
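// putUvarint32Sketch is an illustrative stand-alone version, not part of the
// original source, of the varint encoding manually inlined above. For values
// that fit in a uint32 it writes the same bytes as binary.PutUvarint and
// returns the number of bytes written.
func putUvarint32Sketch(dst []byte, x uint32) int {
	i := 0
	for x >= 0x80 {
		dst[i] = byte(x) | 0x80
		x >>= 7
		i++
	}
	dst[i] = byte(x)
	return i + 1
}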
629 :
630 1 : func (b *Batch) prepareDeferredKeyRecord(keyLen int, kind InternalKeyKind) {
631 1 : if b.committing {
632 0 : panic("pebble: batch already committing")
633 : }
634 1 : if len(b.data) == 0 {
635 1 : b.init(keyLen + binary.MaxVarintLen64 + batchHeaderLen)
636 1 : }
637 1 : b.count++
638 1 : b.memTableSize += memTableEntrySize(keyLen, 0)
639 1 :
640 1 : pos := len(b.data)
641 1 : b.deferredOp.offset = uint32(pos)
642 1 : b.grow(1 + maxVarintLen32 + keyLen)
643 1 : b.data[pos] = byte(kind)
644 1 : pos++
645 1 :
646 1 : {
647 1 : // TODO(peter): Manually inlined version of binary.PutUvarint(). Remove if
648 1 : // go1.13 or future versions show this to not be a performance win. See
649 1 : // BenchmarkBatchSet.
650 1 : x := uint32(keyLen)
651 1 : for x >= 0x80 {
652 0 : b.data[pos] = byte(x) | 0x80
653 0 : x >>= 7
654 0 : pos++
655 0 : }
656 1 : b.data[pos] = byte(x)
657 1 : pos++
658 : }
659 :
660 1 : b.deferredOp.Key = b.data[pos : pos+keyLen]
661 1 : b.deferredOp.Value = nil
662 1 :
663 1 : // Shrink data since varint may be shorter than the upper bound.
664 1 : b.data = b.data[:pos+keyLen]
665 : }
666 :
667 : // AddInternalKey allows the caller to add an internal key of point key or range
668 : // key kinds (but not RangeDelete) to a batch. Passing in an internal key of
669 : // kind RangeDelete will result in a panic. Note that the seqnum in the internal
670 : // key is effectively ignored, even though the Kind is preserved. This is
671 : // because the batch format does not allow for a per-key seqnum to be specified,
672 : // only a batch-wide one.
673 : //
674 : // Note that non-indexed keys (InternalKeyKind{LogData,IngestSST}) are not
675 : // supported with this method as they require specialized logic.
676 1 : func (b *Batch) AddInternalKey(key *base.InternalKey, value []byte, _ *WriteOptions) error {
677 1 : keyLen := len(key.UserKey)
678 1 : hasValue := false
679 1 : switch kind := key.Kind(); kind {
680 0 : case InternalKeyKindRangeDelete:
681 0 : panic("unexpected range delete in AddInternalKey")
682 0 : case InternalKeyKindSingleDelete, InternalKeyKindDelete:
683 0 : b.prepareDeferredKeyRecord(keyLen, kind)
684 0 : b.deferredOp.index = b.index
685 1 : case InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete:
686 1 : b.prepareDeferredKeyValueRecord(keyLen, len(value), kind)
687 1 : hasValue = true
688 1 : b.incrementRangeKeysCount()
689 0 : default:
690 0 : b.prepareDeferredKeyValueRecord(keyLen, len(value), kind)
691 0 : hasValue = true
692 0 : b.deferredOp.index = b.index
693 : }
694 1 : copy(b.deferredOp.Key, key.UserKey)
695 1 : if hasValue {
696 1 : copy(b.deferredOp.Value, value)
697 1 : }
698 :
699 : // TODO(peter): Manually inline DeferredBatchOp.Finish(). Mid-stack inlining
700 : // in go1.13 will remove the need for this.
701 1 : if b.index != nil {
702 0 : if err := b.index.Add(b.deferredOp.offset); err != nil {
703 0 : return err
704 0 : }
705 : }
706 1 : return nil
707 : }
708 :
709 : // Set adds an action to the batch that sets the key to map to the value.
710 : //
711 : // It is safe to modify the contents of the arguments after Set returns.
712 1 : func (b *Batch) Set(key, value []byte, _ *WriteOptions) error {
713 1 : deferredOp := b.SetDeferred(len(key), len(value))
714 1 : copy(deferredOp.Key, key)
715 1 : copy(deferredOp.Value, value)
716 1 : // TODO(peter): Manually inline DeferredBatchOp.Finish(). Mid-stack inlining
717 1 : // in go1.13 will remove the need for this.
718 1 : if b.index != nil {
719 1 : if err := b.index.Add(deferredOp.offset); err != nil {
720 0 : return err
721 0 : }
722 : }
723 1 : return nil
724 : }
725 :
726 : // SetDeferred is similar to Set in that it adds a set operation to the batch,
727 : // except it only takes in key/value lengths instead of complete slices,
728 : // letting the caller encode into those objects and then call Finish() on the
729 : // returned object.
730 1 : func (b *Batch) SetDeferred(keyLen, valueLen int) *DeferredBatchOp {
731 1 : b.prepareDeferredKeyValueRecord(keyLen, valueLen, InternalKeyKindSet)
732 1 : b.deferredOp.index = b.index
733 1 : return &b.deferredOp
734 1 : }
735 :
736 : // Merge adds an action to the batch that merges the value at key with the new
737 : // value. The details of the merge are dependent upon the configured merge
738 : // operator.
739 : //
740 : // It is safe to modify the contents of the arguments after Merge returns.
741 1 : func (b *Batch) Merge(key, value []byte, _ *WriteOptions) error {
742 1 : deferredOp := b.MergeDeferred(len(key), len(value))
743 1 : copy(deferredOp.Key, key)
744 1 : copy(deferredOp.Value, value)
745 1 : // TODO(peter): Manually inline DeferredBatchOp.Finish(). Mid-stack inlining
746 1 : // in go1.13 will remove the need for this.
747 1 : if b.index != nil {
748 1 : if err := b.index.Add(deferredOp.offset); err != nil {
749 0 : return err
750 0 : }
751 : }
752 1 : return nil
753 : }
754 :
755 : // MergeDeferred is similar to Merge in that it adds a merge operation to the
756 : // batch, except it only takes in key/value lengths instead of complete slices,
757 : // letting the caller encode into those objects and then call Finish() on the
758 : // returned object.
759 1 : func (b *Batch) MergeDeferred(keyLen, valueLen int) *DeferredBatchOp {
760 1 : b.prepareDeferredKeyValueRecord(keyLen, valueLen, InternalKeyKindMerge)
761 1 : b.deferredOp.index = b.index
762 1 : return &b.deferredOp
763 1 : }
764 :
765 : // Delete adds an action to the batch that deletes the entry for key.
766 : //
767 : // It is safe to modify the contents of the arguments after Delete returns.
768 1 : func (b *Batch) Delete(key []byte, _ *WriteOptions) error {
769 1 : deferredOp := b.DeleteDeferred(len(key))
770 1 : copy(deferredOp.Key, key)
771 1 : // TODO(peter): Manually inline DeferredBatchOp.Finish(). Mid-stack inlining
772 1 : // in go1.13 will remove the need for this.
773 1 : if b.index != nil {
774 1 : if err := b.index.Add(deferredOp.offset); err != nil {
775 0 : return err
776 0 : }
777 : }
778 1 : return nil
779 : }
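// pointWritesSketch is an illustrative sketch, not part of the original
// source: it exercises the point-write operations above (Set, Merge, Delete)
// on an existing batch and then commits it as described in the "Atomic
// commit" section of the Batch documentation. Because later operations on the
// same key shadow earlier ones, the net effect here is the deletion.
func pointWritesSketch(b *Batch, key, value []byte, opts *WriteOptions) error {
	if err := b.Set(key, value, nil); err != nil {
		return err
	}
	if err := b.Merge(key, value, nil); err != nil {
		return err
	}
	if err := b.Delete(key, nil); err != nil {
		return err
	}
	return b.Commit(opts)
}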
780 :
781 : // DeleteDeferred is similar to Delete in that it adds a delete operation to
782 : // the batch, except it only takes in a key length instead of a complete key
783 : // slice, letting the caller encode into that object and then call Finish()
784 : // on the returned object.
785 1 : func (b *Batch) DeleteDeferred(keyLen int) *DeferredBatchOp {
786 1 : b.prepareDeferredKeyRecord(keyLen, InternalKeyKindDelete)
787 1 : b.deferredOp.index = b.index
788 1 : return &b.deferredOp
789 1 : }
790 :
791 : // DeleteSized behaves identically to Delete, but takes an additional
792 : // argument indicating the size of the value being deleted. DeleteSized
793 : // should be preferred when the caller has the expectation that there exists
794 : // a single internal KV pair for the key (eg, the key has not been
795 : // overwritten recently), and the caller knows the size of its value.
796 : //
797 : // DeleteSized will record the value size within the tombstone and use it to
798 : // inform compaction-picking heuristics which strive to reduce space
799 : // amplification in the LSM. This "calling your shot" mechanic allows the
800 : // storage engine to more accurately estimate and reduce space amplification.
801 : //
802 : // It is safe to modify the contents of the arguments after DeleteSized
803 : // returns.
804 1 : func (b *Batch) DeleteSized(key []byte, deletedValueSize uint32, _ *WriteOptions) error {
805 1 : deferredOp := b.DeleteSizedDeferred(len(key), deletedValueSize)
806 1 : copy(b.deferredOp.Key, key)
807 1 : // TODO(peter): Manually inline DeferredBatchOp.Finish(). Check if in a
808 1 : // later Go release this is unnecessary.
809 1 : if b.index != nil {
810 1 : if err := b.index.Add(deferredOp.offset); err != nil {
811 0 : return err
812 0 : }
813 : }
814 1 : return nil
815 : }
816 :
817 : // DeleteSizedDeferred is similar to DeleteSized in that it adds a sized delete
818 : // operation to the batch, except it only takes in key length instead of a
819 : // complete key slice, letting the caller encode into the DeferredBatchOp.Key
820 : // slice and then call Finish() on the returned object.
821 1 : func (b *Batch) DeleteSizedDeferred(keyLen int, deletedValueSize uint32) *DeferredBatchOp {
822 1 : if b.minimumFormatMajorVersion < FormatDeleteSizedAndObsolete {
823 1 : b.minimumFormatMajorVersion = FormatDeleteSizedAndObsolete
824 1 : }
825 :
826 : // Encode the sum of the key length and the value in the value.
827 1 : v := uint64(deletedValueSize) + uint64(keyLen)
828 1 :
829 1 : // Encode `v` as a varint.
830 1 : var buf [binary.MaxVarintLen64]byte
831 1 : n := 0
832 1 : {
833 1 : x := v
834 1 : for x >= 0x80 {
835 0 : buf[n] = byte(x) | 0x80
836 0 : x >>= 7
837 0 : n++
838 0 : }
839 1 : buf[n] = byte(x)
840 1 : n++
841 : }
842 :
843 : // NB: In batch entries and sstable entries, values are stored as
844 : // varstrings. Here, the value is itself a simple varint. This results in an
845 : // unnecessary double layer of encoding:
846 : // varint(n) varint(deletedValueSize)
847 : // The first varint will always be 1-byte, since a varint-encoded uint64
848 : // will never exceed 128 bytes. This unnecessary extra byte and wrapping is
849 : // preserved to avoid special casing across the database, and in particular
850 : // in sstable block decoding which is performance sensitive.
851 1 : b.prepareDeferredKeyValueRecord(keyLen, n, InternalKeyKindDeleteSized)
852 1 : b.deferredOp.index = b.index
853 1 : copy(b.deferredOp.Value, buf[:n])
854 1 : return &b.deferredOp
855 : }
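// deleteSizedValueSketch is an illustrative sketch, not part of the original
// source: it reverses the encoding above by decoding the single varint stored
// in a DeleteSized value (keyLen + deletedValueSize) and subtracting the key
// length to recover the caller-provided value size.
func deleteSizedValueSketch(key, value []byte) (deletedValueSize uint64, ok bool) {
	v, n := binary.Uvarint(value)
	if n <= 0 || v < uint64(len(key)) {
		return 0, false
	}
	return v - uint64(len(key)), true
}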
856 :
857 : // SingleDelete adds an action to the batch that single deletes the entry for key.
858 : // See Writer.SingleDelete for more details on the semantics of SingleDelete.
859 : //
860 : // It is safe to modify the contents of the arguments after SingleDelete returns.
861 1 : func (b *Batch) SingleDelete(key []byte, _ *WriteOptions) error {
862 1 : deferredOp := b.SingleDeleteDeferred(len(key))
863 1 : copy(deferredOp.Key, key)
864 1 : // TODO(peter): Manually inline DeferredBatchOp.Finish(). Mid-stack inlining
865 1 : // in go1.13 will remove the need for this.
866 1 : if b.index != nil {
867 1 : if err := b.index.Add(deferredOp.offset); err != nil {
868 0 : return err
869 0 : }
870 : }
871 1 : return nil
872 : }
873 :
874 : // SingleDeleteDeferred is similar to SingleDelete in that it adds a single delete
875 : // operation to the batch, except it only takes in a key length instead of a
876 : // complete key slice, letting the caller encode into that object and then call
877 : // Finish() on the returned object.
878 1 : func (b *Batch) SingleDeleteDeferred(keyLen int) *DeferredBatchOp {
879 1 : b.prepareDeferredKeyRecord(keyLen, InternalKeyKindSingleDelete)
880 1 : b.deferredOp.index = b.index
881 1 : return &b.deferredOp
882 1 : }
883 :
884 : // DeleteRange deletes all of the point keys (and values) in the range
885 : // [start,end) (inclusive on start, exclusive on end). DeleteRange does NOT
886 : // delete overlapping range keys (eg, keys set via RangeKeySet).
887 : //
888 : // It is safe to modify the contents of the arguments after DeleteRange
889 : // returns.
890 1 : func (b *Batch) DeleteRange(start, end []byte, _ *WriteOptions) error {
891 1 : deferredOp := b.DeleteRangeDeferred(len(start), len(end))
892 1 : copy(deferredOp.Key, start)
893 1 : copy(deferredOp.Value, end)
894 1 : // TODO(peter): Manually inline DeferredBatchOp.Finish(). Mid-stack inlining
895 1 : // in go1.13 will remove the need for this.
896 1 : if deferredOp.index != nil {
897 1 : if err := deferredOp.index.Add(deferredOp.offset); err != nil {
898 0 : return err
899 0 : }
900 : }
901 1 : return nil
902 : }
903 :
904 : // DeleteRangeDeferred is similar to DeleteRange in that it adds a delete range
905 : // operation to the batch, except it only takes in key lengths instead of
906 : // complete slices, letting the caller encode into those objects and then call
907 : // Finish() on the returned object. Note that DeferredBatchOp.Key should be
908 : // populated with the start key, and DeferredBatchOp.Value should be populated
909 : // with the end key.
910 1 : func (b *Batch) DeleteRangeDeferred(startLen, endLen int) *DeferredBatchOp {
911 1 : b.prepareDeferredKeyValueRecord(startLen, endLen, InternalKeyKindRangeDelete)
912 1 : b.countRangeDels++
913 1 : if b.index != nil {
914 1 : b.tombstones = nil
915 1 : b.tombstonesSeqNum = 0
916 1 : // Range deletions are rare, so we lazily allocate the index for them.
917 1 : if b.rangeDelIndex == nil {
918 1 : b.rangeDelIndex = batchskl.NewSkiplist(&b.data, b.cmp, b.abbreviatedKey)
919 1 : }
920 1 : b.deferredOp.index = b.rangeDelIndex
921 : }
922 1 : return &b.deferredOp
923 : }
924 :
925 : // RangeKeySet sets a range key mapping the key range [start, end) at the MVCC
926 : // timestamp suffix to value. The suffix is optional. If any portion of the key
927 : // range [start, end) is already set by a range key with the same suffix value,
928 : // RangeKeySet overrides it.
929 : //
930 : // It is safe to modify the contents of the arguments after RangeKeySet returns.
931 1 : func (b *Batch) RangeKeySet(start, end, suffix, value []byte, _ *WriteOptions) error {
932 1 : if invariants.Enabled && b.db != nil && b.db.opts.Comparer.Split != nil {
933 1 : // RangeKeySet is only supported on prefix keys.
934 1 : if b.db.opts.Comparer.Split(start) != len(start) {
935 0 : panic("RangeKeySet called with suffixed start key")
936 : }
937 1 : if b.db.opts.Comparer.Split(end) != len(end) {
938 0 : panic("RangeKeySet called with suffixed end key")
939 : }
940 : }
941 1 : suffixValues := [1]rangekey.SuffixValue{{Suffix: suffix, Value: value}}
942 1 : internalValueLen := rangekey.EncodedSetValueLen(end, suffixValues[:])
943 1 :
944 1 : deferredOp := b.rangeKeySetDeferred(len(start), internalValueLen)
945 1 : copy(deferredOp.Key, start)
946 1 : n := rangekey.EncodeSetValue(deferredOp.Value, end, suffixValues[:])
947 1 : if n != internalValueLen {
948 0 : panic("unexpected internal value length mismatch")
949 : }
950 :
951 : // Manually inline DeferredBatchOp.Finish().
952 1 : if deferredOp.index != nil {
953 1 : if err := deferredOp.index.Add(deferredOp.offset); err != nil {
954 0 : return err
955 0 : }
956 : }
957 1 : return nil
958 : }
959 :
960 1 : func (b *Batch) rangeKeySetDeferred(startLen, internalValueLen int) *DeferredBatchOp {
961 1 : b.prepareDeferredKeyValueRecord(startLen, internalValueLen, InternalKeyKindRangeKeySet)
962 1 : b.incrementRangeKeysCount()
963 1 : return &b.deferredOp
964 1 : }
965 :
966 1 : func (b *Batch) incrementRangeKeysCount() {
967 1 : b.countRangeKeys++
968 1 : if b.index != nil {
969 1 : b.rangeKeys = nil
970 1 : b.rangeKeysSeqNum = 0
971 1 : // Range keys are rare, so we lazily allocate the index for them.
972 1 : if b.rangeKeyIndex == nil {
973 1 : b.rangeKeyIndex = batchskl.NewSkiplist(&b.data, b.cmp, b.abbreviatedKey)
974 1 : }
975 1 : b.deferredOp.index = b.rangeKeyIndex
976 : }
977 : }
978 :
979 : // RangeKeyUnset removes a range key mapping the key range [start, end) at the
980 : // MVCC timestamp suffix. The suffix may be omitted to remove an unsuffixed
981 : // range key. RangeKeyUnset only removes portions of range keys that fall within
982 : // the [start, end) key span, and only range keys with suffixes that exactly
983 : // match the unset suffix.
984 : //
985 : // It is safe to modify the contents of the arguments after RangeKeyUnset
986 : // returns.
987 1 : func (b *Batch) RangeKeyUnset(start, end, suffix []byte, _ *WriteOptions) error {
988 1 : if invariants.Enabled && b.db != nil && b.db.opts.Comparer.Split != nil {
989 1 : // RangeKeyUnset is only supported on prefix keys.
990 1 : if b.db.opts.Comparer.Split(start) != len(start) {
991 0 : panic("RangeKeyUnset called with suffixed start key")
992 : }
993 1 : if b.db.opts.Comparer.Split(end) != len(end) {
994 0 : panic("RangeKeyUnset called with suffixed end key")
995 : }
996 : }
997 1 : suffixes := [1][]byte{suffix}
998 1 : internalValueLen := rangekey.EncodedUnsetValueLen(end, suffixes[:])
999 1 :
1000 1 : deferredOp := b.rangeKeyUnsetDeferred(len(start), internalValueLen)
1001 1 : copy(deferredOp.Key, start)
1002 1 : n := rangekey.EncodeUnsetValue(deferredOp.Value, end, suffixes[:])
1003 1 : if n != internalValueLen {
1004 0 : panic("unexpected internal value length mismatch")
1005 : }
1006 :
1007 : // Manually inline DeferredBatchOp.Finish()
1008 1 : if deferredOp.index != nil {
1009 1 : if err := deferredOp.index.Add(deferredOp.offset); err != nil {
1010 0 : return err
1011 0 : }
1012 : }
1013 1 : return nil
1014 : }
1015 :
1016 1 : func (b *Batch) rangeKeyUnsetDeferred(startLen, internalValueLen int) *DeferredBatchOp {
1017 1 : b.prepareDeferredKeyValueRecord(startLen, internalValueLen, InternalKeyKindRangeKeyUnset)
1018 1 : b.incrementRangeKeysCount()
1019 1 : return &b.deferredOp
1020 1 : }
1021 :
1022 : // RangeKeyDelete deletes all of the range keys in the range [start,end)
1023 : // (inclusive on start, exclusive on end). It does not delete point keys (for
1024 : // that use DeleteRange). RangeKeyDelete removes all range keys within the
1025 : // bounds, including those with or without suffixes.
1026 : //
1027 : // It is safe to modify the contents of the arguments after RangeKeyDelete
1028 : // returns.
1029 1 : func (b *Batch) RangeKeyDelete(start, end []byte, _ *WriteOptions) error {
1030 1 : if invariants.Enabled && b.db != nil && b.db.opts.Comparer.Split != nil {
1031 1 : // RangeKeyDelete is only supported on prefix keys.
1032 1 : if b.db.opts.Comparer.Split(start) != len(start) {
1033 0 : panic("RangeKeyDelete called with suffixed start key")
1034 : }
1035 1 : if b.db.opts.Comparer.Split(end) != len(end) {
1036 0 : panic("RangeKeyDelete called with suffixed end key")
1037 : }
1038 : }
1039 1 : deferredOp := b.RangeKeyDeleteDeferred(len(start), len(end))
1040 1 : copy(deferredOp.Key, start)
1041 1 : copy(deferredOp.Value, end)
1042 1 : // Manually inline DeferredBatchOp.Finish().
1043 1 : if deferredOp.index != nil {
1044 1 : if err := deferredOp.index.Add(deferredOp.offset); err != nil {
1045 0 : return err
1046 0 : }
1047 : }
1048 1 : return nil
1049 : }
1050 :
1051 : // RangeKeyDeleteDeferred is similar to RangeKeyDelete in that it adds an
1052 : // operation to delete range keys to the batch, except it only takes in key
1053 : // lengths instead of complete slices, letting the caller encode into those
1054 : // objects and then call Finish() on the returned object. Note that
1055 : // DeferredBatchOp.Key should be populated with the start key, and
1056 : // DeferredBatchOp.Value should be populated with the end key.
1057 1 : func (b *Batch) RangeKeyDeleteDeferred(startLen, endLen int) *DeferredBatchOp {
1058 1 : b.prepareDeferredKeyValueRecord(startLen, endLen, InternalKeyKindRangeKeyDelete)
1059 1 : b.incrementRangeKeysCount()
1060 1 : return &b.deferredOp
1061 1 : }
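// rangeKeySketch is an illustrative sketch, not part of the original source:
// it exercises the three range-key operations above on an existing batch,
// setting a (possibly suffixed) range key over [start, end), unsetting it at
// the same suffix, and finally deleting any remaining range keys in the span.
func rangeKeySketch(b *Batch, start, end, suffix, value []byte) error {
	if err := b.RangeKeySet(start, end, suffix, value, nil); err != nil {
		return err
	}
	if err := b.RangeKeyUnset(start, end, suffix, nil); err != nil {
		return err
	}
	return b.RangeKeyDelete(start, end, nil)
}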
1062 :
1063 : // LogData adds the specified data to the batch. The data will be written to the
1064 : // WAL, but not added to memtables or sstables. Log data is never indexed,
1065 : // which makes it useful for testing WAL performance.
1066 : //
1067 : // It is safe to modify the contents of the argument after LogData returns.
1068 0 : func (b *Batch) LogData(data []byte, _ *WriteOptions) error {
1069 0 : origCount, origMemTableSize := b.count, b.memTableSize
1070 0 : b.prepareDeferredKeyRecord(len(data), InternalKeyKindLogData)
1071 0 : copy(b.deferredOp.Key, data)
1072 0 : // Since LogData only writes to the WAL and does not affect the memtable, we
1073 0 : // restore b.count and b.memTableSize to their origin values. Note that
1074 0 : // Batch.count only refers to records that are added to the memtable.
1075 0 : b.count, b.memTableSize = origCount, origMemTableSize
1076 0 : return nil
1077 0 : }
1078 :
1079 : // ingestSST adds the FileNum for an sstable to the batch. The data will only be
1080 : // written to the WAL (not added to memtables or sstables).
1081 1 : func (b *Batch) ingestSST(fileNum base.FileNum) {
1082 1 : if b.Empty() {
1083 1 : b.ingestedSSTBatch = true
1084 1 : } else if !b.ingestedSSTBatch {
1085 0 : // Batch contains other key kinds.
1086 0 : panic("pebble: invalid call to ingestSST")
1087 : }
1088 :
1089 1 : origMemTableSize := b.memTableSize
1090 1 : var buf [binary.MaxVarintLen64]byte
1091 1 : length := binary.PutUvarint(buf[:], uint64(fileNum))
1092 1 : b.prepareDeferredKeyRecord(length, InternalKeyKindIngestSST)
1093 1 : copy(b.deferredOp.Key, buf[:length])
1094 1 : // Since IngestSST writes only to the WAL and does not affect the memtable,
1095 1 : // we restore b.memTableSize to its original value. Note that Batch.count
1096 1 : // is not reset because for the InternalKeyKindIngestSST the count is the
1097 1 : // number of sstable paths which have been added to the batch.
1098 1 : b.memTableSize = origMemTableSize
1099 1 : b.minimumFormatMajorVersion = FormatFlushableIngest
1100 : }
1101 :
1102 : // Empty returns true if the batch is empty, and false otherwise.
1103 1 : func (b *Batch) Empty() bool {
1104 1 : return len(b.data) <= batchHeaderLen
1105 1 : }
1106 :
1107 : // Len returns the current size of the batch in bytes.
1108 0 : func (b *Batch) Len() int {
1109 0 : if len(b.data) <= batchHeaderLen {
1110 0 : return batchHeaderLen
1111 0 : }
1112 0 : return len(b.data)
1113 : }
1114 :
1115 : // Repr returns the underlying batch representation. It is not safe to modify
1116 : // the contents. Reset() will not change the contents of the returned value,
1117 : // though any other mutation operation may do so.
1118 1 : func (b *Batch) Repr() []byte {
1119 1 : if len(b.data) == 0 {
1120 0 : b.init(batchHeaderLen)
1121 0 : }
1122 1 : binary.LittleEndian.PutUint32(b.countData(), b.Count())
1123 1 : return b.data
1124 : }
1125 :
1126 : // SetRepr sets the underlying batch representation. The batch takes ownership
1127 : // of the supplied slice. It is not safe to modify it afterwards until the
1128 : // Batch is no longer in use.
1129 1 : func (b *Batch) SetRepr(data []byte) error {
1130 1 : if len(data) < batchHeaderLen {
1131 0 : return base.CorruptionErrorf("invalid batch")
1132 0 : }
1133 1 : b.data = data
1134 1 : b.count = uint64(binary.LittleEndian.Uint32(b.countData()))
1135 1 : var err error
1136 1 : if b.db != nil {
1137 1 : // Only track memTableSize for batches that will be committed to the DB.
1138 1 : err = b.refreshMemTableSize()
1139 1 : }
1140 1 : return err
1141 : }
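// reprRoundTripSketch is an illustrative sketch, not part of the original
// source: Repr returns the batch's wire-format bytes (e.g. as written to the
// WAL) and SetRepr installs such bytes into another batch. SetRepr takes
// ownership of the supplied slice, so a copy is made here.
func reprRoundTripSketch(src, dst *Batch) error {
	repr := append([]byte(nil), src.Repr()...)
	return dst.SetRepr(repr)
}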
1142 :
1143 : // NewIter returns an iterator that is unpositioned (Iterator.Valid() will
1144 : // return false). The iterator can be positioned via a call to SeekGE,
1145 : // SeekPrefixGE, SeekLT, First or Last. Only indexed batches support iterators.
1146 : //
1147 : // The returned Iterator observes all of the Batch's existing mutations, but no
1148 : // later mutations. Its view can be refreshed via RefreshBatchSnapshot or
1149 : // SetOptions().
1150 1 : func (b *Batch) NewIter(o *IterOptions) (*Iterator, error) {
1151 1 : return b.NewIterWithContext(context.Background(), o)
1152 1 : }
1153 :
1154 : // NewIterWithContext is like NewIter, and additionally accepts a context for
1155 : // tracing.
1156 1 : func (b *Batch) NewIterWithContext(ctx context.Context, o *IterOptions) (*Iterator, error) {
1157 1 : if b.index == nil {
1158 0 : return nil, ErrNotIndexed
1159 0 : }
1160 1 : return b.db.newIter(ctx, b, newIterOpts{}, o), nil
1161 : }
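// indexedBatchScanSketch is an illustrative sketch, not part of the original
// source: it opens an Iterator over an indexed batch via NewIter above (a
// merged view of the batch and the underlying database) and visits every
// point key/value pair. The Iterator methods First, Next, Key, Value and
// Close used here are assumed from the public pebble Iterator API.
func indexedBatchScanSketch(b *Batch, visit func(key, value []byte)) error {
	it, err := b.NewIter(nil)
	if err != nil {
		return err
	}
	for valid := it.First(); valid; valid = it.Next() {
		visit(it.Key(), it.Value())
	}
	return it.Close()
}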
1162 :
1163 : // NewBatchOnlyIter constructs an iterator that only reads the contents of the
1164 : // batch, and does not overlay the batch mutations on top of the DB state.
1165 : //
1166 : // The returned Iterator observes all of the Batch's existing mutations, but
1167 : // no later mutations. Its view can be refreshed via RefreshBatchSnapshot or
1168 : // SetOptions().
1169 0 : func (b *Batch) NewBatchOnlyIter(ctx context.Context, o *IterOptions) (*Iterator, error) {
1170 0 : if b.index == nil {
1171 0 : return nil, ErrNotIndexed
1172 0 : }
1173 0 : return b.db.newIter(ctx, b, newIterOpts{batch: batchIterOpts{batchOnly: true}}, o), nil
1174 : }
1175 :
1176 : // newInternalIter creates a new internalIterator that iterates over the
1177 : // contents of the batch.
1178 1 : func (b *Batch) newInternalIter(o *IterOptions) *batchIter {
1179 1 : iter := &batchIter{}
1180 1 : b.initInternalIter(o, iter)
1181 1 : return iter
1182 1 : }
1183 :
1184 1 : func (b *Batch) initInternalIter(o *IterOptions, iter *batchIter) {
1185 1 : *iter = batchIter{
1186 1 : cmp: b.cmp,
1187 1 : batch: b,
1188 1 : iter: b.index.NewIter(o.GetLowerBound(), o.GetUpperBound()),
1189 1 : // NB: We explicitly do not propagate the batch snapshot to the point
1190 1 : // key iterator. Filtering point keys within the batch iterator can
1191 1 : // cause pathological behavior where a batch iterator advances
1192 1 : // significantly farther than necessary filtering many batch keys that
1193 1 : // are not visible at the batch sequence number. Instead, the merging
1194 1 : // iterator enforces bounds.
1195 1 : //
1196 1 : // For example, consider an engine that contains the committed keys
1197 1 : // 'bar' and 'bax', with no keys between them. Consider a batch
1198 1 : // containing 1,000 keys within the range [a,z]. All of the
1199 1 : // batch keys were added to the batch after the iterator was
1200 1 : // constructed, so they are not visible to the iterator. A call to
1201 1 : // SeekGE('bax') would seek the LSM iterators and discover the key
1202 1 : // 'bax'. It would also seek the batch iterator, landing on the key
1203 1 : // 'baz' but discover that it's not visible. The batch iterator would
1204 1 : // next through the rest of the batch's keys, only to discover there are
1205 1 : // no visible keys greater than or equal to 'bax'.
1206 1 : //
1207 1 : // Filtering these batch points within the merging iterator ensures that
1208 1 : // the batch iterator never needs to iterate beyond 'baz', because it
1209 1 : // already found a smaller, visible key 'bax'.
1210 1 : snapshot: base.InternalKeySeqNumMax,
1211 1 : }
1212 1 : }
1213 :
1214 1 : func (b *Batch) newRangeDelIter(o *IterOptions, batchSnapshot uint64) *keyspan.Iter {
1215 1 : // Construct an iterator even if rangeDelIndex is nil, because it is allowed
1216 1 : // to refresh later, so we need the container to exist.
1217 1 : iter := new(keyspan.Iter)
1218 1 : b.initRangeDelIter(o, iter, batchSnapshot)
1219 1 : return iter
1220 1 : }
1221 :
1222 1 : func (b *Batch) initRangeDelIter(_ *IterOptions, iter *keyspan.Iter, batchSnapshot uint64) {
1223 1 : if b.rangeDelIndex == nil {
1224 1 : iter.Init(b.cmp, nil)
1225 1 : return
1226 1 : }
1227 :
1228 : // Fragment the range tombstones the first time a range deletion iterator is
1229 : // requested. The cached tombstones are invalidated if another range
1230 : // deletion tombstone is added to the batch. This cache is only guaranteed
1231 : // to be correct if we're opening an iterator to read at a batch sequence
1232 : // number at least as high as tombstonesSeqNum. The cache is guaranteed to
1233 : // include all tombstones up to tombstonesSeqNum, and if any additional
1234 : // tombstones were added after that sequence number the cache would've been
1235 : // cleared.
1236 1 : nextSeqNum := b.nextSeqNum()
1237 1 : if b.tombstones != nil && b.tombstonesSeqNum <= batchSnapshot {
1238 1 : iter.Init(b.cmp, b.tombstones)
1239 1 : return
1240 1 : }
1241 :
1242 1 : tombstones := make([]keyspan.Span, 0, b.countRangeDels)
1243 1 : frag := &keyspan.Fragmenter{
1244 1 : Cmp: b.cmp,
1245 1 : Format: b.formatKey,
1246 1 : Emit: func(s keyspan.Span) {
1247 1 : tombstones = append(tombstones, s)
1248 1 : },
1249 : }
1250 1 : it := &batchIter{
1251 1 : cmp: b.cmp,
1252 1 : batch: b,
1253 1 : iter: b.rangeDelIndex.NewIter(nil, nil),
1254 1 : snapshot: batchSnapshot,
1255 1 : }
1256 1 : fragmentRangeDels(frag, it, int(b.countRangeDels))
1257 1 : iter.Init(b.cmp, tombstones)
1258 1 :
1259 1 : // If we just read all the tombstones in the batch (eg, batchSnapshot was
1260 1 : // set to b.nextSeqNum()), then cache the tombstones so that a subsequent
1261 1 : // call to initRangeDelIter may use them without refragmenting.
1262 1 : if nextSeqNum == batchSnapshot {
1263 1 : b.tombstones = tombstones
1264 1 : b.tombstonesSeqNum = nextSeqNum
1265 1 : }
1266 : }
1267 :
1268 1 : func fragmentRangeDels(frag *keyspan.Fragmenter, it internalIterator, count int) {
1269 1 : // The memory management here is a bit subtle. The keys and values returned
1270 1 : // by the iterator are slices in Batch.data. Thus the fragmented tombstones
1271 1 : // are slices within Batch.data. If additional entries are added to the
1272 1 : // Batch, Batch.data may be reallocated. The references in the fragmented
1273 1 : // tombstones will remain valid, pointing into the old Batch.data. GC for
1274 1 : // the win.
1275 1 :
1276 1 : // Use a single []keyspan.Key buffer to avoid allocating many
1277 1 : // individual []keyspan.Key slices with a single element each.
1278 1 : keyBuf := make([]keyspan.Key, 0, count)
1279 1 : for key, val := it.First(); key != nil; key, val = it.Next() {
1280 1 : s := rangedel.Decode(*key, val.InPlaceValue(), keyBuf)
1281 1 : keyBuf = s.Keys[len(s.Keys):]
1282 1 :
1283 1 : // Set a fixed capacity to avoid accidental overwriting.
1284 1 : s.Keys = s.Keys[:len(s.Keys):len(s.Keys)]
1285 1 : frag.Add(s)
1286 1 : }
1287 1 : frag.Finish()
1288 : }
1289 :
1290 1 : func (b *Batch) newRangeKeyIter(o *IterOptions, batchSnapshot uint64) *keyspan.Iter {
1291 1 : 	// Construct an iterator even if rangeKeyIndex is nil; the iterator may be
1292 1 : 	// refreshed later, so the container needs to exist.
1293 1 : iter := new(keyspan.Iter)
1294 1 : b.initRangeKeyIter(o, iter, batchSnapshot)
1295 1 : return iter
1296 1 : }
1297 :
1298 1 : func (b *Batch) initRangeKeyIter(_ *IterOptions, iter *keyspan.Iter, batchSnapshot uint64) {
1299 1 : if b.rangeKeyIndex == nil {
1300 1 : iter.Init(b.cmp, nil)
1301 1 : return
1302 1 : }
1303 :
1304 : // Fragment the range keys the first time a range key iterator is requested.
1305 : // The cached spans are invalidated if another range key is added to the
1306 : // batch. This cache is only guaranteed to be correct if we're opening an
1307 : // iterator to read at a batch sequence number at least as high as
1308 : // rangeKeysSeqNum. The cache is guaranteed to include all range keys up to
1309 : // rangeKeysSeqNum, and if any additional range keys were added after that
1310 : // sequence number the cache would've been cleared.
1311 1 : nextSeqNum := b.nextSeqNum()
1312 1 : if b.rangeKeys != nil && b.rangeKeysSeqNum <= batchSnapshot {
1313 1 : iter.Init(b.cmp, b.rangeKeys)
1314 1 : return
1315 1 : }
1316 :
1317 1 : rangeKeys := make([]keyspan.Span, 0, b.countRangeKeys)
1318 1 : frag := &keyspan.Fragmenter{
1319 1 : Cmp: b.cmp,
1320 1 : Format: b.formatKey,
1321 1 : Emit: func(s keyspan.Span) {
1322 1 : rangeKeys = append(rangeKeys, s)
1323 1 : },
1324 : }
1325 1 : it := &batchIter{
1326 1 : cmp: b.cmp,
1327 1 : batch: b,
1328 1 : iter: b.rangeKeyIndex.NewIter(nil, nil),
1329 1 : snapshot: batchSnapshot,
1330 1 : }
1331 1 : fragmentRangeKeys(frag, it, int(b.countRangeKeys))
1332 1 : iter.Init(b.cmp, rangeKeys)
1333 1 :
1334 1 : // If we just read all the range keys in the batch (eg, batchSnapshot was
1335 1 : // set to b.nextSeqNum()), then cache the range keys so that a subsequent
1336 1 : // call to initRangeKeyIter may use them without refragmenting.
1337 1 : if nextSeqNum == batchSnapshot {
1338 1 : b.rangeKeys = rangeKeys
1339 1 : b.rangeKeysSeqNum = nextSeqNum
1340 1 : }
1341 : }
1342 :
1343 1 : func fragmentRangeKeys(frag *keyspan.Fragmenter, it internalIterator, count int) error {
1344 1 : // The memory management here is a bit subtle. The keys and values
1345 1 : // returned by the iterator are slices in Batch.data. Thus the
1346 1 : // fragmented key spans are slices within Batch.data. If additional
1347 1 : // entries are added to the Batch, Batch.data may be reallocated. The
1348 1 : // references in the fragmented keys will remain valid, pointing into
1349 1 : // the old Batch.data. GC for the win.
1350 1 :
1351 1 : // Use a single []keyspan.Key buffer to avoid allocating many
1352 1 : // individual []keyspan.Key slices with a single element each.
1353 1 : keyBuf := make([]keyspan.Key, 0, count)
1354 1 : for ik, val := it.First(); ik != nil; ik, val = it.Next() {
1355 1 : s, err := rangekey.Decode(*ik, val.InPlaceValue(), keyBuf)
1356 1 : if err != nil {
1357 0 : return err
1358 0 : }
1359 1 : keyBuf = s.Keys[len(s.Keys):]
1360 1 :
1361 1 : // Set a fixed capacity to avoid accidental overwriting.
1362 1 : s.Keys = s.Keys[:len(s.Keys):len(s.Keys)]
1363 1 : frag.Add(s)
1364 : }
1365 1 : frag.Finish()
1366 1 : return nil
1367 : }
1368 :
1369 : // Commit applies the batch to its parent writer.
1370 1 : func (b *Batch) Commit(o *WriteOptions) error {
1371 1 : return b.db.Apply(b, o)
1372 1 : }
1373 :
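// exampleBatchCommit is an illustrative sketch, not part of the original
// source: it shows the intended Commit flow, which simply routes the batch
// through db.Apply. The function name and the assumption of an already-open
// DB are hypothetical; error handling is abbreviated.
func exampleBatchCommit(db *DB) error {
	b := db.NewBatch()
	defer b.Close()
	if err := b.Set([]byte("k1"), []byte("v1"), nil); err != nil {
		return err
	}
	if err := b.Delete([]byte("k2"), nil); err != nil {
		return err
	}
	// Sync makes the commit durable before Commit returns.
	return b.Commit(Sync)
}
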
1374 : // Close closes the batch without committing it.
1375 1 : func (b *Batch) Close() error {
1376 1 : b.release()
1377 1 : return nil
1378 1 : }
1379 :
1380 : // Indexed returns true if the batch is indexed (i.e. supports read
1381 : // operations).
1382 1 : func (b *Batch) Indexed() bool {
1383 1 : return b.index != nil
1384 1 : }
1385 :
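// exampleIndexedBatchRead is an illustrative sketch, not part of the original
// source: only a batch created with NewIndexedBatch supports reads, which is
// what Indexed reports. The function name is hypothetical and error handling
// is abbreviated.
func exampleIndexedBatchRead(db *DB) ([]byte, error) {
	b := db.NewIndexedBatch()
	defer b.Close()
	if err := b.Set([]byte("k"), []byte("v"), nil); err != nil {
		return nil, err
	}
	if !b.Indexed() {
		// A batch from NewBatch would land here; reads on it return
		// ErrNotIndexed.
		return nil, ErrNotIndexed
	}
	val, closer, err := b.Get([]byte("k"))
	if err != nil {
		return nil, err
	}
	defer closer.Close()
	// Copy the value; it is only valid until the closer is closed.
	return append([]byte(nil), val...), nil
}
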
1386 : // init ensures that the batch data slice is initialized to meet the
1387 : // minimum required size and allocates space for the batch header.
1388 1 : func (b *Batch) init(size int) {
1389 1 : n := batchInitialSize
1390 1 : for n < size {
1391 0 : n *= 2
1392 0 : }
1393 1 : if cap(b.data) < n {
1394 1 : b.data = rawalloc.New(batchHeaderLen, n)
1395 1 : }
1396 1 : b.data = b.data[:batchHeaderLen]
1397 1 : 	clear(b.data) // Zero the sequence number and count in the header
1398 : }
1399 :
1400 : // Reset resets the batch for reuse. The underlying byte slice (that is
1401 : // returned by Repr()) may not be modified. It is only necessary to call this
1402 : // method if a batch is explicitly being reused. Close automatically takes care
1403 : // of releasing resources when appropriate for batches that are internally
1404 : // being reused.
1405 1 : func (b *Batch) Reset() {
1406 1 : // Zero out the struct, retaining only the fields necessary for manual
1407 1 : // reuse.
1408 1 : b.batchInternal = batchInternal{
1409 1 : data: b.data,
1410 1 : cmp: b.cmp,
1411 1 : formatKey: b.formatKey,
1412 1 : abbreviatedKey: b.abbreviatedKey,
1413 1 : index: b.index,
1414 1 : db: b.db,
1415 1 : }
1416 1 : b.applied.Store(false)
1417 1 : if b.data != nil {
1418 1 : if cap(b.data) > batchMaxRetainedSize {
1419 0 : // If the capacity of the buffer is larger than our maximum
1420 0 : // retention size, don't re-use it. Let it be GC-ed instead.
1421 0 : // This prevents the memory from an unusually large batch from
1422 0 : // being held on to indefinitely.
1423 0 : b.data = nil
1424 1 : } else {
1425 1 : // Otherwise, reset the buffer for re-use.
1426 1 : b.data = b.data[:batchHeaderLen]
1427 1 : clear(b.data)
1428 1 : }
1429 : }
1430 1 : if b.index != nil {
1431 1 : b.index.Init(&b.data, b.cmp, b.abbreviatedKey)
1432 1 : }
1433 : }
1434 :
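// exampleBatchReuse is an illustrative sketch, not part of the original
// source: a long-lived batch can be reused across commits by calling Reset
// between uses, avoiding repeated buffer allocations (up to the retained size
// limit). The function name and parameters are hypothetical.
func exampleBatchReuse(db *DB, keys [][]byte) error {
	b := db.NewBatch()
	defer b.Close()
	for _, k := range keys {
		b.Reset() // clears prior contents and the applied state
		if err := b.Set(k, []byte("v"), nil); err != nil {
			return err
		}
		if err := b.Commit(NoSync); err != nil {
			return err
		}
	}
	return nil
}
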
1435 : // seqNumData returns the 8 byte little-endian sequence number. Zero means that
1436 : // the batch has not yet been applied.
1437 1 : func (b *Batch) seqNumData() []byte {
1438 1 : return b.data[:8]
1439 1 : }
1440 :
1441 : // countData returns the 4 byte little-endian count data. "\xff\xff\xff\xff"
1442 : // means that the batch is invalid.
1443 1 : func (b *Batch) countData() []byte {
1444 1 : return b.data[8:12]
1445 1 : }
1446 :
1447 1 : func (b *Batch) grow(n int) {
1448 1 : newSize := len(b.data) + n
1449 1 : if uint64(newSize) >= maxBatchSize {
1450 0 : panic(ErrBatchTooLarge)
1451 : }
1452 1 : if newSize > cap(b.data) {
1453 0 : newCap := 2 * cap(b.data)
1454 0 : for newCap < newSize {
1455 0 : newCap *= 2
1456 0 : }
1457 0 : newData := rawalloc.New(len(b.data), newCap)
1458 0 : copy(newData, b.data)
1459 0 : b.data = newData
1460 : }
1461 1 : b.data = b.data[:newSize]
1462 : }
1463 :
1464 1 : func (b *Batch) setSeqNum(seqNum uint64) {
1465 1 : binary.LittleEndian.PutUint64(b.seqNumData(), seqNum)
1466 1 : }
1467 :
1468 : // SeqNum returns the batch sequence number which is applied to the first
1469 : // record in the batch. The sequence number is incremented for each subsequent
1470 : // record. It returns zero if the batch is empty.
1471 1 : func (b *Batch) SeqNum() uint64 {
1472 1 : if len(b.data) == 0 {
1473 0 : b.init(batchHeaderLen)
1474 0 : }
1475 1 : return binary.LittleEndian.Uint64(b.seqNumData())
1476 : }
1477 :
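// exampleBatchSeqNum is an illustrative sketch, not part of the original
// source: SeqNum reports zero for a batch that has not been applied, and
// after a commit it reports the sequence number assigned to the batch's first
// record. The function name is hypothetical.
func exampleBatchSeqNum(db *DB) (uint64, uint64, error) {
	b := db.NewBatch()
	defer b.Close()
	if err := b.Set([]byte("k"), []byte("v"), nil); err != nil {
		return 0, 0, err
	}
	before := b.SeqNum() // 0: the batch has not been applied yet
	if err := b.Commit(NoSync); err != nil {
		return 0, 0, err
	}
	after := b.SeqNum() // the seqnum assigned when the batch was committed
	return before, after, nil
}
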
1478 1 : func (b *Batch) setCount(v uint32) {
1479 1 : b.count = uint64(v)
1480 1 : }
1481 :
1482 : // Count returns the count of memtable-modifying operations in this batch. All
1483 : // operations with the exception of LogData increment this count. For IngestSSTs,
1484 : // count is only used to indicate the number of SSTs ingested in the record; the
1485 : // batch isn't applied to the memtable.
1486 1 : func (b *Batch) Count() uint32 {
1487 1 : if b.count > math.MaxUint32 {
1488 0 : panic(ErrInvalidBatch)
1489 : }
1490 1 : return uint32(b.count)
1491 : }
1492 :
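// exampleBatchCount is an illustrative sketch, not part of the original
// source: each memtable-modifying operation increments the count, while
// LogData does not because it only adds a record to the WAL. The function
// name is hypothetical.
func exampleBatchCount(db *DB) uint32 {
	b := db.NewBatch()
	defer b.Close()
	_ = b.Set([]byte("a"), []byte("1"), nil)         // count becomes 1
	_ = b.DeleteRange([]byte("b"), []byte("c"), nil) // count becomes 2
	_ = b.LogData([]byte("opaque payload"), nil)     // count stays 2
	return b.Count()
}
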
1493 : // Reader returns a BatchReader for the current batch contents. If the batch is
1494 : // mutated, the new entries will not be visible to the reader.
1495 1 : func (b *Batch) Reader() BatchReader {
1496 1 : if len(b.data) == 0 {
1497 1 : b.init(batchHeaderLen)
1498 1 : }
1499 1 : return b.data[batchHeaderLen:]
1500 : }
1501 :
1502 1 : func batchDecodeStr(data []byte) (odata []byte, s []byte, ok bool) {
1503 1 : // TODO(jackson): This will index out of bounds if there's no varint or an
1504 1 : // invalid varint (eg, a single 0xff byte). Correcting will add a bit of
1505 1 : // overhead. We could avoid that overhead whenever len(data) >=
1506 1 : 	// binary.MaxVarintLen32?
1507 1 :
1508 1 : var v uint32
1509 1 : var n int
1510 1 : ptr := unsafe.Pointer(&data[0])
1511 1 : if a := *((*uint8)(ptr)); a < 128 {
1512 1 : v = uint32(a)
1513 1 : n = 1
1514 1 : } else if a, b := a&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 1))); b < 128 {
1515 0 : v = uint32(b)<<7 | uint32(a)
1516 0 : n = 2
1517 0 : } else if b, c := b&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 2))); c < 128 {
1518 0 : v = uint32(c)<<14 | uint32(b)<<7 | uint32(a)
1519 0 : n = 3
1520 0 : } else if c, d := c&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 3))); d < 128 {
1521 0 : v = uint32(d)<<21 | uint32(c)<<14 | uint32(b)<<7 | uint32(a)
1522 0 : n = 4
1523 0 : } else {
1524 0 : d, e := d&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 4)))
1525 0 : v = uint32(e)<<28 | uint32(d)<<21 | uint32(c)<<14 | uint32(b)<<7 | uint32(a)
1526 0 : n = 5
1527 0 : }
1528 :
1529 1 : data = data[n:]
1530 1 : if v > uint32(len(data)) {
1531 0 : return nil, nil, false
1532 0 : }
1533 1 : return data[v:], data[:v], true
1534 : }
1535 :
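// batchDecodeStrSafe is an illustrative, bounds-checked sketch, not part of
// the original source: it decodes the same length-prefixed layout that
// batchDecodeStr reads with unsafe pointer arithmetic, namely a varint length
// followed by that many bytes. The function name is hypothetical.
func batchDecodeStrSafe(data []byte) (odata []byte, s []byte, ok bool) {
	v, n := binary.Uvarint(data)
	if n <= 0 || v > uint64(len(data)-n) {
		return nil, nil, false
	}
	data = data[n:]
	return data[v:], data[:v], true
}
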
1536 : // SyncWait is to be used in conjunction with DB.ApplyNoSyncWait.
1537 1 : func (b *Batch) SyncWait() error {
1538 1 : now := time.Now()
1539 1 : b.fsyncWait.Wait()
1540 1 : if b.commitErr != nil {
1541 0 : b.db = nil // prevent batch reuse on error
1542 0 : }
1543 1 : waitDuration := time.Since(now)
1544 1 : b.commitStats.CommitWaitDuration += waitDuration
1545 1 : b.commitStats.TotalDuration += waitDuration
1546 1 : return b.commitErr
1547 : }
1548 :
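// exampleAsyncCommit is an illustrative sketch, not part of the original
// source, of pairing DB.ApplyNoSyncWait with SyncWait: the batch becomes
// visible once ApplyNoSyncWait returns, and SyncWait later blocks until the
// WAL sync completes, adding the wait to the batch's commit stats. The
// function name is hypothetical.
func exampleAsyncCommit(db *DB, b *Batch) error {
	// ApplyNoSyncWait requires a sync write; the caller promises to call
	// SyncWait afterwards.
	if err := db.ApplyNoSyncWait(b, Sync); err != nil {
		return err
	}
	// ... other work can proceed here while the WAL sync is in flight ...
	return b.SyncWait()
}
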
1549 : // CommitStats returns stats related to committing the batch. Should be called
1550 : // after Batch.Commit or DB.Apply. If DB.ApplyNoSyncWait is used, it should be
1551 : // called after Batch.SyncWait.
1552 0 : func (b *Batch) CommitStats() BatchCommitStats {
1553 0 : return b.commitStats
1554 0 : }
1555 :
1556 : // BatchReader iterates over the entries contained in a batch.
1557 : type BatchReader []byte
1558 :
1559 : // ReadBatch constructs a BatchReader from a batch representation. The
1560 : // header is not validated. ReadBatch returns a new batch reader and the
1561 : // count of entries contained within the batch.
1562 0 : func ReadBatch(repr []byte) (r BatchReader, count uint32) {
1563 0 : if len(repr) <= batchHeaderLen {
1564 0 : return nil, count
1565 0 : }
1566 0 : count = binary.LittleEndian.Uint32(repr[batchCountOffset:batchHeaderLen])
1567 0 : return repr[batchHeaderLen:], count
1568 : }
1569 :
1570 : // Next returns the next entry in this batch, if there is one. If the reader has
1571 : // reached the end of the batch, Next returns ok=false and a nil error. If the
1572 : // batch is corrupt and the next entry is illegible, Next returns ok=false and a
1573 : // non-nil error.
1574 1 : func (r *BatchReader) Next() (kind InternalKeyKind, ukey []byte, value []byte, ok bool, err error) {
1575 1 : if len(*r) == 0 {
1576 1 : return 0, nil, nil, false, nil
1577 1 : }
1578 1 : kind = InternalKeyKind((*r)[0])
1579 1 : if kind > InternalKeyKindMax {
1580 0 : return 0, nil, nil, false, errors.Wrapf(ErrInvalidBatch, "invalid key kind 0x%x", (*r)[0])
1581 0 : }
1582 1 : *r, ukey, ok = batchDecodeStr((*r)[1:])
1583 1 : if !ok {
1584 0 : return 0, nil, nil, false, errors.Wrapf(ErrInvalidBatch, "decoding user key")
1585 0 : }
1586 1 : switch kind {
1587 : case InternalKeyKindSet, InternalKeyKindMerge, InternalKeyKindRangeDelete,
1588 : InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete,
1589 1 : InternalKeyKindDeleteSized:
1590 1 : *r, value, ok = batchDecodeStr(*r)
1591 1 : if !ok {
1592 0 : return 0, nil, nil, false, errors.Wrapf(ErrInvalidBatch, "decoding %s value", kind)
1593 0 : }
1594 : }
1595 1 : return kind, ukey, value, true, nil
1596 : }
1597 :
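// exampleScanBatchContents is an illustrative sketch, not part of the
// original source: Reader returns a BatchReader positioned after the 12-byte
// header, and Next decodes one record per call until the batch is exhausted
// or found to be corrupt. ReadBatch(b.Repr()) yields an equivalent reader
// plus the header's count. The function name is hypothetical.
func exampleScanBatchContents(b *Batch) error {
	r := b.Reader()
	for {
		kind, ukey, value, ok, err := r.Next()
		if err != nil {
			return err
		}
		if !ok {
			return nil // end of batch
		}
		fmt.Printf("%s %q -> %q\n", kind, ukey, value)
	}
}
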
1598 : // Note: batchIter mirrors the implementation of flushableBatchIter. Keep the
1599 : // two in sync.
1600 : type batchIter struct {
1601 : cmp Compare
1602 : batch *Batch
1603 : iter batchskl.Iterator
1604 : err error
1605 : // snapshot holds a batch "sequence number" at which the batch is being
1606 : // read. This sequence number has the InternalKeySeqNumBatch bit set, so it
1607 : // encodes an offset within the batch. Only batch entries earlier than the
1608 : // offset are visible during iteration.
1609 : snapshot uint64
1610 : }
1611 :
1612 : // batchIter implements the base.InternalIterator interface.
1613 : var _ base.InternalIterator = (*batchIter)(nil)
1614 :
1615 0 : func (i *batchIter) String() string {
1616 0 : return "batch"
1617 0 : }
1618 :
1619 1 : func (i *batchIter) SeekGE(key []byte, flags base.SeekGEFlags) (*InternalKey, base.LazyValue) {
1620 1 : // Ignore TrySeekUsingNext if the view of the batch changed.
1621 1 : if flags.TrySeekUsingNext() && flags.BatchJustRefreshed() {
1622 0 : flags = flags.DisableTrySeekUsingNext()
1623 0 : }
1624 :
1625 1 : i.err = nil // clear cached iteration error
1626 1 : ikey := i.iter.SeekGE(key, flags)
1627 1 : for ikey != nil && ikey.SeqNum() >= i.snapshot {
1628 0 : ikey = i.iter.Next()
1629 0 : }
1630 1 : if ikey == nil {
1631 1 : return nil, base.LazyValue{}
1632 1 : }
1633 1 : return ikey, base.MakeInPlaceValue(i.value())
1634 : }
1635 :
1636 : func (i *batchIter) SeekPrefixGE(
1637 : prefix, key []byte, flags base.SeekGEFlags,
1638 1 : ) (*base.InternalKey, base.LazyValue) {
1639 1 : i.err = nil // clear cached iteration error
1640 1 : return i.SeekGE(key, flags)
1641 1 : }
1642 :
1643 1 : func (i *batchIter) SeekLT(key []byte, flags base.SeekLTFlags) (*InternalKey, base.LazyValue) {
1644 1 : i.err = nil // clear cached iteration error
1645 1 : ikey := i.iter.SeekLT(key)
1646 1 : for ikey != nil && ikey.SeqNum() >= i.snapshot {
1647 0 : ikey = i.iter.Prev()
1648 0 : }
1649 1 : if ikey == nil {
1650 1 : return nil, base.LazyValue{}
1651 1 : }
1652 1 : return ikey, base.MakeInPlaceValue(i.value())
1653 : }
1654 :
1655 1 : func (i *batchIter) First() (*InternalKey, base.LazyValue) {
1656 1 : i.err = nil // clear cached iteration error
1657 1 : ikey := i.iter.First()
1658 1 : for ikey != nil && ikey.SeqNum() >= i.snapshot {
1659 0 : ikey = i.iter.Next()
1660 0 : }
1661 1 : if ikey == nil {
1662 1 : return nil, base.LazyValue{}
1663 1 : }
1664 1 : return ikey, base.MakeInPlaceValue(i.value())
1665 : }
1666 :
1667 1 : func (i *batchIter) Last() (*InternalKey, base.LazyValue) {
1668 1 : i.err = nil // clear cached iteration error
1669 1 : ikey := i.iter.Last()
1670 1 : for ikey != nil && ikey.SeqNum() >= i.snapshot {
1671 0 : ikey = i.iter.Prev()
1672 0 : }
1673 1 : if ikey == nil {
1674 1 : return nil, base.LazyValue{}
1675 1 : }
1676 1 : return ikey, base.MakeInPlaceValue(i.value())
1677 : }
1678 :
1679 1 : func (i *batchIter) Next() (*InternalKey, base.LazyValue) {
1680 1 : ikey := i.iter.Next()
1681 1 : for ikey != nil && ikey.SeqNum() >= i.snapshot {
1682 0 : ikey = i.iter.Next()
1683 0 : }
1684 1 : if ikey == nil {
1685 1 : return nil, base.LazyValue{}
1686 1 : }
1687 1 : return ikey, base.MakeInPlaceValue(i.value())
1688 : }
1689 :
1690 0 : func (i *batchIter) NextPrefix(succKey []byte) (*InternalKey, LazyValue) {
1691 0 : 	// Because NextPrefix was invoked, `succKey` must be ≥ the key at i's current
1692 0 : // position. Seek the arena iterator using TrySeekUsingNext.
1693 0 : ikey := i.iter.SeekGE(succKey, base.SeekGEFlagsNone.EnableTrySeekUsingNext())
1694 0 : for ikey != nil && ikey.SeqNum() >= i.snapshot {
1695 0 : ikey = i.iter.Next()
1696 0 : }
1697 0 : if ikey == nil {
1698 0 : return nil, base.LazyValue{}
1699 0 : }
1700 0 : return ikey, base.MakeInPlaceValue(i.value())
1701 : }
1702 :
1703 1 : func (i *batchIter) Prev() (*InternalKey, base.LazyValue) {
1704 1 : ikey := i.iter.Prev()
1705 1 : for ikey != nil && ikey.SeqNum() >= i.snapshot {
1706 0 : ikey = i.iter.Prev()
1707 0 : }
1708 1 : if ikey == nil {
1709 1 : return nil, base.LazyValue{}
1710 1 : }
1711 1 : return ikey, base.MakeInPlaceValue(i.value())
1712 : }
1713 :
1714 1 : func (i *batchIter) value() []byte {
1715 1 : offset, _, keyEnd := i.iter.KeyInfo()
1716 1 : data := i.batch.data
1717 1 : if len(data[offset:]) == 0 {
1718 0 : i.err = base.CorruptionErrorf("corrupted batch")
1719 0 : return nil
1720 0 : }
1721 :
1722 1 : switch InternalKeyKind(data[offset]) {
1723 : case InternalKeyKindSet, InternalKeyKindMerge, InternalKeyKindRangeDelete,
1724 : InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete,
1725 1 : InternalKeyKindDeleteSized:
1726 1 : _, value, ok := batchDecodeStr(data[keyEnd:])
1727 1 : if !ok {
1728 0 : return nil
1729 0 : }
1730 1 : return value
1731 1 : default:
1732 1 : return nil
1733 : }
1734 : }
1735 :
1736 1 : func (i *batchIter) Error() error {
1737 1 : return i.err
1738 1 : }
1739 :
1740 1 : func (i *batchIter) Close() error {
1741 1 : _ = i.iter.Close()
1742 1 : return i.err
1743 1 : }
1744 :
1745 1 : func (i *batchIter) SetBounds(lower, upper []byte) {
1746 1 : i.iter.SetBounds(lower, upper)
1747 1 : }
1748 :
1749 0 : func (i *batchIter) SetContext(_ context.Context) {}
1750 :
1751 : type flushableBatchEntry struct {
1752 : // offset is the byte offset of the record within the batch repr.
1753 : offset uint32
1754 : // index is the 0-based ordinal number of the record within the batch. Used
1755 : // to compute the seqnum for the record.
1756 : index uint32
1757 : // key{Start,End} are the start and end byte offsets of the key within the
1758 : // batch repr. Cached to avoid decoding the key length on every
1759 : // comparison. The value is stored starting at keyEnd.
1760 : keyStart uint32
1761 : keyEnd uint32
1762 : }
1763 :
1764 : // flushableBatch wraps an existing batch and provides the interfaces needed
1765 : // for making the batch flushable (i.e. able to mimic a memtable).
1766 : type flushableBatch struct {
1767 : cmp Compare
1768 : formatKey base.FormatKey
1769 : data []byte
1770 :
1771 : // The base sequence number for the entries in the batch. This is the same
1772 : // value as Batch.seqNum() and is cached here for performance.
1773 : seqNum uint64
1774 :
1775 : // A slice of offsets and indices for the entries in the batch. Used to
1776 : // implement flushableBatchIter. Unlike the indexing on a normal batch, a
1777 : // flushable batch is indexed such that batch entry i will be given the
1778 : // sequence number flushableBatch.seqNum+i.
1779 : //
1780 : // Sorted in increasing order of key and decreasing order of offset (since
1781 : // higher offsets correspond to higher sequence numbers).
1782 : //
1783 : // Does not include range deletion entries or range key entries.
1784 : offsets []flushableBatchEntry
1785 :
1786 : // Fragmented range deletion tombstones.
1787 : tombstones []keyspan.Span
1788 :
1789 : // Fragmented range keys.
1790 : rangeKeys []keyspan.Span
1791 : }
1792 :
1793 : var _ flushable = (*flushableBatch)(nil)
1794 :
1795 : // newFlushableBatch creates a new batch that implements the flushable
1796 : // interface. This allows the batch to act like a memtable and be placed in the
1797 : // queue of flushable memtables. Note that the flushable batch takes ownership
1798 : // of the batch data.
1799 1 : func newFlushableBatch(batch *Batch, comparer *Comparer) (*flushableBatch, error) {
1800 1 : b := &flushableBatch{
1801 1 : data: batch.data,
1802 1 : cmp: comparer.Compare,
1803 1 : formatKey: comparer.FormatKey,
1804 1 : offsets: make([]flushableBatchEntry, 0, batch.Count()),
1805 1 : }
1806 1 : if b.data != nil {
1807 1 : // Note that this sequence number is not correct when this batch has not
1808 1 : // been applied since the sequence number has not been assigned yet. The
1809 1 : // correct sequence number will be set later. But it is correct when the
1810 1 : // batch is being replayed from the WAL.
1811 1 : b.seqNum = batch.SeqNum()
1812 1 : }
1813 1 : var rangeDelOffsets []flushableBatchEntry
1814 1 : var rangeKeyOffsets []flushableBatchEntry
1815 1 : if len(b.data) > batchHeaderLen {
1816 1 : // Non-empty batch.
1817 1 : var index uint32
1818 1 : for iter := BatchReader(b.data[batchHeaderLen:]); len(iter) > 0; index++ {
1819 1 : offset := uintptr(unsafe.Pointer(&iter[0])) - uintptr(unsafe.Pointer(&b.data[0]))
1820 1 : kind, key, _, ok, err := iter.Next()
1821 1 : if !ok {
1822 0 : if err != nil {
1823 0 : return nil, err
1824 0 : }
1825 0 : break
1826 : }
1827 1 : entry := flushableBatchEntry{
1828 1 : offset: uint32(offset),
1829 1 : index: uint32(index),
1830 1 : }
1831 1 : if keySize := uint32(len(key)); keySize == 0 {
1832 0 : // Must add 2 to the offset. One byte encodes `kind` and the next
1833 0 : // byte encodes `0`, which is the length of the key.
1834 0 : entry.keyStart = uint32(offset) + 2
1835 0 : entry.keyEnd = entry.keyStart
1836 1 : } else {
1837 1 : entry.keyStart = uint32(uintptr(unsafe.Pointer(&key[0])) -
1838 1 : uintptr(unsafe.Pointer(&b.data[0])))
1839 1 : entry.keyEnd = entry.keyStart + keySize
1840 1 : }
1841 1 : switch kind {
1842 1 : case InternalKeyKindRangeDelete:
1843 1 : rangeDelOffsets = append(rangeDelOffsets, entry)
1844 1 : case InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete:
1845 1 : rangeKeyOffsets = append(rangeKeyOffsets, entry)
1846 1 : default:
1847 1 : b.offsets = append(b.offsets, entry)
1848 : }
1849 : }
1850 : }
1851 :
1852 : // Sort all of offsets, rangeDelOffsets and rangeKeyOffsets, using *batch's
1853 : // sort.Interface implementation.
1854 1 : pointOffsets := b.offsets
1855 1 : sort.Sort(b)
1856 1 : b.offsets = rangeDelOffsets
1857 1 : sort.Sort(b)
1858 1 : b.offsets = rangeKeyOffsets
1859 1 : sort.Sort(b)
1860 1 : b.offsets = pointOffsets
1861 1 :
1862 1 : if len(rangeDelOffsets) > 0 {
1863 1 : frag := &keyspan.Fragmenter{
1864 1 : Cmp: b.cmp,
1865 1 : Format: b.formatKey,
1866 1 : Emit: func(s keyspan.Span) {
1867 1 : b.tombstones = append(b.tombstones, s)
1868 1 : },
1869 : }
1870 1 : it := &flushableBatchIter{
1871 1 : batch: b,
1872 1 : data: b.data,
1873 1 : offsets: rangeDelOffsets,
1874 1 : cmp: b.cmp,
1875 1 : index: -1,
1876 1 : }
1877 1 : fragmentRangeDels(frag, it, len(rangeDelOffsets))
1878 : }
1879 1 : if len(rangeKeyOffsets) > 0 {
1880 1 : frag := &keyspan.Fragmenter{
1881 1 : Cmp: b.cmp,
1882 1 : Format: b.formatKey,
1883 1 : Emit: func(s keyspan.Span) {
1884 1 : b.rangeKeys = append(b.rangeKeys, s)
1885 1 : },
1886 : }
1887 1 : it := &flushableBatchIter{
1888 1 : batch: b,
1889 1 : data: b.data,
1890 1 : offsets: rangeKeyOffsets,
1891 1 : cmp: b.cmp,
1892 1 : index: -1,
1893 1 : }
1894 1 : fragmentRangeKeys(frag, it, len(rangeKeyOffsets))
1895 : }
1896 1 : return b, nil
1897 : }
1898 :
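// exampleIterateFlushableBatch is an illustrative, package-internal sketch,
// not part of the original source: it wraps a batch as a flushableBatch (as
// done during WAL replay or when a batch is too large to apply directly to
// the memtable) and walks its point entries. The function name is
// hypothetical.
func exampleIterateFlushableBatch(db *DB, b *Batch) error {
	fb, err := newFlushableBatch(b, db.opts.Comparer)
	if err != nil {
		return err
	}
	it := fb.newIter(nil)
	for key, _ := it.First(); key != nil; key, _ = it.Next() {
		fmt.Printf("%q\n", key.UserKey)
	}
	return it.Close()
}
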
1899 1 : func (b *flushableBatch) setSeqNum(seqNum uint64) {
1900 1 : if b.seqNum != 0 {
1901 0 : panic(fmt.Sprintf("pebble: flushableBatch.seqNum already set: %d", b.seqNum))
1902 : }
1903 1 : b.seqNum = seqNum
1904 1 : for i := range b.tombstones {
1905 1 : for j := range b.tombstones[i].Keys {
1906 1 : b.tombstones[i].Keys[j].Trailer = base.MakeTrailer(
1907 1 : b.tombstones[i].Keys[j].SeqNum()+seqNum,
1908 1 : b.tombstones[i].Keys[j].Kind(),
1909 1 : )
1910 1 : }
1911 : }
1912 1 : for i := range b.rangeKeys {
1913 1 : for j := range b.rangeKeys[i].Keys {
1914 1 : b.rangeKeys[i].Keys[j].Trailer = base.MakeTrailer(
1915 1 : b.rangeKeys[i].Keys[j].SeqNum()+seqNum,
1916 1 : b.rangeKeys[i].Keys[j].Kind(),
1917 1 : )
1918 1 : }
1919 : }
1920 : }
1921 :
1922 1 : func (b *flushableBatch) Len() int {
1923 1 : return len(b.offsets)
1924 1 : }
1925 :
1926 1 : func (b *flushableBatch) Less(i, j int) bool {
1927 1 : ei := &b.offsets[i]
1928 1 : ej := &b.offsets[j]
1929 1 : ki := b.data[ei.keyStart:ei.keyEnd]
1930 1 : kj := b.data[ej.keyStart:ej.keyEnd]
1931 1 : switch c := b.cmp(ki, kj); {
1932 1 : case c < 0:
1933 1 : return true
1934 1 : case c > 0:
1935 1 : return false
1936 1 : default:
1937 1 : return ei.offset > ej.offset
1938 : }
1939 : }
1940 :
1941 1 : func (b *flushableBatch) Swap(i, j int) {
1942 1 : b.offsets[i], b.offsets[j] = b.offsets[j], b.offsets[i]
1943 1 : }
1944 :
1945 : // newIter is part of the flushable interface.
1946 1 : func (b *flushableBatch) newIter(o *IterOptions) internalIterator {
1947 1 : return &flushableBatchIter{
1948 1 : batch: b,
1949 1 : data: b.data,
1950 1 : offsets: b.offsets,
1951 1 : cmp: b.cmp,
1952 1 : index: -1,
1953 1 : lower: o.GetLowerBound(),
1954 1 : upper: o.GetUpperBound(),
1955 1 : }
1956 1 : }
1957 :
1958 : // newFlushIter is part of the flushable interface.
1959 1 : func (b *flushableBatch) newFlushIter(o *IterOptions, bytesFlushed *uint64) internalIterator {
1960 1 : return &flushFlushableBatchIter{
1961 1 : flushableBatchIter: flushableBatchIter{
1962 1 : batch: b,
1963 1 : data: b.data,
1964 1 : offsets: b.offsets,
1965 1 : cmp: b.cmp,
1966 1 : index: -1,
1967 1 : },
1968 1 : bytesIterated: bytesFlushed,
1969 1 : }
1970 1 : }
1971 :
1972 : // newRangeDelIter is part of the flushable interface.
1973 1 : func (b *flushableBatch) newRangeDelIter(o *IterOptions) keyspan.FragmentIterator {
1974 1 : if len(b.tombstones) == 0 {
1975 1 : return nil
1976 1 : }
1977 1 : return keyspan.NewIter(b.cmp, b.tombstones)
1978 : }
1979 :
1980 : // newRangeKeyIter is part of the flushable interface.
1981 1 : func (b *flushableBatch) newRangeKeyIter(o *IterOptions) keyspan.FragmentIterator {
1982 1 : if len(b.rangeKeys) == 0 {
1983 1 : return nil
1984 1 : }
1985 1 : return keyspan.NewIter(b.cmp, b.rangeKeys)
1986 : }
1987 :
1988 : // containsRangeKeys is part of the flushable interface.
1989 1 : func (b *flushableBatch) containsRangeKeys() bool { return len(b.rangeKeys) > 0 }
1990 :
1991 : // inuseBytes is part of the flushable interface.
1992 1 : func (b *flushableBatch) inuseBytes() uint64 {
1993 1 : return uint64(len(b.data) - batchHeaderLen)
1994 1 : }
1995 :
1996 : // totalBytes is part of the flushable interface.
1997 1 : func (b *flushableBatch) totalBytes() uint64 {
1998 1 : return uint64(cap(b.data))
1999 1 : }
2000 :
2001 : // readyForFlush is part of the flushable interface.
2002 1 : func (b *flushableBatch) readyForFlush() bool {
2003 1 : // A flushable batch is always ready for flush; it must be flushed together
2004 1 : // with the previous memtable.
2005 1 : return true
2006 1 : }
2007 :
2008 : // computePossibleOverlaps is part of the flushable interface.
2009 : func (b *flushableBatch) computePossibleOverlaps(
2010 : fn func(bounded) shouldContinue, bounded ...bounded,
2011 1 : ) {
2012 1 : computePossibleOverlapsGenericImpl[*flushableBatch](b, b.cmp, fn, bounded)
2013 1 : }
2014 :
2015 : // Note: flushableBatchIter mirrors the implementation of batchIter. Keep the
2016 : // two in sync.
2017 : type flushableBatchIter struct {
2018 : // Members to be initialized by creator.
2019 : batch *flushableBatch
2020 : // The bytes backing the batch. Always the same as batch.data?
2021 : data []byte
2022 : // The sorted entries. This is not always equal to batch.offsets.
2023 : offsets []flushableBatchEntry
2024 : cmp Compare
2025 : // Must be initialized to -1. It is the index into offsets that represents
2026 : // the current iterator position.
2027 : index int
2028 :
2029 : // For internal use by the implementation.
2030 : key InternalKey
2031 : err error
2032 :
2033 : // Optionally initialize to bounds of iteration, if any.
2034 : lower []byte
2035 : upper []byte
2036 : }
2037 :
2038 : // flushableBatchIter implements the base.InternalIterator interface.
2039 : var _ base.InternalIterator = (*flushableBatchIter)(nil)
2040 :
2041 1 : func (i *flushableBatchIter) String() string {
2042 1 : return "flushable-batch"
2043 1 : }
2044 :
2045 : // SeekGE implements internalIterator.SeekGE, as documented in the pebble
2046 : // package. Ignore flags.TrySeekUsingNext() since we don't expect this
2047 : // optimization to provide much benefit here at the moment.
2048 : func (i *flushableBatchIter) SeekGE(
2049 : key []byte, flags base.SeekGEFlags,
2050 1 : ) (*InternalKey, base.LazyValue) {
2051 1 : i.err = nil // clear cached iteration error
2052 1 : ikey := base.MakeSearchKey(key)
2053 1 : i.index = sort.Search(len(i.offsets), func(j int) bool {
2054 1 : return base.InternalCompare(i.cmp, ikey, i.getKey(j)) <= 0
2055 1 : })
2056 1 : if i.index >= len(i.offsets) {
2057 1 : return nil, base.LazyValue{}
2058 1 : }
2059 1 : i.key = i.getKey(i.index)
2060 1 : if i.upper != nil && i.cmp(i.key.UserKey, i.upper) >= 0 {
2061 1 : i.index = len(i.offsets)
2062 1 : return nil, base.LazyValue{}
2063 1 : }
2064 1 : return &i.key, i.value()
2065 : }
2066 :
2067 : // SeekPrefixGE implements internalIterator.SeekPrefixGE, as documented in the
2068 : // pebble package.
2069 : func (i *flushableBatchIter) SeekPrefixGE(
2070 : prefix, key []byte, flags base.SeekGEFlags,
2071 1 : ) (*base.InternalKey, base.LazyValue) {
2072 1 : return i.SeekGE(key, flags)
2073 1 : }
2074 :
2075 : // SeekLT implements internalIterator.SeekLT, as documented in the pebble
2076 : // package.
2077 : func (i *flushableBatchIter) SeekLT(
2078 : key []byte, flags base.SeekLTFlags,
2079 1 : ) (*InternalKey, base.LazyValue) {
2080 1 : i.err = nil // clear cached iteration error
2081 1 : ikey := base.MakeSearchKey(key)
2082 1 : i.index = sort.Search(len(i.offsets), func(j int) bool {
2083 1 : return base.InternalCompare(i.cmp, ikey, i.getKey(j)) <= 0
2084 1 : })
2085 1 : i.index--
2086 1 : if i.index < 0 {
2087 1 : return nil, base.LazyValue{}
2088 1 : }
2089 1 : i.key = i.getKey(i.index)
2090 1 : if i.lower != nil && i.cmp(i.key.UserKey, i.lower) < 0 {
2091 1 : i.index = -1
2092 1 : return nil, base.LazyValue{}
2093 1 : }
2094 1 : return &i.key, i.value()
2095 : }
2096 :
2097 : // First implements internalIterator.First, as documented in the pebble
2098 : // package.
2099 1 : func (i *flushableBatchIter) First() (*InternalKey, base.LazyValue) {
2100 1 : i.err = nil // clear cached iteration error
2101 1 : if len(i.offsets) == 0 {
2102 1 : return nil, base.LazyValue{}
2103 1 : }
2104 1 : i.index = 0
2105 1 : i.key = i.getKey(i.index)
2106 1 : if i.upper != nil && i.cmp(i.key.UserKey, i.upper) >= 0 {
2107 1 : i.index = len(i.offsets)
2108 1 : return nil, base.LazyValue{}
2109 1 : }
2110 1 : return &i.key, i.value()
2111 : }
2112 :
2113 : // Last implements internalIterator.Last, as documented in the pebble
2114 : // package.
2115 1 : func (i *flushableBatchIter) Last() (*InternalKey, base.LazyValue) {
2116 1 : i.err = nil // clear cached iteration error
2117 1 : if len(i.offsets) == 0 {
2118 0 : return nil, base.LazyValue{}
2119 0 : }
2120 1 : i.index = len(i.offsets) - 1
2121 1 : i.key = i.getKey(i.index)
2122 1 : if i.lower != nil && i.cmp(i.key.UserKey, i.lower) < 0 {
2123 0 : i.index = -1
2124 0 : return nil, base.LazyValue{}
2125 0 : }
2126 1 : return &i.key, i.value()
2127 : }
2128 :
2129 : // Note: flushFlushableBatchIter.Next mirrors the implementation of
2130 : // flushableBatchIter.Next for performance reasons. Keep the two in sync.
2131 1 : func (i *flushableBatchIter) Next() (*InternalKey, base.LazyValue) {
2132 1 : if i.index == len(i.offsets) {
2133 0 : return nil, base.LazyValue{}
2134 0 : }
2135 1 : i.index++
2136 1 : if i.index == len(i.offsets) {
2137 1 : return nil, base.LazyValue{}
2138 1 : }
2139 1 : i.key = i.getKey(i.index)
2140 1 : if i.upper != nil && i.cmp(i.key.UserKey, i.upper) >= 0 {
2141 1 : i.index = len(i.offsets)
2142 1 : return nil, base.LazyValue{}
2143 1 : }
2144 1 : return &i.key, i.value()
2145 : }
2146 :
2147 1 : func (i *flushableBatchIter) Prev() (*InternalKey, base.LazyValue) {
2148 1 : if i.index < 0 {
2149 0 : return nil, base.LazyValue{}
2150 0 : }
2151 1 : i.index--
2152 1 : if i.index < 0 {
2153 1 : return nil, base.LazyValue{}
2154 1 : }
2155 1 : i.key = i.getKey(i.index)
2156 1 : if i.lower != nil && i.cmp(i.key.UserKey, i.lower) < 0 {
2157 1 : i.index = -1
2158 1 : return nil, base.LazyValue{}
2159 1 : }
2160 1 : return &i.key, i.value()
2161 : }
2162 :
2163 : // Note: flushFlushableBatchIter.NextPrefix mirrors the implementation of
2164 : // flushableBatchIter.NextPrefix for performance reasons. Keep the two in sync.
2165 0 : func (i *flushableBatchIter) NextPrefix(succKey []byte) (*InternalKey, LazyValue) {
2166 0 : return i.SeekGE(succKey, base.SeekGEFlagsNone.EnableTrySeekUsingNext())
2167 0 : }
2168 :
2169 1 : func (i *flushableBatchIter) getKey(index int) InternalKey {
2170 1 : e := &i.offsets[index]
2171 1 : kind := InternalKeyKind(i.data[e.offset])
2172 1 : key := i.data[e.keyStart:e.keyEnd]
2173 1 : return base.MakeInternalKey(key, i.batch.seqNum+uint64(e.index), kind)
2174 1 : }
2175 :
2176 1 : func (i *flushableBatchIter) value() base.LazyValue {
2177 1 : p := i.data[i.offsets[i.index].offset:]
2178 1 : if len(p) == 0 {
2179 0 : i.err = base.CorruptionErrorf("corrupted batch")
2180 0 : return base.LazyValue{}
2181 0 : }
2182 1 : kind := InternalKeyKind(p[0])
2183 1 : if kind > InternalKeyKindMax {
2184 0 : i.err = base.CorruptionErrorf("corrupted batch")
2185 0 : return base.LazyValue{}
2186 0 : }
2187 1 : var value []byte
2188 1 : var ok bool
2189 1 : switch kind {
2190 : case InternalKeyKindSet, InternalKeyKindMerge, InternalKeyKindRangeDelete,
2191 : InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete,
2192 1 : InternalKeyKindDeleteSized:
2193 1 : keyEnd := i.offsets[i.index].keyEnd
2194 1 : _, value, ok = batchDecodeStr(i.data[keyEnd:])
2195 1 : if !ok {
2196 0 : i.err = base.CorruptionErrorf("corrupted batch")
2197 0 : return base.LazyValue{}
2198 0 : }
2199 : }
2200 1 : return base.MakeInPlaceValue(value)
2201 : }
2202 :
2203 0 : func (i *flushableBatchIter) Valid() bool {
2204 0 : return i.index >= 0 && i.index < len(i.offsets)
2205 0 : }
2206 :
2207 1 : func (i *flushableBatchIter) Error() error {
2208 1 : return i.err
2209 1 : }
2210 :
2211 1 : func (i *flushableBatchIter) Close() error {
2212 1 : return i.err
2213 1 : }
2214 :
2215 1 : func (i *flushableBatchIter) SetBounds(lower, upper []byte) {
2216 1 : i.lower = lower
2217 1 : i.upper = upper
2218 1 : }
2219 :
2220 0 : func (i *flushableBatchIter) SetContext(_ context.Context) {}
2221 :
2222 : // flushFlushableBatchIter is similar to flushableBatchIter but it keeps track
2223 : // of number of bytes iterated.
2224 : type flushFlushableBatchIter struct {
2225 : flushableBatchIter
2226 : bytesIterated *uint64
2227 : }
2228 :
2229 : // flushFlushableBatchIter implements the base.InternalIterator interface.
2230 : var _ base.InternalIterator = (*flushFlushableBatchIter)(nil)
2231 :
2232 0 : func (i *flushFlushableBatchIter) String() string {
2233 0 : return "flushable-batch"
2234 0 : }
2235 :
2236 : func (i *flushFlushableBatchIter) SeekGE(
2237 : key []byte, flags base.SeekGEFlags,
2238 0 : ) (*InternalKey, base.LazyValue) {
2239 0 : panic("pebble: SeekGE unimplemented")
2240 : }
2241 :
2242 : func (i *flushFlushableBatchIter) SeekPrefixGE(
2243 : prefix, key []byte, flags base.SeekGEFlags,
2244 0 : ) (*base.InternalKey, base.LazyValue) {
2245 0 : panic("pebble: SeekPrefixGE unimplemented")
2246 : }
2247 :
2248 : func (i *flushFlushableBatchIter) SeekLT(
2249 : key []byte, flags base.SeekLTFlags,
2250 0 : ) (*InternalKey, base.LazyValue) {
2251 0 : panic("pebble: SeekLT unimplemented")
2252 : }
2253 :
2254 1 : func (i *flushFlushableBatchIter) First() (*InternalKey, base.LazyValue) {
2255 1 : i.err = nil // clear cached iteration error
2256 1 : key, val := i.flushableBatchIter.First()
2257 1 : if key == nil {
2258 0 : return nil, base.LazyValue{}
2259 0 : }
2260 1 : entryBytes := i.offsets[i.index].keyEnd - i.offsets[i.index].offset
2261 1 : *i.bytesIterated += uint64(entryBytes) + i.valueSize()
2262 1 : return key, val
2263 : }
2264 :
2265 0 : func (i *flushFlushableBatchIter) NextPrefix(succKey []byte) (*InternalKey, base.LazyValue) {
2266 0 : panic("pebble: Prev unimplemented")
2267 : }
2268 :
2269 : // Note: flushFlushableBatchIter.Next mirrors the implementation of
2270 : // flushableBatchIter.Next for performance reasons. Keep the two in sync.
2271 1 : func (i *flushFlushableBatchIter) Next() (*InternalKey, base.LazyValue) {
2272 1 : if i.index == len(i.offsets) {
2273 0 : return nil, base.LazyValue{}
2274 0 : }
2275 1 : i.index++
2276 1 : if i.index == len(i.offsets) {
2277 1 : return nil, base.LazyValue{}
2278 1 : }
2279 1 : i.key = i.getKey(i.index)
2280 1 : entryBytes := i.offsets[i.index].keyEnd - i.offsets[i.index].offset
2281 1 : *i.bytesIterated += uint64(entryBytes) + i.valueSize()
2282 1 : return &i.key, i.value()
2283 : }
2284 :
2285 0 : func (i flushFlushableBatchIter) Prev() (*InternalKey, base.LazyValue) {
2286 0 : panic("pebble: Prev unimplemented")
2287 : }
2288 :
2289 1 : func (i flushFlushableBatchIter) valueSize() uint64 {
2290 1 : p := i.data[i.offsets[i.index].offset:]
2291 1 : if len(p) == 0 {
2292 0 : i.err = base.CorruptionErrorf("corrupted batch")
2293 0 : return 0
2294 0 : }
2295 1 : kind := InternalKeyKind(p[0])
2296 1 : if kind > InternalKeyKindMax {
2297 0 : i.err = base.CorruptionErrorf("corrupted batch")
2298 0 : return 0
2299 0 : }
2300 1 : var length uint64
2301 1 : switch kind {
2302 1 : case InternalKeyKindSet, InternalKeyKindMerge, InternalKeyKindRangeDelete:
2303 1 : keyEnd := i.offsets[i.index].keyEnd
2304 1 : v, n := binary.Uvarint(i.data[keyEnd:])
2305 1 : if n <= 0 {
2306 0 : i.err = base.CorruptionErrorf("corrupted batch")
2307 0 : return 0
2308 0 : }
2309 1 : length = v + uint64(n)
2310 : }
2311 1 : return length
2312 : }
2313 :
2314 : // batchSort returns iterators for the sorted contents of the batch. It is
2315 : // intended for testing use only. The batch.Sort dance is done to prevent
2316 : // exposing this method in the public pebble interface.
2317 : func batchSort(
2318 : i interface{},
2319 : ) (
2320 : points internalIterator,
2321 : rangeDels keyspan.FragmentIterator,
2322 : rangeKeys keyspan.FragmentIterator,
2323 1 : ) {
2324 1 : b := i.(*Batch)
2325 1 : if b.Indexed() {
2326 1 : pointIter := b.newInternalIter(nil)
2327 1 : rangeDelIter := b.newRangeDelIter(nil, math.MaxUint64)
2328 1 : rangeKeyIter := b.newRangeKeyIter(nil, math.MaxUint64)
2329 1 : return pointIter, rangeDelIter, rangeKeyIter
2330 1 : }
2331 1 : f, err := newFlushableBatch(b, b.db.opts.Comparer)
2332 1 : if err != nil {
2333 0 : panic(err)
2334 : }
2335 1 : return f.newIter(nil), f.newRangeDelIter(nil), f.newRangeKeyIter(nil)
2336 : }
2337 :
2338 1 : func init() {
2339 1 : private.BatchSort = batchSort
2340 1 : }
|