Line data Source code
1 : // Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : // Package rowblk defines facilities for row-oriented sstable blocks.
6 : package rowblk
7 :
8 : import (
9 : "encoding/binary"
10 : "unsafe"
11 :
12 : "github.com/cockroachdb/errors"
13 : "github.com/cockroachdb/pebble/internal/base"
14 : "github.com/cockroachdb/pebble/sstable/block"
15 : )
16 :
17 : const (
18 : // MaximumSize is an extremely generous maximum block size of 256MiB. We
19 : // explicitly place this limit to reserve a few bits in the restart for internal
20 : // use.
21 : MaximumSize = 1 << 28
22 : // EmptySize holds the size of an empty block. Every block ends in a uint32
23 : // trailer encoding the number of restart points within the block.
24 : EmptySize = 4
25 : )
26 :
27 : const (
28 : // TrailerObsoleteBit is a bit within the internal key trailer that's used
29 : // by the row-oriented block format to signify when a key is obsolete. It's
30 : // internal to the row-oriented block format, set when writing a block and
31 : // unset by blockIter, so no code outside block writing/reading code ever
32 : // sees it.
33 : //
34 : // TODO(jackson): Unexport once the blockIter is also in this package.
35 : TrailerObsoleteBit = base.InternalKeyTrailer(base.InternalKeyKindSSTableInternalObsoleteBit)
36 : // TrailerObsoleteMask defines a mask for the obsolete bit in the internal
37 : // key trailer.
38 : TrailerObsoleteMask = (base.InternalKeyTrailer(base.SeqNumMax) << 8) | base.InternalKeyTrailer(base.InternalKeyKindSSTableInternalObsoleteMask)
39 : )
40 :
41 : // Writer buffers and serializes key/value pairs into a row-oriented block.
42 : type Writer struct {
43 : // RestartInterval configures the interval at which the writer will write a
44 : // full key without prefix compression, and encode a corresponding restart
45 : // point.
46 : RestartInterval int
47 : nEntries int
48 : nextRestart int
49 : buf []byte
50 : // For datablocks in TableFormatPebblev3, we steal the most significant bit
51 : // in restarts for encoding setHasSameKeyPrefixSinceLastRestart. This leaves
52 : // us with 31 bits, which is more than enough (no one needs > 2GB blocks).
53 : // Typically, restarts occur every 16 keys, and by storing this bit with the
54 : // restart, we can optimize for the case where a user wants to skip to the
55 : // next prefix which happens to be in the same data block, but is > 16 keys
56 : // away. We have seen production situations with 100+ versions per MVCC key
57 : // (which share the same prefix). Additionally, for such writers, the prefix
58 : // compression of the key, that shares the key with the preceding key, is
59 : // limited to the prefix part of the preceding key -- this ensures that when
60 : // doing NPrefix (see blockIter) we don't need to assemble the full key
61 : // for each step since by limiting the length of the shared key we are
62 : // ensuring that any of the keys with the same prefix can be used to
63 : // assemble the full key when the prefix does change.
64 : restarts []uint32
65 : // Do not read curKey directly from outside blockWriter since it can have
66 : // the InternalKeyKindSSTableInternalObsoleteBit set. Use getCurKey() or
67 : // getCurUserKey() instead.
68 : curKey []byte
69 : // curValue excludes the optional prefix provided to
70 : // storeWithOptionalValuePrefix.
71 : curValue []byte
72 : prevKey []byte
73 : tmp [4]byte
74 : // We don't know the state of the sets that were at the end of the previous
75 : // block, so this is initially 0. It may be true for the second and later
76 : // restarts in a block. Not having inter-block information is fine since we
77 : // will optimize by stepping through restarts only within the same block.
78 : // Note that the first restart is the first key in the block.
79 : setHasSameKeyPrefixSinceLastRestart bool
80 : }
81 :
82 : // Reset resets the block writer to empty, preserving buffers for reuse.
83 2 : func (w *Writer) Reset() {
84 2 : *w = Writer{
85 2 : buf: w.buf[:0],
86 2 : restarts: w.restarts[:0],
87 2 : curKey: w.curKey[:0],
88 2 : curValue: w.curValue[:0],
89 2 : prevKey: w.prevKey[:0],
90 2 : }
91 2 : }
92 :
93 : const setHasSameKeyPrefixRestartMask uint32 = 1 << 31
94 :
95 : // EntryCount returns the count of entries written to the writer.
96 2 : func (w *Writer) EntryCount() int {
97 2 : return w.nEntries
98 2 : }
99 :
100 : // CurKey returns the most recently written key.
101 2 : func (w *Writer) CurKey() base.InternalKey {
102 2 : k := base.DecodeInternalKey(w.curKey)
103 2 : k.Trailer = k.Trailer & TrailerObsoleteMask
104 2 : return k
105 2 : }
106 :
107 : // CurValue returns the most recently written value.
108 2 : func (w *Writer) CurValue() []byte {
109 2 : return w.curValue
110 2 : }
111 :
112 : // CurUserKey returns the most recently written user key.
113 2 : func (w *Writer) CurUserKey() []byte {
114 2 : n := len(w.curKey) - base.InternalTrailerLen
115 2 : if n < 0 {
116 0 : panic(errors.AssertionFailedf("corrupt key in blockWriter buffer"))
117 : }
118 2 : return w.curKey[:n:n]
119 : }
120 :
121 : // If !addValuePrefix, the valuePrefix is ignored.
122 : func (w *Writer) storeWithOptionalValuePrefix(
123 : keySize int,
124 : value []byte,
125 : maxSharedKeyLen int,
126 : addValuePrefix bool,
127 : valuePrefix block.ValuePrefix,
128 : setHasSameKeyPrefix bool,
129 2 : ) {
130 2 : shared := 0
131 2 : if !setHasSameKeyPrefix {
132 2 : w.setHasSameKeyPrefixSinceLastRestart = false
133 2 : }
134 2 : if w.nEntries == w.nextRestart {
135 2 : w.nextRestart = w.nEntries + w.RestartInterval
136 2 : restart := uint32(len(w.buf))
137 2 : if w.setHasSameKeyPrefixSinceLastRestart {
138 2 : restart = restart | setHasSameKeyPrefixRestartMask
139 2 : }
140 2 : w.setHasSameKeyPrefixSinceLastRestart = true
141 2 : w.restarts = append(w.restarts, restart)
142 2 : } else {
143 2 : // TODO(peter): Manually inlined version of base.SharedPrefixLen(). This
144 2 : // is 3% faster on BenchmarkWriter on go1.16. Remove if future versions
145 2 : // show this to not be a performance win. For now, functions that use of
146 2 : // unsafe cannot be inlined.
147 2 : n := maxSharedKeyLen
148 2 : if n > len(w.prevKey) {
149 2 : n = len(w.prevKey)
150 2 : }
151 2 : asUint64 := func(b []byte, i int) uint64 {
152 2 : return binary.LittleEndian.Uint64(b[i:])
153 2 : }
154 2 : for shared < n-7 && asUint64(w.curKey, shared) == asUint64(w.prevKey, shared) {
155 2 : shared += 8
156 2 : }
157 2 : for shared < n && w.curKey[shared] == w.prevKey[shared] {
158 2 : shared++
159 2 : }
160 : }
161 :
162 2 : lenValuePlusOptionalPrefix := len(value)
163 2 : if addValuePrefix {
164 2 : lenValuePlusOptionalPrefix++
165 2 : }
166 2 : needed := 3*binary.MaxVarintLen32 + len(w.curKey[shared:]) + lenValuePlusOptionalPrefix
167 2 : n := len(w.buf)
168 2 : if cap(w.buf) < n+needed {
169 2 : newCap := 2 * cap(w.buf)
170 2 : if newCap == 0 {
171 2 : newCap = 1024
172 2 : }
173 2 : for newCap < n+needed {
174 2 : newCap *= 2
175 2 : }
176 2 : newBuf := make([]byte, n, newCap)
177 2 : copy(newBuf, w.buf)
178 2 : w.buf = newBuf
179 : }
180 2 : w.buf = w.buf[:n+needed]
181 2 :
182 2 : // TODO(peter): Manually inlined versions of binary.PutUvarint(). This is 15%
183 2 : // faster on BenchmarkWriter on go1.13. Remove if go1.14 or future versions
184 2 : // show this to not be a performance win.
185 2 : {
186 2 : x := uint32(shared)
187 2 : for x >= 0x80 {
188 0 : w.buf[n] = byte(x) | 0x80
189 0 : x >>= 7
190 0 : n++
191 0 : }
192 2 : w.buf[n] = byte(x)
193 2 : n++
194 : }
195 :
196 2 : {
197 2 : x := uint32(keySize - shared)
198 2 : for x >= 0x80 {
199 1 : w.buf[n] = byte(x) | 0x80
200 1 : x >>= 7
201 1 : n++
202 1 : }
203 2 : w.buf[n] = byte(x)
204 2 : n++
205 : }
206 :
207 2 : {
208 2 : x := uint32(lenValuePlusOptionalPrefix)
209 2 : for x >= 0x80 {
210 2 : w.buf[n] = byte(x) | 0x80
211 2 : x >>= 7
212 2 : n++
213 2 : }
214 2 : w.buf[n] = byte(x)
215 2 : n++
216 : }
217 :
218 2 : n += copy(w.buf[n:], w.curKey[shared:])
219 2 : if addValuePrefix {
220 2 : w.buf[n : n+1][0] = byte(valuePrefix)
221 2 : n++
222 2 : }
223 2 : n += copy(w.buf[n:], value)
224 2 : w.buf = w.buf[:n]
225 2 :
226 2 : w.curValue = w.buf[n-len(value):]
227 2 :
228 2 : w.nEntries++
229 : }
230 :
231 : // Add adds a key value pair to the block without a value prefix.
232 2 : func (w *Writer) Add(key base.InternalKey, value []byte) {
233 2 : w.AddWithOptionalValuePrefix(
234 2 : key, false, value, len(key.UserKey), false, 0, false)
235 2 : }
236 :
237 : // AddWithOptionalValuePrefix adds a key value pair to the block, optionally
238 : // including a value prefix.
239 : //
240 : // Callers that always set addValuePrefix to false should use add() instead.
241 : //
242 : // isObsolete indicates whether this key-value pair is obsolete in this
243 : // sstable (only applicable when writing data blocks) -- see the comment in
244 : // table.go and the longer one in format.go. addValuePrefix adds a 1 byte
245 : // prefix to the value, specified in valuePrefix -- this is used for data
246 : // blocks in TableFormatPebblev3 onwards for SETs (see the comment in
247 : // format.go, with more details in value_block.go). setHasSameKeyPrefix is
248 : // also used in TableFormatPebblev3 onwards for SETs.
249 : func (w *Writer) AddWithOptionalValuePrefix(
250 : key base.InternalKey,
251 : isObsolete bool,
252 : value []byte,
253 : maxSharedKeyLen int,
254 : addValuePrefix bool,
255 : valuePrefix block.ValuePrefix,
256 : setHasSameKeyPrefix bool,
257 2 : ) {
258 2 : w.curKey, w.prevKey = w.prevKey, w.curKey
259 2 :
260 2 : size := key.Size()
261 2 : if cap(w.curKey) < size {
262 2 : w.curKey = make([]byte, 0, size*2)
263 2 : }
264 2 : w.curKey = w.curKey[:size]
265 2 : if isObsolete {
266 2 : key.Trailer = key.Trailer | TrailerObsoleteBit
267 2 : }
268 2 : key.Encode(w.curKey)
269 2 :
270 2 : w.storeWithOptionalValuePrefix(
271 2 : size, value, maxSharedKeyLen, addValuePrefix, valuePrefix, setHasSameKeyPrefix)
272 : }
273 :
274 : // Finish finalizes the block, serializes it and returns the serialized data.
275 2 : func (w *Writer) Finish() []byte {
276 2 : // Write the restart points to the buffer.
277 2 : if w.nEntries == 0 {
278 2 : // Every block must have at least one restart point.
279 2 : if cap(w.restarts) > 0 {
280 2 : w.restarts = w.restarts[:1]
281 2 : w.restarts[0] = 0
282 2 : } else {
283 2 : w.restarts = append(w.restarts, 0)
284 2 : }
285 : }
286 2 : tmp4 := w.tmp[:4]
287 2 : for _, x := range w.restarts {
288 2 : binary.LittleEndian.PutUint32(tmp4, x)
289 2 : w.buf = append(w.buf, tmp4...)
290 2 : }
291 2 : binary.LittleEndian.PutUint32(tmp4, uint32(len(w.restarts)))
292 2 : w.buf = append(w.buf, tmp4...)
293 2 : result := w.buf
294 2 :
295 2 : // Reset the block state.
296 2 : w.nEntries = 0
297 2 : w.nextRestart = 0
298 2 : w.buf = w.buf[:0]
299 2 : w.restarts = w.restarts[:0]
300 2 : return result
301 : }
302 :
303 : // EstimatedSize returns the estimated size of the block in bytes.
304 2 : func (w *Writer) EstimatedSize() int {
305 2 : return len(w.buf) + 4*len(w.restarts) + EmptySize
306 2 : }
307 :
308 : // AddRaw adds a key value pair to the block.
309 2 : func (w *Writer) AddRaw(key, value []byte) {
310 2 : w.curKey, w.prevKey = w.prevKey, w.curKey
311 2 :
312 2 : size := len(key)
313 2 : if cap(w.curKey) < size {
314 2 : w.curKey = make([]byte, 0, size*2)
315 2 : }
316 2 : w.curKey = w.curKey[:size]
317 2 : copy(w.curKey, key)
318 2 : w.storeWithOptionalValuePrefix(
319 2 : size, value, len(key), false, 0, false)
320 : }
321 :
322 : // AddRawString is AddRaw but with a string key.
323 2 : func (w *Writer) AddRawString(key string, value []byte) {
324 2 : w.AddRaw(unsafe.Slice(unsafe.StringData(key), len(key)), value)
325 2 : }
|