Line data Source code
1 : // Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package sstable
6 :
7 : import (
8 : "encoding/binary"
9 :
10 : "github.com/cockroachdb/errors"
11 : "github.com/cockroachdb/pebble/internal/base"
12 : )
13 :
// blockWriter accumulates prefix-compressed key/value entries into a single
// block. Entries are appended to buf, and the offset of every restart point
// is recorded in restarts; finish() appends the restart array to buf.
type blockWriter struct {
	// restartInterval is the number of entries between consecutive restart
	// points.
	restartInterval int
	// nEntries is the number of entries stored in the block so far.
	nEntries int
	// nextRestart is the value of nEntries at which the next restart point
	// is recorded.
	nextRestart int
	// buf holds the encoded entries of the block under construction.
	buf []byte
	// restarts holds the offset into buf of each restart point.
	//
	// For data blocks in TableFormatPebblev3, the most significant bit of
	// each restart is stolen to encode setHasSameKeyPrefixSinceLastRestart.
	// The remaining 31 bits are more than enough (no one needs > 2GB
	// blocks). Restarts typically occur every 16 keys; storing this bit with
	// the restart optimizes the case where a user wants to skip to the next
	// prefix, which happens to be in the same data block but is more than 16
	// keys away. Production situations with 100+ versions per MVCC key (all
	// sharing the same prefix) have been observed.
	//
	// Additionally, for such writers, the prefix compression of a key
	// against the preceding key is limited to the prefix part of the
	// preceding key. This ensures that when doing NPrefix (see blockIter)
	// the full key does not need to be assembled at every step: because the
	// length of the shared portion is limited, any of the keys with the same
	// prefix can be used to assemble the full key when the prefix does
	// change.
	restarts []uint32
	// curKey is the encoded key of the most recently added entry. Do not
	// read curKey directly from outside blockWriter since it can have the
	// InternalKeyKindSSTableInternalObsoleteBit set. Use getCurKey() or
	// getCurUserKey() instead.
	curKey []byte
	// curValue is the value of the most recently added entry. It excludes
	// the optional prefix provided to storeWithOptionalValuePrefix.
	curValue []byte
	// prevKey is the encoded key of the entry added before curKey; the two
	// buffers are swapped on every add.
	prevKey []byte
	// tmp is scratch space used by finish() to encode uint32 values.
	tmp [4]byte
	// setHasSameKeyPrefixSinceLastRestart tracks whether every entry stored
	// since the last restart point was a SET with the same key prefix.
	//
	// The state of the sets at the end of the previous block is unknown, so
	// this is initially false. It may be true for the second and later
	// restarts in a block. Not having inter-block information is fine since
	// stepping through restarts is only optimized within the same block.
	// Note that the first restart is the first key in the block.
	setHasSameKeyPrefixSinceLastRestart bool
}
50 :
51 1 : func (w *blockWriter) clear() {
52 1 : *w = blockWriter{
53 1 : buf: w.buf[:0],
54 1 : restarts: w.restarts[:0],
55 1 : curKey: w.curKey[:0],
56 1 : curValue: w.curValue[:0],
57 1 : prevKey: w.prevKey[:0],
58 1 : }
59 1 : }
60 :
const (
	// MaximumBlockSize is an extremely generous maximum block size of 256MiB.
	// The limit is placed explicitly to reserve a few bits in the restart for
	// internal use.
	MaximumBlockSize = 1 << 28

	// setHasSameKeyPrefixRestartMask selects the most significant bit of a
	// restart, which blockWriter sets when
	// setHasSameKeyPrefixSinceLastRestart holds at that restart point.
	setHasSameKeyPrefixRestartMask uint32 = 0x8000_0000

	// The two masks below apply to the high byte of a little-endian encoded
	// restart: the first keeps every bit except the same-prefix bit, the
	// second keeps only the same-prefix bit.
	restartMaskLittleEndianHighByteWithoutSetHasSamePrefix byte = 0x7f
	restartMaskLittleEndianHighByteOnlySetHasSamePrefix    byte = 0x80
)
68 :
69 1 : func (w *blockWriter) getCurKey() InternalKey {
70 1 : k := base.DecodeInternalKey(w.curKey)
71 1 : k.Trailer = k.Trailer & trailerObsoleteMask
72 1 : return k
73 1 : }
74 :
75 1 : func (w *blockWriter) getCurUserKey() []byte {
76 1 : n := len(w.curKey) - base.InternalTrailerLen
77 1 : if n < 0 {
78 0 : panic(errors.AssertionFailedf("corrupt key in blockWriter buffer"))
79 : }
80 1 : return w.curKey[:n:n]
81 : }
82 :
83 : // If !addValuePrefix, the valuePrefix is ignored.
84 : func (w *blockWriter) storeWithOptionalValuePrefix(
85 : keySize int,
86 : value []byte,
87 : maxSharedKeyLen int,
88 : addValuePrefix bool,
89 : valuePrefix valuePrefix,
90 : setHasSameKeyPrefix bool,
91 1 : ) {
92 1 : shared := 0
93 1 : if !setHasSameKeyPrefix {
94 1 : w.setHasSameKeyPrefixSinceLastRestart = false
95 1 : }
96 1 : if w.nEntries == w.nextRestart {
97 1 : w.nextRestart = w.nEntries + w.restartInterval
98 1 : restart := uint32(len(w.buf))
99 1 : if w.setHasSameKeyPrefixSinceLastRestart {
100 1 : restart = restart | setHasSameKeyPrefixRestartMask
101 1 : }
102 1 : w.setHasSameKeyPrefixSinceLastRestart = true
103 1 : w.restarts = append(w.restarts, restart)
104 1 : } else {
105 1 : // TODO(peter): Manually inlined version of base.SharedPrefixLen(). This
106 1 : // is 3% faster on BenchmarkWriter on go1.16. Remove if future versions
107 1 : // show this to not be a performance win. For now, functions that use of
108 1 : // unsafe cannot be inlined.
109 1 : n := maxSharedKeyLen
110 1 : if n > len(w.prevKey) {
111 1 : n = len(w.prevKey)
112 1 : }
113 1 : asUint64 := func(b []byte, i int) uint64 {
114 1 : return binary.LittleEndian.Uint64(b[i:])
115 1 : }
116 1 : for shared < n-7 && asUint64(w.curKey, shared) == asUint64(w.prevKey, shared) {
117 1 : shared += 8
118 1 : }
119 1 : for shared < n && w.curKey[shared] == w.prevKey[shared] {
120 1 : shared++
121 1 : }
122 : }
123 :
124 1 : lenValuePlusOptionalPrefix := len(value)
125 1 : if addValuePrefix {
126 1 : lenValuePlusOptionalPrefix++
127 1 : }
128 1 : needed := 3*binary.MaxVarintLen32 + len(w.curKey[shared:]) + lenValuePlusOptionalPrefix
129 1 : n := len(w.buf)
130 1 : if cap(w.buf) < n+needed {
131 1 : newCap := 2 * cap(w.buf)
132 1 : if newCap == 0 {
133 1 : newCap = 1024
134 1 : }
135 1 : for newCap < n+needed {
136 1 : newCap *= 2
137 1 : }
138 1 : newBuf := make([]byte, n, newCap)
139 1 : copy(newBuf, w.buf)
140 1 : w.buf = newBuf
141 : }
142 1 : w.buf = w.buf[:n+needed]
143 1 :
144 1 : // TODO(peter): Manually inlined versions of binary.PutUvarint(). This is 15%
145 1 : // faster on BenchmarkWriter on go1.13. Remove if go1.14 or future versions
146 1 : // show this to not be a performance win.
147 1 : {
148 1 : x := uint32(shared)
149 1 : for x >= 0x80 {
150 0 : w.buf[n] = byte(x) | 0x80
151 0 : x >>= 7
152 0 : n++
153 0 : }
154 1 : w.buf[n] = byte(x)
155 1 : n++
156 : }
157 :
158 1 : {
159 1 : x := uint32(keySize - shared)
160 1 : for x >= 0x80 {
161 0 : w.buf[n] = byte(x) | 0x80
162 0 : x >>= 7
163 0 : n++
164 0 : }
165 1 : w.buf[n] = byte(x)
166 1 : n++
167 : }
168 :
169 1 : {
170 1 : x := uint32(lenValuePlusOptionalPrefix)
171 1 : for x >= 0x80 {
172 1 : w.buf[n] = byte(x) | 0x80
173 1 : x >>= 7
174 1 : n++
175 1 : }
176 1 : w.buf[n] = byte(x)
177 1 : n++
178 : }
179 :
180 1 : n += copy(w.buf[n:], w.curKey[shared:])
181 1 : if addValuePrefix {
182 1 : w.buf[n : n+1][0] = byte(valuePrefix)
183 1 : n++
184 1 : }
185 1 : n += copy(w.buf[n:], value)
186 1 : w.buf = w.buf[:n]
187 1 :
188 1 : w.curValue = w.buf[n-len(value):]
189 1 :
190 1 : w.nEntries++
191 : }
192 :
193 1 : func (w *blockWriter) add(key InternalKey, value []byte) {
194 1 : w.addWithOptionalValuePrefix(
195 1 : key, false, value, len(key.UserKey), false, 0, false)
196 1 : }
197 :
198 : // Callers that always set addValuePrefix to false should use add() instead.
199 : //
200 : // isObsolete indicates whether this key-value pair is obsolete in this
201 : // sstable (only applicable when writing data blocks) -- see the comment in
202 : // table.go and the longer one in format.go. addValuePrefix adds a 1 byte
203 : // prefix to the value, specified in valuePrefix -- this is used for data
204 : // blocks in TableFormatPebblev3 onwards for SETs (see the comment in
205 : // format.go, with more details in value_block.go). setHasSameKeyPrefix is
206 : // also used in TableFormatPebblev3 onwards for SETs.
207 : func (w *blockWriter) addWithOptionalValuePrefix(
208 : key InternalKey,
209 : isObsolete bool,
210 : value []byte,
211 : maxSharedKeyLen int,
212 : addValuePrefix bool,
213 : valuePrefix valuePrefix,
214 : setHasSameKeyPrefix bool,
215 1 : ) {
216 1 : w.curKey, w.prevKey = w.prevKey, w.curKey
217 1 :
218 1 : size := key.Size()
219 1 : if cap(w.curKey) < size {
220 1 : w.curKey = make([]byte, 0, size*2)
221 1 : }
222 1 : w.curKey = w.curKey[:size]
223 1 : if isObsolete {
224 1 : key.Trailer = key.Trailer | trailerObsoleteBit
225 1 : }
226 1 : key.Encode(w.curKey)
227 1 :
228 1 : w.storeWithOptionalValuePrefix(
229 1 : size, value, maxSharedKeyLen, addValuePrefix, valuePrefix, setHasSameKeyPrefix)
230 : }
231 :
232 1 : func (w *blockWriter) finish() []byte {
233 1 : // Write the restart points to the buffer.
234 1 : if w.nEntries == 0 {
235 1 : // Every block must have at least one restart point.
236 1 : if cap(w.restarts) > 0 {
237 1 : w.restarts = w.restarts[:1]
238 1 : w.restarts[0] = 0
239 1 : } else {
240 1 : w.restarts = append(w.restarts, 0)
241 1 : }
242 : }
243 1 : tmp4 := w.tmp[:4]
244 1 : for _, x := range w.restarts {
245 1 : binary.LittleEndian.PutUint32(tmp4, x)
246 1 : w.buf = append(w.buf, tmp4...)
247 1 : }
248 1 : binary.LittleEndian.PutUint32(tmp4, uint32(len(w.restarts)))
249 1 : w.buf = append(w.buf, tmp4...)
250 1 : result := w.buf
251 1 :
252 1 : // Reset the block state.
253 1 : w.nEntries = 0
254 1 : w.nextRestart = 0
255 1 : w.buf = w.buf[:0]
256 1 : w.restarts = w.restarts[:0]
257 1 : return result
258 : }
259 :
// emptyBlockSize is the size in bytes of an empty block: every block ends
// with a uint32 trailer that encodes the number of restart points it
// contains.
const emptyBlockSize = 4
264 :
265 1 : func (w *blockWriter) estimatedSize() int {
266 1 : return len(w.buf) + 4*len(w.restarts) + emptyBlockSize
267 1 : }
|