Line data Source code
1 : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : // Package batchrepr provides interfaces for reading and writing the binary
6 : // batch representation. This batch representation is used in-memory while
7 : // constructing a batch and on-disk within the write-ahead log.
8 : package batchrepr
9 :
10 : import (
11 : "encoding/binary"
12 : "fmt"
13 : "unsafe"
14 :
15 : "github.com/cockroachdb/pebble/internal/base"
16 : "github.com/pkg/errors"
17 : )
18 :
19 : // ErrInvalidBatch indicates that a batch is invalid or otherwise corrupted.
20 : var ErrInvalidBatch = base.MarkCorruptionError(errors.New("pebble: invalid batch"))
21 :
// Layout of the fixed-size batch header: bytes [0, countOffset) hold the
// sequence number as a little-endian uint64, and bytes [countOffset, HeaderLen)
// hold the key count as a little-endian uint32.
const (
	// countOffset is the byte offset within the batch representation at which
	// the little-endian uint32 count begins.
	countOffset = 8
	// HeaderLen is the total size of the batch header, in bytes.
	HeaderLen = 12
)
29 :
30 : // IsEmpty returns true iff the batch contains zero keys.
31 1 : func IsEmpty(repr []byte) bool {
32 1 : return len(repr) <= HeaderLen
33 1 : }
34 :
35 : // ReadHeader reads the contents of the batch header. If the repr is too small
36 : // to contain a valid batch header, ReadHeader returns ok=false.
37 1 : func ReadHeader(repr []byte) (h Header, ok bool) {
38 1 : if len(repr) < HeaderLen {
39 1 : return h, false
40 1 : }
41 1 : return Header{
42 1 : SeqNum: ReadSeqNum(repr),
43 1 : Count: binary.LittleEndian.Uint32(repr[countOffset:HeaderLen]),
44 1 : }, true
45 : }
46 :
// Header holds the decoded fields of a batch header.
type Header struct {
	// SeqNum is the sequence number at which the batch is committed. It is
	// zero for a batch that has not yet committed.
	SeqNum uint64
	// Count is the number of keys written to the batch.
	Count uint32
}

// String formats the header as "[seqNum=<SeqNum>,count=<Count>]".
func (h Header) String() string {
	seqNum, count := h.SeqNum, h.Count
	return fmt.Sprintf("[seqNum=%d,count=%d]", seqNum, count)
}
60 :
61 : // ReadSeqNum reads the sequence number encoded within the batch. ReadSeqNum
62 : // does not validate that the repr is valid. It's exported only for very
63 : // performance sensitive code paths that should not necessarily read the rest of
64 : // the header as well.
65 1 : func ReadSeqNum(repr []byte) uint64 {
66 1 : return binary.LittleEndian.Uint64(repr[:countOffset])
67 1 : }
68 :
69 : // Read constructs a Reader from an encoded batch representation, ignoring the
70 : // contents of the Header.
71 1 : func Read(repr []byte) (r Reader) {
72 1 : if len(repr) <= HeaderLen {
73 1 : return nil
74 1 : }
75 1 : return repr[HeaderLen:]
76 : }
77 :
// Reader iterates over the entries contained in a batch. Its value is the
// not-yet-consumed portion of the encoded entries; each call to Next advances
// it past the entry that was decoded.
type Reader []byte
80 :
81 : // Next returns the next entry in this batch, if there is one. If the reader has
82 : // reached the end of the batch, Next returns ok=false and a nil error. If the
83 : // batch is corrupt and the next entry is illegible, Next returns ok=false and a
84 : // non-nil error.
85 1 : func (r *Reader) Next() (kind base.InternalKeyKind, ukey []byte, value []byte, ok bool, err error) {
86 1 : if len(*r) == 0 {
87 1 : return 0, nil, nil, false, nil
88 1 : }
89 1 : kind = base.InternalKeyKind((*r)[0])
90 1 : if kind > base.InternalKeyKindMax {
91 1 : return 0, nil, nil, false, errors.Wrapf(ErrInvalidBatch, "invalid key kind 0x%x", (*r)[0])
92 1 : }
93 1 : *r, ukey, ok = DecodeStr((*r)[1:])
94 1 : if !ok {
95 1 : return 0, nil, nil, false, errors.Wrapf(ErrInvalidBatch, "decoding user key")
96 1 : }
97 1 : switch kind {
98 : case base.InternalKeyKindSet, base.InternalKeyKindMerge, base.InternalKeyKindRangeDelete,
99 : base.InternalKeyKindRangeKeySet, base.InternalKeyKindRangeKeyUnset, base.InternalKeyKindRangeKeyDelete,
100 1 : base.InternalKeyKindDeleteSized:
101 1 : *r, value, ok = DecodeStr(*r)
102 1 : if !ok {
103 1 : return 0, nil, nil, false, errors.Wrapf(ErrInvalidBatch, "decoding %s value", kind)
104 1 : }
105 : }
106 1 : return kind, ukey, value, true, nil
107 : }
108 :
// DecodeStr decodes a varint encoded string from data, returning the remainder
// of data and the decoded string. The encoding is an unsigned varint length of
// at most 32 bits followed by that many bytes. Both returned slices alias
// data; nothing is copied.
//
// It returns ok=false (and nil slices) if data is empty, if the varint is
// truncated or does not fit in 32 bits, or if the decoded length exceeds the
// number of bytes that follow the varint.
//
// TODO(jackson): This should be unexported once pebble package callers have
// been updated to use appropriate abstractions.
func DecodeStr(data []byte) (odata []byte, s []byte, ok bool) {
	var v uint32
	var n int
	if len(data) < binary.MaxVarintLen32 {
		// Slow path: fewer than five bytes remain, so the unrolled decoder
		// below could read past the end of data (or index an empty slice).
		// Fall back to the bounds-checked decoder, which reports a missing or
		// truncated varint with a non-positive length. With at most four
		// bytes of input the decoded value always fits in a uint32.
		u, m := binary.Uvarint(data)
		if m <= 0 {
			return nil, nil, false
		}
		v, n = uint32(u), m
	} else {
		// Fast path: at least binary.MaxVarintLen32 bytes are available, so
		// every load below (offsets 0 through 4) stays within data and the
		// varint can be decoded without per-byte bounds checks.
		ptr := unsafe.Pointer(&data[0])
		if a := *((*uint8)(ptr)); a < 128 {
			v = uint32(a)
			n = 1
		} else if a, b := a&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 1))); b < 128 {
			v = uint32(b)<<7 | uint32(a)
			n = 2
		} else if b, c := b&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 2))); c < 128 {
			v = uint32(c)<<14 | uint32(b)<<7 | uint32(a)
			n = 3
		} else if c, d := c&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 3))); d < 128 {
			v = uint32(d)<<21 | uint32(c)<<14 | uint32(b)<<7 | uint32(a)
			n = 4
		} else {
			d, e := d&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 4)))
			// Only the low four bits of the fifth byte fit in a uint32. A
			// larger value means either a continuation bit (a varint longer
			// than five bytes) or a length that overflows 32 bits.
			if e > 0x0f {
				return nil, nil, false
			}
			v = uint32(e)<<28 | uint32(d)<<21 | uint32(c)<<14 | uint32(b)<<7 | uint32(a)
			n = 5
		}
	}

	data = data[n:]
	// Compare in 64 bits so that neither operand can be truncated.
	if uint64(v) > uint64(len(data)) {
		return nil, nil, false
	}
	return data[v:], data[:v], true
}
|