Line data Source code
1 : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : // Package batchrepr provides interfaces for reading and writing the binary
6 : // batch representation. This batch representation is used in-memory while
7 : // constructing a batch and on-disk within the write-ahead log.
8 : package batchrepr
9 :
10 : import (
11 : "encoding/binary"
12 : "fmt"
13 : "unsafe"
14 :
15 : "github.com/cockroachdb/pebble/internal/base"
16 : "github.com/pkg/errors"
17 : )
18 :
// ErrInvalidBatch indicates that a batch is invalid or otherwise corrupted.
// It is created with the message "pebble: invalid batch" and passed through
// base.MarkCorruptionError. Reader.Next wraps it with additional detail when
// it encounters an entry it cannot decode.
var ErrInvalidBatch = base.MarkCorruptionError(errors.New("pebble: invalid batch"))
21 :
const (
	// countOffset is the byte offset within the batch representation at
	// which the key count begins. The count is a little-endian uint32 and
	// occupies the bytes [countOffset, HeaderLen).
	countOffset = 8
	// HeaderLen is the size, in bytes, of the header that prefixes every
	// batch representation.
	HeaderLen = 12
)
29 :
30 : // IsEmpty returns true iff the batch contains zero keys.
31 2 : func IsEmpty(repr []byte) bool {
32 2 : return len(repr) <= HeaderLen
33 2 : }
34 :
35 : // ReadHeader reads the contents of the batch header. If the repr is too small
36 : // to contain a valid batch header, ReadHeader returns ok=false.
37 2 : func ReadHeader(repr []byte) (h Header, ok bool) {
38 2 : if len(repr) < HeaderLen {
39 1 : return h, false
40 1 : }
41 2 : return Header{
42 2 : SeqNum: ReadSeqNum(repr),
43 2 : Count: binary.LittleEndian.Uint32(repr[countOffset:HeaderLen]),
44 2 : }, true
45 : }
46 :
// Header holds the decoded fields of a batch header.
type Header struct {
	// SeqNum is the sequence number at which the batch is committed. It is
	// zero for a batch that has not yet committed.
	SeqNum uint64
	// Count is the number of keys written to the batch.
	Count uint32
}

// String formats the header as "[seqNum=<SeqNum>,count=<Count>]", with both
// fields printed in decimal.
func (h Header) String() string {
	const layout = "[seqNum=%d,count=%d]"
	return fmt.Sprintf(layout, h.SeqNum, h.Count)
}
60 :
61 : // ReadSeqNum reads the sequence number encoded within the batch. ReadSeqNum
62 : // does not validate that the repr is valid. It's exported only for very
63 : // performance sensitive code paths that should not necessarily read the rest of
64 : // the header as well.
65 2 : func ReadSeqNum(repr []byte) uint64 {
66 2 : return binary.LittleEndian.Uint64(repr[:countOffset])
67 2 : }
68 :
69 : // Read constructs a Reader from an encoded batch representation, ignoring the
70 : // contents of the Header.
71 2 : func Read(repr []byte) (r Reader) {
72 2 : if len(repr) <= HeaderLen {
73 2 : return nil
74 2 : }
75 2 : return repr[HeaderLen:]
76 : }
77 :
78 : // Reader iterates over the entries contained in a batch.
79 : type Reader []byte
80 :
81 : // Next returns the next entry in this batch, if there is one. If the reader has
82 : // reached the end of the batch, Next returns ok=false and a nil error. If the
83 : // batch is corrupt and the next entry is illegible, Next returns ok=false and a
84 : // non-nil error.
85 2 : func (r *Reader) Next() (kind base.InternalKeyKind, ukey []byte, value []byte, ok bool, err error) {
86 2 : if len(*r) == 0 {
87 2 : return 0, nil, nil, false, nil
88 2 : }
89 2 : kind = base.InternalKeyKind((*r)[0])
90 2 : if kind > base.InternalKeyKindMax {
91 1 : return 0, nil, nil, false, errors.Wrapf(ErrInvalidBatch, "invalid key kind 0x%x", (*r)[0])
92 1 : }
93 2 : *r, ukey, ok = DecodeStr((*r)[1:])
94 2 : if !ok {
95 1 : return 0, nil, nil, false, errors.Wrapf(ErrInvalidBatch, "decoding user key")
96 1 : }
97 2 : switch kind {
98 : case base.InternalKeyKindSet, base.InternalKeyKindMerge, base.InternalKeyKindRangeDelete,
99 : base.InternalKeyKindRangeKeySet, base.InternalKeyKindRangeKeyUnset, base.InternalKeyKindRangeKeyDelete,
100 2 : base.InternalKeyKindDeleteSized:
101 2 : *r, value, ok = DecodeStr(*r)
102 2 : if !ok {
103 1 : return 0, nil, nil, false, errors.Wrapf(ErrInvalidBatch, "decoding %s value", kind)
104 1 : }
105 : }
106 2 : return kind, ukey, value, true, nil
107 : }
108 :
// DecodeStr decodes a varint encoded string from data, returning the remainder
// of data and the decoded string. The string is encoded as a base-128 varint
// length prefix (least-significant group first, at most
// binary.MaxVarintLen32 bytes) followed by that many bytes of payload.
//
// It returns ok=false, with nil slices, if data is empty, if the length prefix
// is truncated (data ends before a byte without the continuation bit is
// found), or if the decoded length exceeds the number of bytes that follow the
// prefix. DecodeStr never reads outside of data.
//
// TODO(jackson): This should be unexported once pebble package callers have
// been updated to use appropriate abstractions.
func DecodeStr(data []byte) (odata []byte, s []byte, ok bool) {
	var v uint32
	var n int
	if len(data) >= binary.MaxVarintLen32 {
		// Fast path: at least MaxVarintLen32 bytes are available, so every
		// byte the unrolled decode below may touch is in bounds. Viewing the
		// prefix as a fixed-size array lets the constant indexes be used
		// without per-access bounds checks.
		p := (*[binary.MaxVarintLen32]byte)(unsafe.Pointer(&data[0]))
		switch {
		case p[0] < 128:
			v = uint32(p[0])
			n = 1
		case p[1] < 128:
			v = uint32(p[1])<<7 | uint32(p[0]&0x7f)
			n = 2
		case p[2] < 128:
			v = uint32(p[2])<<14 | uint32(p[1]&0x7f)<<7 | uint32(p[0]&0x7f)
			n = 3
		case p[3] < 128:
			v = uint32(p[3])<<21 | uint32(p[2]&0x7f)<<14 | uint32(p[1]&0x7f)<<7 | uint32(p[0]&0x7f)
			n = 4
		default:
			// The fifth byte is used as-is: its continuation bit is not
			// inspected and any bits shifted beyond 32 are discarded.
			v = uint32(p[4])<<28 | uint32(p[3]&0x7f)<<21 | uint32(p[2]&0x7f)<<14 | uint32(p[1]&0x7f)<<7 | uint32(p[0]&0x7f)
			n = 5
		}
	} else {
		// Slow path: fewer than MaxVarintLen32 bytes remain, so the varint
		// must terminate within data. Decode with ordinary bounds-checked
		// accesses; n stays zero if no terminating byte is found (including
		// when data is empty).
		for i, b := range data {
			v |= uint32(b&0x7f) << (7 * uint(i))
			if b < 128 {
				n = i + 1
				break
			}
		}
		if n == 0 {
			return nil, nil, false
		}
	}

	data = data[n:]
	// Compare in 64 bits so that len(data) is never truncated.
	if uint64(v) > uint64(len(data)) {
		return nil, nil, false
	}
	return data[v:], data[:v], true
}
|